摘要:闡述了一種高性能分布式復雜消息處理引擎的設計方案,這種引擎改進了傳統(tǒng)復雜事件處理過程(CEP)處理引擎擴展性問題。新的設計方案通過將分布式無狀態(tài)數(shù)據(jù)處理節(jié)點與分布式存儲相結合,實現(xiàn)了復雜消息處理的規(guī)模和性能的線性擴展,同時避免了單點故障,保證了系統(tǒng)的高可靠性。
關鍵詞: 復雜事件處理;流式計算;M2M;滑動窗口;實時計算
Abstract: This paper describes a high-performance, distributed, complex event processing engine that improves the scalability of a traditional complex event processing engine. In the design of this new complex event processing (CEP), the stateless processing node is combined with distributed storage so that scale and performance can be linearly expanded. This design prevents single node failure and makes the system highly reliable.
Key words: CEP; stream processing; M2M; sliding window; real-time processing
中圖分類號:TP393.03 文獻標志碼:A 文章編號:1009-6868 (2013) 04-0058-05
隨著物聯(lián)網(wǎng)和移動互聯(lián)網(wǎng)的發(fā)展,整個世界已處于數(shù)據(jù)爆炸的進程中,這也導致了我們認識世界、處理數(shù)據(jù)的手段不斷進步。數(shù)年前,各種企業(yè)系統(tǒng)還是一個個的信息孤島,人們研究的重點在于獲取信息、打通孤島,這使得過去十年里人們一直熱衷于面向服務的體系結構(SOA)的研究。但在現(xiàn)在這個信息爆炸的時代,每個系統(tǒng)、每個人面臨的問題不再是無法獲取信息,而是如何能夠快速地從海量的信息中獲取有價值的內(nèi)容,并阻止無用的信息淹沒有價值的內(nèi)容。
物聯(lián)網(wǎng)和互聯(lián)網(wǎng)應用的一個共同特點是高并發(fā)、大數(shù)據(jù)量,海量消息系統(tǒng)不僅對消息處理的可靠性有一定的要求,對系統(tǒng)擴展性也有較高要求,希望能夠從每秒幾千次消息到上百萬次消息平滑擴展。
電信領域的應用場景采用的實時監(jiān)測用戶信令和行為的方法,例如用戶的每一次互聯(lián)網(wǎng)訪問請求、通話、短信、位置變更等信息都需要實時采集處理,并構建用戶的行為模型。這個量更加巨大,百萬人口的城市信令量就達到每秒數(shù)GB的量級,因此靠傳統(tǒng)的離線處理基本不可能完成。
目前主要有兩種海量實時數(shù)據(jù)處理方法:第1種方法是通過類似Map-reduce的方法進行在線采集、離線處理;第2種方法是事件流化,直接在內(nèi)存中進行海量數(shù)據(jù)的運算和處理。對于消息系統(tǒng),目前第1種方法有micro-mapreduce [1],它可以將Map-reduce粒度變小,周期縮短,這種方法實時性稍差(5 min-1 h),但能夠較好地處理可擴展性問題。第2種方法有現(xiàn)有開源的流式處理框架如S4,商用的產(chǎn)品如Oracle CEP[2]等,該方法能夠將相關數(shù)據(jù)載入內(nèi)存并進行計算,單機處理性能較高,但處理的可擴展性、容災容錯等存在一些問題,需要在前端進行數(shù)據(jù)分流,后端進行數(shù)據(jù)合并。
Storm[3] 提供了比較好的分布式解決方案,Storm集群有一個主節(jié)點和多個工作節(jié)點構成,工作節(jié)點與主節(jié)點通過Zookeeper協(xié)同工作。Storm本質(zhì)上是一個可靠的分布式消息處理引擎,以保證每條消息都能夠被處理。缺點在于其主節(jié)點存在單點問題,必須雙機HA 2,并且沒有時間窗機制,對于事件窗口,以及多路事件協(xié)同(例如發(fā)生事件A,如果同時過去30 s發(fā)生過事件B則生成新的事件C)沒有比較好的支持。
對于復雜事件處理(CEP)來說,提供良好的用戶使用界面非常有必要,常用的是使用類結構化查詢語言(SQL)的事件處理語言(EPL)來定義事件處理邏輯。Cayuga[4]和Borealis[5]在EPL處理以及事件的服務質(zhì)量(QoS)處理方面提供了很好的思路。
為達到可靠處理海量實時數(shù)據(jù)的目的,我們開發(fā)了一套全新的高性能分布式復雜消息處理引擎ZX-CEP,重點實現(xiàn)了以下一些能力:
·復雜事件數(shù)據(jù)的流式處理;
·高并發(fā),單機支持每秒十萬以上消息量,線性擴展能力較強;
·簡單的EPL消息處理編排以及圖形化處理流程編排;
·分布式計算,系統(tǒng)容量及處理能力的線性擴展;
·滑動事件窗口。
1 分布式流計算架構
從系統(tǒng)層面看,分布式流計算系統(tǒng)可以認為是一個處理黑盒,大量連續(xù)的數(shù)據(jù)流進入黑盒, 經(jīng)過處理后,轉換為特定的事件流輸出或傳送到其他系統(tǒng)再進行進一步處理。例如系統(tǒng)通過流式處理偵測到某種告警,可以生成告警事件通知自動維護程序進行故障修復操作,也可以將分析后的事件存儲到持久化存儲引擎以供后續(xù)分析處理。
流計算系統(tǒng)內(nèi)數(shù)據(jù)的流向本質(zhì)上是有向無環(huán)圖(如圖1所示),需要對數(shù)據(jù)進行多重處理的情況下,我們可以將一個流程的輸出作為另一個流程的輸入,實現(xiàn)多個流程的序列化處理。
分布式復雜消息處理引擎的架構如圖2所示。
該系統(tǒng)由幾個關鍵網(wǎng)元構成:數(shù)據(jù)預處理模塊、復雜消息處理模塊、輸出適配模塊、任務調(diào)度管理模塊。
1.1 數(shù)據(jù)預處理模塊
連續(xù)事件流傳送過來的是各種未經(jīng)過結構化處理的事件序列,然后再通過事件預處理模塊(如圖3所示)來實現(xiàn)原始事件的過濾、合并以及分流。
預處理模塊又分為兩部分:
·輸入數(shù)據(jù)適配器。該適配器用于接收原始事件序列并轉換為結構化事件,并按事件發(fā)生的先后順序送入本地消息隊列,等待數(shù)據(jù)預處理。按照輸入內(nèi)容不同,輸入適配器一般需要定制開發(fā)。經(jīng)過輸入適配器后,轉換為標準的消息體格式,包括消息源ID、消息發(fā)生時間戳、消息內(nèi)容K/V對象。對于無法量化的消息,我們還需要有一個元數(shù)據(jù)管理,將消息內(nèi)容進行量化處理映射。
·預處理操作。我們實現(xiàn)了對事件預處理的一些原子操作,如字段過濾原子、字段填充原子、事件過濾原子、事件合并原子以及事件拆分原子等,通過任務管理器我們實現(xiàn)了基本原子操作的規(guī)則定制以及動態(tài)加載?;驹硬僮骺梢詫嵗癁槎鄠€算子,各個算子按照定義好的規(guī)則進行連接,就可以實現(xiàn)對數(shù)據(jù)的預處理。各個算子的連接方式,可以通過圖形化編輯工具生成,也可以通過EPL語言條件解析產(chǎn)生。對算子操作進行管線化連接的好處是:可以隨時對基本算子進行各種串并聯(lián)操作,實現(xiàn)復雜的數(shù)據(jù)處理邏輯而不需要復雜代碼編寫。
在事件處理的過程中,輸入信號有可能產(chǎn)生一些超出正常幅度之外的噪音信息,但通過過濾操作我們能夠有效去除噪音,保留正常信號[6]。
1.2 復雜消息處理模塊
多個事件處理模塊偵聽同一個或多個窗口變更事件隊列,而空閑的事件處理模塊則會自動從隊列中獲取待處理事件。由于事件處理模塊本身是無狀態(tài)的,這樣就保證了我們可以隨時根據(jù)業(yè)務情況增加或減少事件處理模塊而不會影響到系統(tǒng)的運行。
分布式消息處理的關鍵有以下兩點:
(1)維護分布式消息隊列,從而保證事件的序列性。這點我們在DCache K/V系統(tǒng)中已經(jīng)實現(xiàn)[7],當然也可以用其他高性能的分布式消息隊列實現(xiàn)。如圖4所示,通過在分布式存儲內(nèi)維護一致的消息隊列,我們可以保證處理的分布式及消息處理的順序性。
(2)在分布式K/V系統(tǒng)內(nèi)維護統(tǒng)一的時間窗口。時間窗口由選舉出的主節(jié)點維護,這避免了各個節(jié)點由于時鐘不一致而導致的處理誤差。
1.3 輸出適配模塊
輸出適配模塊用于將系統(tǒng)處理結果轉換為特定的輸出動作或數(shù)據(jù)流。輸出適配模塊有兩個基本類型:消息輸出以及定期采樣輸出。當以數(shù)據(jù)流方式輸出時,輸出的數(shù)據(jù)流可以作為輸入流并由另一組規(guī)則進行后續(xù)處理。在這種場景中需要先根據(jù)不同緯度的情況進行分析,細粒度觀察5 min內(nèi)數(shù)據(jù)流情況,并輸出整合結果后形成粗粒度數(shù)據(jù)流,再進行更長時間段范圍內(nèi)的分析(如1 d)輸出的方式可以為文件、數(shù)據(jù)庫表或消息隊列。輸出適配模塊一般根據(jù)業(yè)務需要定制開發(fā)。
輸出適配模塊還有一個功能是時光穿梭,即當規(guī)則條件被觸發(fā)后,通過輸出適配模塊,我們可以紀錄下事件發(fā)生前后的系統(tǒng)各種相關消息狀況,并做鏡像持久化存儲,后續(xù)可以重放以便分析問題。
1.4 任務調(diào)度管理模塊
任務調(diào)度管理模塊的工作流程如圖5所示,主要有兩部分構成:
(1)規(guī)則的生成。我們可以通過兩種方式生成規(guī)則,一種是通過EPL的事件處理語句,動態(tài)定制生成任務圖;另外一種則是通過規(guī)則編輯器,以圖形界面方式生成事件處理邏輯。
(2)規(guī)則的調(diào)度執(zhí)行。在業(yè)務過程中,我們需要動態(tài)的規(guī)則加載,也即規(guī)則加載過程不能夠影響正常的處理過程。EPL適合比較簡單的規(guī)則場景,規(guī)則圖編輯則適合比較復雜的規(guī)則場景。為了提升效率,我們做了圖形化的規(guī)則編輯器,將規(guī)則圖生成后直接轉換為對應的代碼實現(xiàn)并實現(xiàn)了程序代碼的動態(tài)加載。
當指定一個滑動窗口將被適配新的規(guī)則時,存在如何匹配發(fā)生在規(guī)則生效前舊數(shù)據(jù)的問題。 在此我們定義了兩種實現(xiàn)策略:一種是新規(guī)則部署后我們將清空對應窗口數(shù)據(jù),但這可能會導致數(shù)據(jù)有一定時間中斷;另外一種策略是我們記錄新規(guī)則生效后的時間信息,在此期間內(nèi)新舊兩套規(guī)則同時計算,當新規(guī)則生效后的數(shù)據(jù)出棧后,我們才正式啟用新規(guī)則的計算結果,否則一直采用老規(guī)則計算結果。這可能造成的影響是僅當T =Tw +1(W為時間窗寬度)時間后新的規(guī)則才能夠生效。
2 滑動窗口設計及臟數(shù)據(jù)
污點傳播機制
通過高性能分布式消息隊列我們可以實現(xiàn)滑動事件窗?;瑒邮录暗亩x是一個唯一的事件序列,需要在系統(tǒng)中保留固定的時間長度或者固定數(shù)量的消息,隨著時間推移,仍保持在該事件序列內(nèi)的所有消息都維持在特定時間/長度范圍內(nèi)。因此滑動事件窗分為兩類,一類是滑動時間窗,所有事件都維持在特定的時間區(qū)間內(nèi);另一類是滑動空間窗,預先定義好窗體內(nèi)事件的容量,超出容量后的事件將自動出棧,如圖6中所示。
為了保證系統(tǒng)的分布式處理,我們采用了分布式K/V引擎來維護滑動事件窗口,這樣事件的一致性存儲就由分布式K/V引擎來保障。
事件序列的每個元素,以及入棧指針和出棧指針均作為鍵值對保存在分布式K/V引擎中,這樣我們就實現(xiàn)了分布式的滑動事件窗口存儲,如圖6所示。其中入棧指針和出棧指針我們使用了特定的同步操作模式來進行存取,保證了在分布式環(huán)境下的數(shù)據(jù)一致性。
每個事件進入事件窗,或者定時掃描發(fā)現(xiàn)是否有事件退出事件窗口,都會激發(fā)消息處理動作。 該動作會激發(fā)復雜消息處理模塊進行處理。為了保證處理的分布式,這里采用了消息隊列方式,實現(xiàn)拉模式的消息處理。滑動事件窗的進入、退出事件會生成窗口變更消息,該消息會進入另一個消息隊列等待復雜消息處理模塊響應處理。
為此我們構建了類似Aurora[9]的數(shù)據(jù)模型,并將時間窗的事件序列轉換成為增量事件序列。3種增量事件分別為:
·插入事件:(+,t)t為新增到事件窗口的事件對象;
·刪除事件:(-,t) t 為從事件窗口退出的事件對象;
·替換事件:(^,t 1,t 2)t 1為被替換事件對象,t 2為新事件對象。
通過處理增量事件,系統(tǒng)能夠有效避免經(jīng)常性的全局掃描事件窗,從而大大加速處理的進程。在事件信息中,我們還增加QoS標識,并發(fā)送到不同優(yōu)先級的隊列中,這樣可以保證高優(yōu)先級事件被優(yōu)先處理。我們利用分布式K/V存儲維護了事件狀態(tài)機以及全局計數(shù)器,在事件處理過程中,有效簡化了數(shù)據(jù)的處理邏輯。以最簡單的計算事件窗內(nèi)所有事件的平均值為例。普通方法是每次事件都需要重新計算時間窗內(nèi)所有事件的平均值:
而通過增量事件后,每次則需要計算:
如果采樣周期為1 s,事件窗則為5 min,則后一種的計算量就只有前一方式的1/300。
計算的分布式帶來一個額外的問題:對于復雜的計算,有可能涉及多個事件序列,因此多個事件隊列產(chǎn)生的事件并不一定由同一個事件處理器處理。在此我們引入了計算的污點數(shù)據(jù)傳播模型,以保證任何一個基礎事件帶來的信息更新都能夠及時引發(fā)后續(xù)處理節(jié)點的處理。
當涉及到某一個規(guī)則需要使用兩個或多個滑動窗口內(nèi)的數(shù)據(jù)時,因為我們的系統(tǒng)是分布式處理,就導致了有可能兩個滑動窗口產(chǎn)生的事件流并不是在同一個節(jié)點上進行的分析處理。 為此我們設計了分布式的污點數(shù)據(jù)傳播機制[5],保證一個規(guī)則數(shù)的各個處理節(jié)點都能夠對最終結果進行正確更新,即使并非在同一個節(jié)點完成的計算。如圖7 所示,灰色部分數(shù)據(jù)代表臟數(shù)據(jù),通過數(shù)據(jù)傳播機制來傳遞臟數(shù)據(jù)標識,從而保證所有數(shù)據(jù)及時得到更新計算。
整個規(guī)則樹可以認為是一個有向無環(huán)圖,變化后的數(shù)據(jù)經(jīng)過算子后,有可能影響后續(xù)處理節(jié)點,也有可能沒有影響。動態(tài)分析能夠查詢出所有被影響節(jié)點,靜態(tài)分析僅能夠分析出可信邊界,可信邊界外部數(shù)據(jù)可能是被污染,也可能是疑似污染,可信邊界內(nèi)部的數(shù)據(jù)可以確保是干凈的。
應用污點傳播算法,我們可以識別出受影響的節(jié)點,并應用算子重新計算受影響節(jié)點的后續(xù)數(shù)據(jù),而對于沒有受影響的數(shù)據(jù)則不需要重復計算。作為分布式系統(tǒng),每個數(shù)據(jù)以及算子,都有可能發(fā)生或保存在不同的物理節(jié)點中。
污點傳播的分析有靜態(tài)分析與動態(tài)分析兩種,本系統(tǒng)實現(xiàn)了編譯基本的靜態(tài)分析。當源數(shù)據(jù)發(fā)生改變時,系統(tǒng)可以分析出后續(xù)有向圖中所有被影響的節(jié)點,并標示出被影響數(shù)據(jù)。當需要獲取被影響數(shù)據(jù)進行計算時,根據(jù)被污染標示,可以進行前向逆推計算。這種情況下保證了整個系統(tǒng)的計算量最小。
污點檢查策略的首要任務就是分析可信邊界,污點檢查策略表示為一個由實體類型(type)、脆弱性描述(vul)、程序操作(op) 以及操作數(shù)位置(loc)組成的4元組:
[type,vul,op,loc] | type∈ROLES,vul ∈VUL _TYPES,op∈ ACTS,loc∈{N ∪ any }。
針對每個輸入變量,對應每個計算節(jié)點進行污染檢查,我們就可以整理出系統(tǒng)的污染傳播矩陣,如表1中所示。
對于多輸入變量環(huán)境,被污染節(jié)點和疑似污染節(jié)點是單輸入變量的并集。通過污點傳播算法,我們可以讓系統(tǒng)只在需要輸出數(shù)據(jù)的時間點對于臟數(shù)據(jù)節(jié)點進行數(shù)據(jù)更新計算操作,而不需要時刻全面更新系統(tǒng)數(shù)據(jù)節(jié)點,這樣能夠極大降低系統(tǒng)的計算量。
3 事件處理的QoS保障
增量事件消息隊列不止一個,根據(jù)QoS標識不同,不同優(yōu)先級別的消息會被放入不同的增量事件消息隊列。通過這種方式,我們能夠實現(xiàn)優(yōu)先處理高優(yōu)先級事件信息。
事件處理模塊優(yōu)先從高優(yōu)先級隊列獲取變更消息(如圖8所示),高優(yōu)先級隊列中沒有待處理信息后再從低優(yōu)先級隊列獲取信息。
事件處理模塊的處理結果可以暫存到DCache 中,或輸出到輸出隊列。對于暫存在DCache中的計算結果,另有一個采樣工具定期采樣并輸出到輸出隊列。例如我們需要統(tǒng)計某一個傳感器組過去1小時窗口內(nèi)平均值,并且每5分鐘報告一次,同時一旦發(fā)現(xiàn)某時刻讀數(shù)過高則需要馬上發(fā)送告警。這時事件處理模塊對于每一個新的輸入輸出事件,都會修改維護在DCache中的平均值對象。采樣程序每隔5分鐘從DCache中采樣該平均值數(shù)據(jù),并輸出到輸出隊列,對于讀數(shù)過高的數(shù)據(jù)則即時生成告警事件并放入輸出隊列。
4 結束語
文章描述了ZX-CEP分布式復雜消息處理引擎的設計及實現(xiàn),該引擎能夠高性能實時處理復雜的流式數(shù)據(jù)。我們首先基于數(shù)據(jù)與邏輯分離的原則對該系統(tǒng)進行了設計,數(shù)據(jù)存儲節(jié)點采用云存儲方式,保留多副本;數(shù)據(jù)處理節(jié)點采用無狀態(tài)節(jié)點,可以分布式動態(tài)進行擴展。該架構既保證了海量數(shù)據(jù)下的存儲可擴展性以及數(shù)據(jù)安全性,也保證了并行處理下的計算可擴展性。同時該架構還保證了任意一個節(jié)點故障對于系統(tǒng)業(yè)務正常處理沒有任何影響,流式計算仍然能夠持續(xù)進行而不會被中斷。
本架構依賴于分布式K/V 存儲以及構建于分布式K/V 之上的分布式消息隊列,并通過分布式消息隊列實現(xiàn)了跨節(jié)點共享的滑動時間窗。
我們展現(xiàn)了使用EPL語言實現(xiàn)對于數(shù)據(jù)處理邏輯的實時定制與加載機制。通過EPL完成基于基礎算子之上的復雜邏輯編排圖。由于分布式數(shù)據(jù)處理的特性,數(shù)據(jù)的分布式處理及存儲帶來了分布式邏輯運算的復雜性,因此我們引入了臟數(shù)據(jù)傳播機制,讓數(shù)據(jù)驅動處理邏輯。
未來我們將致力于進一步提升本系統(tǒng)的動態(tài)邏輯處理機制,讓邏輯判斷更加靈活,支持更加復雜的邏輯運算。同時我們將提升本系統(tǒng)的可維護性,確保能夠自動發(fā)現(xiàn)故障,并通過調(diào)整數(shù)據(jù)存儲及計算節(jié)點實現(xiàn)故障的自我修復。
參考文獻
[1] CONDIE T, ALVARO P, HELLERSTEIN J M,et al. MapReduce online[R]. UCB/EECS-2009-136.Berkeley,CA,USA:University of California, Berkeley,2009.
[2] 黃強, 增慶凱.基于信息流策略的污點傳播分析及動態(tài)驗證[J]. 軟件學報,2011, 22(9):2036-2048.
[3] 李新玉,黃忠東.基于CEP 的可持久化事件處理方案[J].計算機應用與軟件, 2010,27(12):151-153.
[4] CHERNIACK M, BALAKRISHNAN H,BALAZINSKA M,et al.Scalable distributed stream processing[C]//Proceedings of the 1st Biennial Conference on Innovative Data Systems Research (CIDR’03),Jan 5-8,2003, Asilomar, CA, USA. New York,NY,USA: ACM,2003:12p.
[5] ABADI D J,AHMAD Y,BALAZINSKA M,et al.The Design of the Borealis Stream Processing Engine[C]//Proceedings of the 2nd Biennial Conference on Innovative Data Systems Research (CIDR’05),Jan 4-7,2005, Asilomar, CA, USA.New York,NY,USA: ACM, 2003:13p.
[6] LUCKHAM D C, FRASCA B.Complex Event Processing in Distributed Systems[R]. CSL-TR-98-754.Stanford,CA,USA:Stanford University, 1998.
[7] BRENNA L,DEMERS A,GEHRKE J,et al.Cayuga: A High-Performance Event Processing engine[C]//Proceedings of the ACM SIGMOD International Conference on Management of Data(SIGMOD’07), Jun 11-14,2007,Beijing,China.New York,NY,USA:ACM,2007:1100-1102.
[8] Oracle cep[EB/OL].http://www.oracle.com/technologies/soa/complexevent-processing.html.
[9] 高洪.基于P2P網(wǎng)絡的分布式消息隊列[J]. 程序員, 2012(6):102-106.
作者簡介
陸平,東南大學畢業(yè);中國計算機協(xié)會CCF會員、服務計算專委;現(xiàn)任中興通訊業(yè)務研究院院長,負責業(yè)務軟件、多媒體終端、信令檢測、ICT等產(chǎn)品的研發(fā)以及互聯(lián)網(wǎng)、云計算、家庭網(wǎng)絡等新業(yè)務的研究;曾主持多項國家重點課題研究;發(fā)表論文10篇。
錢煜明,東南大學畢業(yè);中興通訊業(yè)務研究院總工程師,負責大數(shù)據(jù)處理、云計算、移動互聯(lián)網(wǎng)等方向系統(tǒng)架構及新技術研究;江蘇省雙創(chuàng)人才,主持多項國家重點課題研究;發(fā)表論文16篇。
朱科支,東南大學畢業(yè);現(xiàn)任中興通訊業(yè)務研究院產(chǎn)品經(jīng)理,負責移動互聯(lián)網(wǎng)及大數(shù)據(jù)處理相關產(chǎn)品研發(fā)及管理,對于大數(shù)據(jù)處理、搜索引擎、并行計算、移動終端管理等方面有深入研究。