劉婧妍,廖湖聲,高紅雨
(北京工業(yè)大學(xué)信息學(xué)部計(jì)算機(jī)學(xué)院,北京 100124)
在當(dāng)前的大數(shù)據(jù)背景下,很多領(lǐng)域需要對(duì)實(shí)時(shí)快速到來的數(shù)據(jù)(即流數(shù)據(jù))進(jìn)行分析。流數(shù)據(jù)往往單體價(jià)值較小,數(shù)據(jù)價(jià)值隨時(shí)間后移而降低。傳統(tǒng)對(duì)大數(shù)據(jù)的批量計(jì)算模式已很難滿足對(duì)流數(shù)據(jù)的實(shí)時(shí)性查詢需求。
復(fù)雜事件處理(Complex Event Processing, CEP)[1]是對(duì)流數(shù)據(jù)進(jìn)行即時(shí)處理的一種技術(shù)。復(fù)雜事件處理模型使用事件模型驅(qū)動(dòng)。在處理數(shù)據(jù)時(shí),使用事件間的因果關(guān)系、模式匹配等對(duì)事件進(jìn)行過濾,對(duì)事件做聚合映射,從而篩選出用戶感興趣的信息。復(fù)雜事件處理廣泛應(yīng)用于醫(yī)療[2-3]、物流[4-5]、傳感器網(wǎng)絡(luò)[6-8]等多個(gè)領(lǐng)域。
目前大多數(shù)的復(fù)雜事件處理語言都擁有基礎(chǔ)算子[9](選擇、映射、邏輯算子、序列化等),可供用戶實(shí)現(xiàn)相應(yīng)的復(fù)雜事件查詢功能。CEL語言是復(fù)雜事件處理系統(tǒng)Cayuga[10-11]上運(yùn)行的語言。CEL語言在傳統(tǒng)數(shù)據(jù)庫聲明性語言的基礎(chǔ)上擴(kuò)展序列、迭代算子,以應(yīng)對(duì)流數(shù)據(jù)的處理需求。CEL語言不支持窗口。EPL語言及其支持引擎Esper[12]支持對(duì)于XML類型數(shù)據(jù)的操作。它包含SQL中的所有算子,同時(shí)擁有連接、過濾、聚合算子。EPL語言支持自定義函數(shù)操作。但是其支持的函數(shù)只能處理同一類的所有數(shù)據(jù),比如滿足篩選條件的同一屬性數(shù)據(jù),而不能對(duì)其中的單個(gè)屬性進(jìn)行操作。以上的復(fù)雜事件處理語言都是針對(duì)流數(shù)據(jù)進(jìn)行批量處理,無法針對(duì)數(shù)據(jù)進(jìn)行細(xì)粒度的篩選和操作。XSeq語言及其系統(tǒng)[13-15]主要針對(duì)類型更加復(fù)雜的半結(jié)構(gòu)化數(shù)據(jù)(XML流數(shù)據(jù))進(jìn)行查詢。該語言可以處理基本的XPath查詢結(jié)構(gòu),并添加了順序約束和克林閉包。XSeq語言在基礎(chǔ)的類數(shù)據(jù)庫查詢的同時(shí),增加了部分內(nèi)置函數(shù)以提供聚合的功能。例如計(jì)數(shù)、求均值、平均值等。XSeq對(duì)于XML類型數(shù)據(jù)的查詢針對(duì)性較強(qiáng),但其僅支持對(duì)于批量的XML元素做統(tǒng)一的查詢,同時(shí)其描述相對(duì)復(fù)雜。CEStream是一種支持分布式流數(shù)據(jù)復(fù)雜事件處理的語言[16-17]。該語言主要針對(duì)XML數(shù)據(jù)進(jìn)行檢測,支持基礎(chǔ)的SQL算子,以及常見的流數(shù)據(jù)處理算子連接、過濾、聚合等,并增加了順序約束、克林閉包等操作。該語言提供了正規(guī)樹模式匹配功能,并且支持結(jié)構(gòu)連接,同時(shí)可以對(duì)多個(gè)事件源進(jìn)行組合處理,事件處理能力較強(qiáng)。以上的復(fù)雜事件處理語言都難以滿足對(duì)流數(shù)據(jù)做靈活的細(xì)粒度查詢功能。
為滿足復(fù)雜事件處理的細(xì)粒度處理需求,本文設(shè)計(jì)一種CEStream語言的自定義函數(shù)功能。通過定義函數(shù)形式參數(shù)的流模式匹配功能,實(shí)現(xiàn)對(duì)流數(shù)據(jù)標(biāo)簽的逐個(gè)處理。同時(shí),在自定義函數(shù)中給用戶提供對(duì)原有語句封裝、參數(shù)化變量,提高語言的可用性。本文主要工作為:
1)為復(fù)雜事件處理語言CEStream擴(kuò)展用戶自定義函數(shù)(User-Defined Function, UDF)功能。擴(kuò)展的UDF主要分為2類:支持形參模式匹配的細(xì)粒度處理UDF和參數(shù)化變量的UDF。
2)提出一種流模式匹配方法,使用戶自定義函數(shù)支持對(duì)流數(shù)據(jù)的細(xì)粒度檢測。
3)設(shè)計(jì)一種參數(shù)化變量的用戶自定義函數(shù)。該函數(shù)給用戶提供對(duì)原有查詢語句的封裝功能,增加語言的復(fù)用性和靈活性。
4)設(shè)計(jì)并實(shí)現(xiàn)CEStream語言的用戶自定義函數(shù)運(yùn)行系統(tǒng)。實(shí)驗(yàn)表明在完成相同功能查詢時(shí)使用自定義函數(shù),在提高語言可用性的同時(shí),吞吐量并未下降;支持細(xì)粒度檢測功能的自定義函數(shù)加強(qiáng)了CEStream復(fù)雜事件處理語言的檢測功能。
原有的復(fù)雜事件處理語言CEStream主要處理語句有構(gòu)造流數(shù)據(jù)、構(gòu)造模式這2種語句。在用戶使用該語言時(shí),每當(dāng)需要改變語句中的變量都需要對(duì)整個(gè)語句完全重寫,用戶體驗(yàn)不友好,同時(shí)容易造成程序代碼冗余。為解決這種問題,可以設(shè)計(jì)用戶自定義函數(shù)將原有語句放在函數(shù)內(nèi)并將變量參數(shù)化,使用戶在需要改動(dòng)變量值時(shí)改變調(diào)用函數(shù)的傳入?yún)?shù)即可。同時(shí),自定義函數(shù)的設(shè)計(jì)帶給該語言初步模塊化的功能,使用戶在求解問題時(shí)可以將問題分解為幾個(gè)模塊進(jìn)行分析。
由于各行業(yè)對(duì)于復(fù)雜事件處理語言處理數(shù)據(jù)的能力要求越來越高,原有針對(duì)流數(shù)據(jù)中每個(gè)事件進(jìn)行相同篩選、批量處理的操作已經(jīng)不能滿足使用者的需求。例如,股市、核電站警報(bào)等都需要復(fù)雜事件處理語言能夠?qū)?shù)據(jù)進(jìn)行一種上升趨勢的篩選和處理。具體的查詢案例如表1所示。
表1 查詢案例
對(duì)于表1案例中這種上升趨勢的判定,實(shí)際是前2個(gè)事件中的數(shù)據(jù)進(jìn)行比較,根據(jù)不同的比較結(jié)果,進(jìn)入不同的分支。這是對(duì)于復(fù)雜事件所進(jìn)行的細(xì)粒度處理,用CEStream原有語句對(duì)每一個(gè)事件都進(jìn)行相同篩選的操作是無法完成的。其他復(fù)雜事件處理語言中,只有Esper語言[12]支持這種細(xì)粒度的數(shù)據(jù)操作。但Esper語言對(duì)于這類操作的語言寫法相對(duì)復(fù)雜,對(duì)于遞增的n個(gè)連續(xù)事件,必須有n條語句與之對(duì)應(yīng)。因此有必要設(shè)計(jì)一種令CEStream語言支持對(duì)元素的逐個(gè)處理,可以擴(kuò)展一種支持形式參數(shù)模式匹配的自定義函數(shù)。該函數(shù)的形式參數(shù)中可以寫一種只匹配一個(gè)到幾個(gè)XML元素標(biāo)簽,同時(shí)可以嵌套匹配深層XML元素標(biāo)簽的模式匹配表達(dá)式。通過對(duì)XML數(shù)據(jù)流的頭部進(jìn)行匹配,達(dá)到對(duì)流數(shù)據(jù)進(jìn)行細(xì)粒度處理的效果。同時(shí),可以通過遞歸調(diào)用函數(shù)的形式,實(shí)現(xiàn)對(duì)流數(shù)據(jù)的整個(gè)處理過程。
在復(fù)雜事件處理語言CEStream中擴(kuò)展用戶自定義函數(shù)功能,給出一種通過UDF實(shí)現(xiàn)細(xì)粒度處理流數(shù)據(jù)的方法并增加該語言的靈活性。UDF提供流的參數(shù)模式匹配功能,使CEStream語言能夠支持對(duì)于一個(gè)到幾個(gè)XML標(biāo)簽流的匹配及操作,并通過對(duì)于XML標(biāo)簽內(nèi)容的篩選進(jìn)入不同的分支,提供細(xì)粒度檢測數(shù)據(jù)的方法;同時(shí)實(shí)現(xiàn)對(duì)CEStream原有創(chuàng)建模式、創(chuàng)建流語句的封裝,使原有變量參數(shù)化。
2.1.1 支持形式參數(shù)模式匹配的細(xì)粒度處理UDF
CEStream語言的用戶自定義函數(shù)為滿足對(duì)XML數(shù)據(jù)段的細(xì)粒度查找操作,使用函數(shù)重載的機(jī)制對(duì)每一個(gè)XML標(biāo)簽組做出篩選,并能夠根據(jù)其實(shí)際匹配的形式參數(shù)類型,實(shí)現(xiàn)對(duì)不同標(biāo)簽組,進(jìn)行不同處理的操作。
表2給出一個(gè)監(jiān)控核電站核心溫度的自定義函數(shù)案例。該案例所對(duì)應(yīng)的查詢需求如表1所示。為監(jiān)控核電站溫度是否過高,可通過定義函數(shù)tempUp、 tempUp1進(jìn)行處理。如表2中給出的函數(shù)定義以及形式參數(shù)注釋所示,其中tempUp為入口函數(shù),判斷第一個(gè)eve事件的溫度是否大于閾值$v,同時(shí)前2個(gè)eve事件是否后一個(gè)大于前一個(gè)。tempUp1函數(shù)為一組重載函數(shù),針對(duì)后續(xù)的邏輯進(jìn)行判斷。
表2 監(jiān)控核心溫度函數(shù)案例
函數(shù)tempUp有2個(gè)形式參數(shù),分別為流模式匹配表達(dá)式eve(t($a)$b) eve(t($c)$d) $e與閾值 $v。調(diào)用該函數(shù)時(shí),傳入的第一個(gè)流數(shù)據(jù)參數(shù)必須與流模式匹配表達(dá)式匹配。
重載函數(shù)tempUp1有2個(gè)同名函數(shù),第一個(gè)函數(shù)判斷結(jié)尾處是否符合最后一個(gè)事件的溫度大于第一個(gè)事件溫度的1.15倍;第二個(gè)函數(shù)判斷中間狀態(tài)是否一直符合遞增關(guān)系。這2個(gè)函數(shù)的形式參數(shù)列表有部分不同,下面逐個(gè)說明第一個(gè)函數(shù)定義的形式參數(shù)含義:常量匹配表達(dá)式0,閾值$v,首個(gè)事件溫度$t1st,已得到的流數(shù)據(jù)頭部$h,流模式匹配表達(dá)式eve(t($a)$b)$e。當(dāng)調(diào)用函數(shù)tempUp1時(shí),根據(jù)傳入?yún)?shù)成功匹配的形參,進(jìn)行調(diào)用。
用戶可以通過自定義函數(shù)將一個(gè)或多個(gè)功能封裝起來,對(duì)外提供調(diào)用接口。在為復(fù)雜事件處理語言CEStream增加用戶自定義函數(shù)功能時(shí),可以考慮將一些固定的模式匹配語句、創(chuàng)建流語句放在函數(shù)內(nèi)。也可將其中一些調(diào)用函數(shù)時(shí)經(jīng)常更改的設(shè)定值作為函數(shù)的參數(shù)傳入,設(shè)計(jì)一種參數(shù)化模式的函數(shù)。
例如復(fù)雜事件流處理中的典型應(yīng)用火情檢測,這種參數(shù)化模式的函數(shù),可使得處理操作更便捷,對(duì)火情檢測的細(xì)微調(diào)整等更加方便。表3給出了通常情況下的檢測案例:30 s內(nèi)出現(xiàn)連續(xù)3次溫度檢測值大于80 ℃,則發(fā)出一條報(bào)警消息。但由于環(huán)境不同,檢測設(shè)定的溫度閾值可能改變。使用表3所示的函數(shù)調(diào)用改變調(diào)用時(shí)傳入的參數(shù),即可改變溫度閾值。
表3 火情檢測案例
參數(shù)化模式的用戶自定義函數(shù),使CEStream語言更進(jìn)一步模塊化。它在實(shí)現(xiàn)對(duì)正規(guī)樹模式匹配、事件流模式封裝的同時(shí),使用參數(shù)化模式的方式,使得在調(diào)用該函數(shù)時(shí)對(duì)函數(shù)內(nèi)部的模式匹配標(biāo)簽以及創(chuàng)建流的時(shí)間窗口大小等變量可以進(jìn)行調(diào)整。使用CEStream語言的用戶可以實(shí)現(xiàn)自定義函數(shù),并在調(diào)用函數(shù)時(shí)傳入不同參數(shù),以達(dá)到不同的篩選效果。
以下是復(fù)雜事件處理語言CEStream擴(kuò)展的UDF的核心語法以及相關(guān)內(nèi)置函數(shù)設(shè)置的描述。
1)CEStream語句語法。
正規(guī)樹模式定義語法與事件流定義語法如表4與表5所示。
表4 正規(guī)樹模式定義語法
表5 事件流定義語法
在擴(kuò)展CEStream語言時(shí),將原有的pd(正規(guī)樹模式匹配)、sd(事件流模式)以及新增的賦值語句統(tǒng)稱為語句(statement)作為用戶可以書寫的語句,并擴(kuò)展變量(ID)、常量(PRIMITIVE_TYPE_CONST)、函數(shù)調(diào)用(func_call)、四則運(yùn)算表達(dá)式、條件判斷表達(dá)式這5種表達(dá)式。其中四則運(yùn)算表達(dá)式、條件判斷表達(dá)式不再給出詳細(xì)的文法規(guī)定。下面是CEStream語句語法。
statement_list→statement_list ‘,’ statement | empty
statement→pd | sd | ID=expression ‘;’
expression→ID | CONST | func_call
| arithmetic_expr | if_expr
|expression‘,’expression
func_call→ID ‘(’ expression [‘,’ expression] ‘)’
2)UDF定義語法。
fd→define function ID ‘(’ paramlist‘)’ ‘{’ func_body ‘}’
paramlist→paramlist ‘,’ param | param
param→CONST | arithmetic_expr
| pattern_match
pattern_match→ID | NULL | TAG(pattern_match) pattern_match
func_body→[ statement ] return expression
用戶自定義函數(shù)定義(fd)中包含函數(shù)名、形式參數(shù)以及函數(shù)體這3部分。其中形式參數(shù)(param)為基礎(chǔ)類型常量(CONST)或流模式匹配表達(dá)式(pattern_match)或四則運(yùn)算表達(dá)式。函數(shù)體(func_body)包括零個(gè)或多個(gè)CEStream語句(statement)以及一個(gè)返回值表達(dá)式。
當(dāng)調(diào)用自定義函數(shù)時(shí),通過傳入的實(shí)際參數(shù)與函數(shù)定義中形式參數(shù)相應(yīng)位置的流模式匹配表達(dá)式是否匹配決定是否調(diào)用該函數(shù)。對(duì)于函數(shù)重載形式的同名函數(shù),根據(jù)順序依次對(duì)函數(shù)調(diào)用語句的實(shí)參和函數(shù)定義的形參進(jìn)行匹配,實(shí)參和形參全部匹配成功則確定調(diào)用語句調(diào)用的是該函數(shù)。
3)UDF細(xì)粒度查詢內(nèi)置函數(shù)描述。
對(duì)于流數(shù)據(jù)的處理,可以使用內(nèi)置函數(shù)。內(nèi)置函數(shù)操作流數(shù)據(jù)共分為4種:head、 tail、 isnull、 cons。這4種內(nèi)置函數(shù)操作,分別針對(duì)流數(shù)據(jù)取頭、取尾、判斷變量是否為空、連接2個(gè)數(shù)據(jù)段。
可以作為內(nèi)置函數(shù)參數(shù)的語法結(jié)構(gòu)為:符合內(nèi)置函數(shù)輸入?yún)?shù)類型約定的用戶自定義函數(shù)調(diào)用語句,內(nèi)置函數(shù),流模式匹配表達(dá)式,變量。
系統(tǒng)設(shè)計(jì)部分主要對(duì)于自定義函數(shù)的處理進(jìn)行說明,本系統(tǒng)分為編譯模塊、查詢預(yù)處理模塊、數(shù)據(jù)初步處理模塊、集群這4部分。其中編譯模塊增加對(duì)語句、自定義函數(shù)定義、調(diào)用的編譯功能;查詢預(yù)處理模塊區(qū)分細(xì)粒度處理UDF和參數(shù)化模式的UDF;數(shù)據(jù)預(yù)處理模塊分別對(duì)2種函數(shù)調(diào)用做不同處理,實(shí)現(xiàn)新增的細(xì)粒度數(shù)據(jù)處理功能;集群對(duì)數(shù)據(jù)進(jìn)行后續(xù)篩選工作,并輸出最后的查詢結(jié)果。
圖1為該系統(tǒng)的活動(dòng)圖,說明了各個(gè)模塊的工作流程和模塊之間的關(guān)系。
圖1 自定義函數(shù)處理系統(tǒng)活動(dòng)圖
1)編譯模塊是系統(tǒng)最初接收用戶輸入的CEP語言的模塊,該模塊做下列操作:
①對(duì)CEP語言進(jìn)行語法分析及語義分析,生成查詢計(jì)劃。
②將查詢計(jì)劃發(fā)送給查詢預(yù)處理模塊。
2)查詢預(yù)處理模塊從編譯模塊接收查詢計(jì)劃,進(jìn)行下列操作:
①接收查詢計(jì)劃,對(duì)其進(jìn)行預(yù)處理。區(qū)分2種不同的自定義函數(shù)。對(duì)包含原有語句的參數(shù)化模式函數(shù)做預(yù)處理,對(duì)每一個(gè)數(shù)據(jù)源生成一個(gè)單源查詢計(jì)劃。對(duì)支持形參模式匹配的細(xì)粒度處理函數(shù),將相互之間有調(diào)用關(guān)系的函數(shù)關(guān)聯(lián),合并生成細(xì)粒度函數(shù)查詢計(jì)劃。
②根據(jù)函數(shù)調(diào)用,分析變量間的綁定關(guān)系,并將其與查詢計(jì)劃發(fā)送到集群中做初始化操作。
③根據(jù)調(diào)用函數(shù)需要的數(shù)據(jù)源信息,將數(shù)據(jù)源和對(duì)應(yīng)的數(shù)據(jù)處理模塊連接,并發(fā)送查詢計(jì)劃給其對(duì)應(yīng)的數(shù)據(jù)處理模塊。
3)數(shù)據(jù)初步處理模塊接收查詢預(yù)處理模塊生成的查詢計(jì)劃,做以下操作:
①對(duì)單源查詢計(jì)劃生成模式樹,對(duì)實(shí)時(shí)到來的XML數(shù)據(jù)流進(jìn)行正規(guī)樹模式匹配并不斷輪詢。
②對(duì)細(xì)粒度函數(shù)查詢計(jì)劃,根據(jù)函數(shù)調(diào)用形參中的流模式匹配表達(dá)式,以及函數(shù)內(nèi)的語句對(duì)XML流數(shù)據(jù)進(jìn)行篩選。該部分采用惰性求值的方法,對(duì)需要值的部分求值并輸出,后續(xù)的流數(shù)據(jù)以閉包形式保存,以解決對(duì)與流數(shù)據(jù)遞歸調(diào)用的問題。
4)集群使用接收到的查詢計(jì)劃做初始化操作,并在得到數(shù)據(jù)流后對(duì)其進(jìn)行匹配、篩選等操作,輸出最終的查詢結(jié)果。
本章通過實(shí)驗(yàn)分析用戶自定義函數(shù)的性能以及功能擴(kuò)展。實(shí)驗(yàn)主要從自定義函數(shù)中使用原有語句時(shí)對(duì)查詢性能的影響,以及其增加的細(xì)粒度數(shù)據(jù)處理功能這2個(gè)方面進(jìn)行。主要分析:1)在完成相同功能的查詢時(shí),由自定義函數(shù)封裝的語言和未封裝的語言之間的性能差異;2)用戶自定義函數(shù)對(duì)原有復(fù)雜事件處理查詢語言的功能擴(kuò)展。本實(shí)驗(yàn)的數(shù)據(jù)源為網(wǎng)站服務(wù)器中各種XML格式的記錄日志。本章所有實(shí)驗(yàn)的軟件環(huán)境為JDK1.8,硬件環(huán)境為Intel Xeon E5-1607 3.0 GHz、 6 GB內(nèi)存的PC機(jī)。
表6是本實(shí)驗(yàn)的測試案例。案例Q1.1、 Q1.2是使用原有查詢語句進(jìn)行查詢的案例,作為對(duì)照組。案例Q2.1~Q2.4使用用戶自定義函數(shù)進(jìn)行查詢,其中Q2.1的查詢效果和案例Q1.1相同,Q2.2~Q2.4的查詢效果和案例Q1.2相同。實(shí)驗(yàn)通過使用用戶自定義函數(shù)前后的性能對(duì)比,判斷調(diào)用自定義函數(shù)對(duì)查詢性能的影響。
表6 測試案例
如圖2與圖3所示,單個(gè)數(shù)據(jù)源輸入案例Q2.1和原有語句的對(duì)照組Q1.1在進(jìn)行檢測時(shí),吞吐量均為24000(事件數(shù)/s)左右;多個(gè)數(shù)據(jù)源輸入的案例中,使用UDF的Q2.2~Q2.4和其對(duì)照組Q1.2進(jìn)行檢測,吞吐量為19500(事件數(shù)/s)左右。實(shí)驗(yàn)表明,對(duì)于相同的查詢需求,使用自定義函數(shù)來實(shí)現(xiàn),在增加程序的可擴(kuò)展性的同時(shí),對(duì)查詢效率的影響很小。
圖2 單源案例吞吐量對(duì)比
圖3 多源案例吞吐量對(duì)比
另外,為了測試用戶自定義函數(shù)為CEStream復(fù)雜事件處理語言增加的細(xì)粒度查詢能力,在各個(gè)需要復(fù)雜事件處理的行業(yè)中,篩選出100個(gè)需要對(duì)數(shù)據(jù)進(jìn)行細(xì)粒度處理的場景進(jìn)行功能性測試。這些場景包括股票分析、環(huán)境溫度檢測、故障檢測等各個(gè)行業(yè)。實(shí)驗(yàn)證明用戶自定義函數(shù)可以很好地支持這些細(xì)粒度查詢要求。
本文提出了一種為復(fù)雜事件處理語言增加用戶自定義函數(shù)功能的方案,并給出了自定義函數(shù)部分的語法設(shè)計(jì)及其運(yùn)行系統(tǒng)。用戶自定義函數(shù)使復(fù)雜事件處理語言增加了細(xì)粒度處理流數(shù)據(jù)的能力,并使用戶能夠通過函數(shù)對(duì)查詢功能進(jìn)行封裝。