耿少峰,王永恒,李仁發(fā),張佳
(1. 湖南大學(xué)信息科學(xué)與工程學(xué)院,湖南 長沙 410082;2. 集美大學(xué)計(jì)算機(jī)工程學(xué)院,福建 廈門 361021)
主動式復(fù)雜事件處理方法的研究
耿少峰1,2,王永恒1,李仁發(fā)1,張佳2
(1. 湖南大學(xué)信息科學(xué)與工程學(xué)院,湖南 長沙 410082;2. 集美大學(xué)計(jì)算機(jī)工程學(xué)院,福建 廈門 361021)
在CPS的應(yīng)用背景下,對傳感器和控制設(shè)備產(chǎn)生的不確定性事件流進(jìn)行分析和處理得出高層事件,然后在此基礎(chǔ)上引入適應(yīng)性動態(tài)貝葉斯網(wǎng)絡(luò)和并行馬爾可夫決策過程模型來支持主動式的復(fù)雜事件處理。針對大型CPS中馬爾可夫決策過程存在的狀態(tài)數(shù)量巨大的問題,引入狀態(tài)劃分和報(bào)酬分解的方法來進(jìn)行并行優(yōu)化。在模擬交通網(wǎng)絡(luò)的環(huán)境中,實(shí)驗(yàn)結(jié)果顯示所提方法能有效地處理事件流,并具有良好的可擴(kuò)展性。
信息物理融合系統(tǒng);大數(shù)據(jù);復(fù)雜事件處理;馬爾可夫決策過程
信息物理融合系統(tǒng)(CPS, cyber physical system)將智能計(jì)算、通信和控制技術(shù)深度融入到物理系統(tǒng),使計(jì)算資源與物理資源緊密結(jié)合與協(xié)調(diào)分配,完成信息世界與物理世界的融合。它的應(yīng)用領(lǐng)域非常廣泛,小到智能家庭網(wǎng)絡(luò),大到工業(yè)控制系統(tǒng),覆蓋了各種不同規(guī)模的應(yīng)用系統(tǒng)。如何有效地處理 CPS應(yīng)用中所產(chǎn)生的大數(shù)據(jù)是人們目前需要面對的主要問題。當(dāng)前在大數(shù)據(jù)研究領(lǐng)域,對于歷史數(shù)據(jù)的分析和處理已經(jīng)進(jìn)行了廣泛深入的研究,特別是在數(shù)據(jù)倉庫和數(shù)據(jù)挖掘方面。常用方法之一是將大量的服務(wù)器組成集群或使用專業(yè)的云計(jì)算平臺(如亞馬遜的AWS)以及基于Hadoop的分布式計(jì)算框架。另外一種近期常見的計(jì)算平臺是Spark,它突破了 MapReduce的算法框架,支持更靈活的數(shù)據(jù)處理方式,性能也有了很大的提升。但這些方式并不適合于 CPS的數(shù)據(jù)處理。因?yàn)?CPS在工作時(shí)產(chǎn)生的是實(shí)時(shí)的數(shù)據(jù)流,對這些數(shù)據(jù)流的處理往往有較高要求的時(shí)間約束。而 Hadoop/MapReduce的計(jì)算框架僅適用于處理靜態(tài)數(shù)據(jù)和歷史數(shù)據(jù)。近期也出現(xiàn)了一些面向數(shù)據(jù)流的處理平臺,如Spark streaming、Storm和Flink。這些平臺能夠高效地處理數(shù)據(jù)流,但僅限于比較簡單的過濾、聚合等操作,無法直接支持面向CPS的復(fù)雜數(shù)據(jù)流處理。
在 CPS應(yīng)用中,系統(tǒng)必須處理來自無線傳感網(wǎng)、RFID閱讀器、GPS、社會媒體等各種各樣設(shè)備的信號,這些信號可以被當(dāng)成事件來處理。這些由設(shè)備直接生成的事件稱為原始事件。一般情況下原始事件里的語義信息非常有限,在實(shí)際應(yīng)用中,人們更加關(guān)注的是業(yè)務(wù)邏輯和規(guī)則等高級別信息。如在裝有智能門禁的小區(qū)門口有車輛通行時(shí),RFID閱讀器會進(jìn)行讀操作來生成原始的事件,但只有“車輛進(jìn)入或離開小區(qū)”這樣的復(fù)雜事件才是人們真正關(guān)心的,可以根據(jù)一些模式將許多原始事件結(jié)合在一起,從而得到對用戶有意義的復(fù)雜事件。CPS應(yīng)用系統(tǒng)先將業(yè)務(wù)邏輯轉(zhuǎn)換成復(fù)雜事件,再通過處理復(fù)雜事件來檢測業(yè)務(wù)邏輯。復(fù)雜事件處理(CEP,complex event processing)[1]用于處理數(shù)量巨大的簡單事件,并從中得到有價(jià)值的高層次的信息。
在某些CPS的應(yīng)用中,對象執(zhí)行的操作可以改變系統(tǒng)的狀態(tài)。當(dāng)前主流的事件處理方法為響應(yīng)式的,這意味著動作是由系統(tǒng)狀態(tài)發(fā)生變化觸發(fā)。主動式事件處理系統(tǒng)通過運(yùn)用預(yù)測和自動決策技術(shù)[2]減輕或避免將來不希望的事件發(fā)生。例如,在一個(gè)CPS的交通網(wǎng)絡(luò)中,可以根據(jù)當(dāng)前的狀態(tài)和歷史數(shù)據(jù)來預(yù)測道路的擁擠狀態(tài),然后采取一些行動以避免將來可能出現(xiàn)的擁堵。馬爾可夫決策過程(MDP,Markov decision process)作為一個(gè)最佳決策過程中的隨機(jī)動態(tài)系統(tǒng),多年來人們已對其進(jìn)行了深入的研究,并在多個(gè)領(lǐng)域中得到了廣泛的應(yīng)用。
目前,基于MDP實(shí)現(xiàn)主動式復(fù)雜事件處理還面臨著巨大的挑戰(zhàn)。首先是如何有效地處理由于誤讀、干擾等原因產(chǎn)生的事件的不確定性。當(dāng)前的研究主要限于通過信息融合和推理來解決單個(gè)事件的不確定性,但缺乏對復(fù)雜事件不確定性的研究。另一個(gè)問題是如何進(jìn)行高效準(zhǔn)確的狀態(tài)預(yù)測。盡管人們已經(jīng)對數(shù)據(jù)流預(yù)測進(jìn)行了大量的研究,但針對CPS產(chǎn)生的高速、海量數(shù)據(jù)流,預(yù)測模型和算法上都還面臨挑戰(zhàn)。同時(shí),在決策支持方面,目前很少有文獻(xiàn)涉及如何將MDP融入到主動式復(fù)雜事件處理中,以及如何應(yīng)對在此過程中MDP所產(chǎn)生的巨大的狀態(tài)空間。
針對上述問題,本文提出一種主動式復(fù)雜事件處理的架構(gòu)和方法。該方法以交通網(wǎng)絡(luò)為背景,將樹的匹配和緩沖鏈表相結(jié)合,高效地處理了大量由車輛產(chǎn)生的不確定事件流,然后將處理后的復(fù)雜事件提交給適應(yīng)性動態(tài)貝葉斯網(wǎng)絡(luò)(ADBN, adaptive dynamic Bayesian network)來預(yù)測未來事件和系統(tǒng)狀態(tài),最后主動式響應(yīng)器根據(jù) ADBN的預(yù)測結(jié)果,利用一種基于狀態(tài)劃分和并發(fā)操作的新型并行MDP方法進(jìn)行分析,最終產(chǎn)生避免交通擁堵的決策。
CEP通過監(jiān)測不斷產(chǎn)生的數(shù)據(jù)流,根據(jù)每個(gè)事件發(fā)生的集合/序列識別復(fù)雜事件,然后對檢測到的狀態(tài)做出回應(yīng)。Etzion等[3]在其著作中定義了復(fù)雜事件處理的基本概念及體系結(jié)構(gòu)。一個(gè)事件處理agent(EPA, event processing agent)執(zhí)行某些處理邏輯,從輸入事件中提取出復(fù)雜事件。多個(gè)EPA連接在一起可以執(zhí)行更復(fù)雜的層次化事件處理,這種網(wǎng)絡(luò)稱為事件處理網(wǎng)絡(luò)。
在以前的主動數(shù)據(jù)庫研究中,人們已經(jīng)對 CEP做了大量的工作。常用的方法都是基于固定的數(shù)據(jù)結(jié)構(gòu),如樹、圖、自動機(jī)和 Petri網(wǎng)。這些方法很難靈活地對事件查詢語言的實(shí)現(xiàn)進(jìn)行優(yōu)化,也很難根據(jù)應(yīng)用需求的變化來支持查詢語言的擴(kuò)展。針對這些問題,Eugene等[4]提出了一種基于查詢規(guī)劃的復(fù)合事件檢測方法(SASE)。SASE基于有限自動機(jī)(NFA)和堆棧來檢測復(fù)雜事件,并對大的滑動窗口和大的中間結(jié)果集進(jìn)行了優(yōu)化,實(shí)現(xiàn)了比較好的性能和可擴(kuò)展性。近年來的很多工作圍繞著對SASE的改進(jìn)展開,AGRAWAL等[5]提出了改進(jìn)的有限狀態(tài)機(jī)模型來提供更豐富的查詢能力,Zhang等[6]對SASE進(jìn)行了改進(jìn),提供了對非精確時(shí)標(biāo)的事件流的支持。
由于傳感器噪聲等因素,CPS事件具有不確定性,這種不確定性通常使用概率來表示。處理概率性原始數(shù)據(jù)的CEP普遍采用的模型有隱馬爾可夫模型、動態(tài)貝葉斯網(wǎng)絡(luò)和條件隨機(jī)域等。近年來也出現(xiàn)了一些基于有限狀態(tài)機(jī)的概率復(fù)雜事件處理方法。文獻(xiàn)[7]基于NFA設(shè)計(jì)了一種新的數(shù)據(jù)結(jié)構(gòu)——鏈接實(shí)例隊(duì)列(chain instance queues),并采用一種條件概率索引樹來提高搜索性能。另外,文獻(xiàn)[8]面向NFA提出一種優(yōu)化方法,不僅能夠計(jì)算復(fù)雜事件的概率,還能夠?qū)Σ豢尚啪W(wǎng)絡(luò)產(chǎn)生的數(shù)據(jù)獲取復(fù)雜模式的可信度?;谟邢逘顟B(tài)機(jī)的概率復(fù)雜事件處理,比較有代表性的研究項(xiàng)目是美國華盛頓大學(xué)的Cascadia系統(tǒng)[9]和Laha系統(tǒng)[10]。Cascadia系統(tǒng)基于概率數(shù)據(jù)庫中的可能世界模型(possible world model),實(shí)現(xiàn)了對概率事件流和歷史概率事件的查詢,但沒有在預(yù)處理中考慮流上相鄰事件間的時(shí)間關(guān)聯(lián)性。Laha成功支持了關(guān)聯(lián)概率事件流上的高效查詢,并支持更復(fù)雜的事件代數(shù)操作,從而能支持更靈活的復(fù)雜事件處理。
主動式系統(tǒng)的研究已經(jīng)持續(xù)了多年,如主動式運(yùn)輸管理系統(tǒng)[11]、面向即時(shí)無線網(wǎng)絡(luò)的主動型路由[12]等。這些研究總體上代表了一種由響應(yīng)式系統(tǒng)向主動式系統(tǒng)轉(zhuǎn)變的潮流。在復(fù)雜事件處理這個(gè)領(lǐng)域,對主動式方法的研究還比較少。但當(dāng)前CPS的發(fā)展已經(jīng)對這種研究提出了需求,也提供了研究的基礎(chǔ)條件。Engel等[13,14]提出了一種基于事件的主動式計(jì)算框架,該框架主要由復(fù)雜事件處理網(wǎng)絡(luò)、預(yù)測器和基于馬爾可夫決策過程的主動式響應(yīng)器構(gòu)成。該工作對傳統(tǒng)的復(fù)雜事件處理agent進(jìn)行了擴(kuò)展,支持2種新型的agent:預(yù)測agent用于預(yù)測系統(tǒng)狀態(tài);主動式響應(yīng)agent用于計(jì)算合適的響應(yīng)并執(zhí)行。但它的應(yīng)用背景不是面向CPS,也沒有考慮大規(guī)模的數(shù)據(jù)和狀態(tài)的問題。
多年來,人們對MDP進(jìn)行了廣泛的研究和應(yīng)用,產(chǎn)生了多個(gè)變種。對于大型CPS應(yīng)用這個(gè)領(lǐng)域,最大的挑戰(zhàn)在于巨大的狀態(tài)空間造成的組合爆炸問題。對這個(gè)問題當(dāng)前主要有2個(gè)研究方向。第一類是根據(jù)問題領(lǐng)域中的信息來簡化結(jié)構(gòu),如狀態(tài)聚集的方法[15]和時(shí)間聚集的方法[16]等。此類方法的問題和具體的系統(tǒng)相關(guān),無法找到普遍的解決方法。第二類是近似計(jì)算的方法,如近似動態(tài)規(guī)劃[17]和選擇性近似[18]等方法。這類方法的關(guān)鍵是如何在優(yōu)化過程中對值函數(shù)進(jìn)行近似。當(dāng)應(yīng)用到大型CPS的事件處理領(lǐng)域,MDP的規(guī)模更大且具備一些新的特征,其具體模型和大規(guī)模計(jì)算問題的解決仍然是一個(gè)挑戰(zhàn)。
圖1 系統(tǒng)架構(gòu)
本文方法的總體結(jié)構(gòu)如圖1所示。CPS中各類傳感器及RFID等設(shè)備產(chǎn)生的信號,匯聚在一起稱為原始事件流。由于設(shè)備本身的精度及干擾等原因,這些事件不是完全精確的,稱為概率事件。這些原始事件經(jīng)過概率復(fù)雜事件處理引擎的處理,提取出有意義的高層事件,也就是復(fù)雜事件。其中,PEPA(probabilistic event processing agent)為概率事件處理器[19],它負(fù)責(zé)處理由底層設(shè)備產(chǎn)生的原始事件流,并最終生成概率復(fù)雜事件。查詢規(guī)劃器根據(jù)復(fù)雜事件查詢語句使用一個(gè)或多個(gè) PEPA來完成查詢。多個(gè) PEPA組合在一起構(gòu)成的網(wǎng)絡(luò)稱為概率事件處理網(wǎng)絡(luò)(PEPN, probabilistic event processing network)。本系統(tǒng)中預(yù)測分析器(PA, predictive analytics)的實(shí)現(xiàn)方法是基于Pascale等[20]的工作,通過使用一種適應(yīng)性動態(tài)貝葉斯網(wǎng)絡(luò)模型,根據(jù)輸入的復(fù)雜事件來預(yù)測道路網(wǎng)絡(luò)中可能會發(fā)生擁堵的節(jié)點(diǎn)。復(fù)雜事件被存儲在事件數(shù)據(jù)庫中,PA可以通過學(xué)習(xí)這些歷史事件獲得更準(zhǔn)確的預(yù)測。主動式響應(yīng)器根據(jù) PA預(yù)測出來的系統(tǒng)未來狀態(tài),利用并行MDP產(chǎn)生避免或減弱這些狀態(tài)發(fā)生的決策。PRA(proactive agent)為執(zhí)行器,負(fù)責(zé)執(zhí)行對應(yīng)的決策動作。上下文可以從復(fù)雜事件中獲取,也可以從其他信息系統(tǒng)獲取,并影響系統(tǒng)的狀態(tài)預(yù)測和控制。
定義1 概率原始事件CPS事件流中的原始事件即在某個(gè)時(shí)間的一次發(fā)生(一次信號讀?。8怕试际录硎緸椋簂t;ID, A, T, Prgt;。其中,ID為產(chǎn)生數(shù)據(jù)的設(shè)備標(biāo)簽號,A為事件屬性的集合,T為事件發(fā)生的時(shí)標(biāo)。Pr為事件發(fā)生的概率值,它代表由原始信號轉(zhuǎn)換為確切的CPS事件的概率。
定義 2 概率復(fù)雜事件是由原始事件或者復(fù)雜事件按照一定的運(yùn)算規(guī)則形成的事件。一個(gè)概率復(fù)雜事件表示為:lt;E, R, Tc, Prgt;。其中,E是符合合成規(guī)則的概率原始事件串,R為事件合成規(guī)則,Tc為事件的時(shí)間跨度,Pr為事件概率值。
事件類型由一組具有相同語義和結(jié)構(gòu)的事件規(guī)則構(gòu)成,每個(gè)事件對象都是一個(gè)事件類型的實(shí)例。一個(gè)事件類型可以表示為原始或復(fù)雜事件。主要的復(fù)雜事件模式包括 ALL、ANY、COUNT和SEQ[3]等。如在交通系統(tǒng)中,COUNT模式用來計(jì)算規(guī)定時(shí)間內(nèi)某一地點(diǎn)經(jīng)過車輛的數(shù)量。把這些模式結(jié)合起來可以構(gòu)建層次性的復(fù)雜模式。
由于RFID閱讀器等設(shè)備存在漏讀、誤讀和臟讀等情況,所以導(dǎo)致交給PEPA處理的數(shù)據(jù)存在不確定的可能性。這些不確定的原始數(shù)據(jù)被稱為不確定事件。不確定事件流的復(fù)雜事件處理主要有2個(gè)挑戰(zhàn),一是如何對大量高速的、實(shí)時(shí)的事件進(jìn)行符合查詢規(guī)則的檢測;二是如何計(jì)算由相關(guān)或不相關(guān)的不確定事件流組成的復(fù)雜事件的概率。本文提出了一種面向不確定性事件流的復(fù)雜事件處理方法(ISCEP, indeterminate stream complex event processing)來解決上述問題。該方法采用事件概率模型建模,并通過基于匹配樹與各節(jié)點(diǎn)的緩沖鏈表相結(jié)合的方法來處理大量的事件流。
在大規(guī)模的 CPS應(yīng)用中經(jīng)常需要處理分布式的事件流,本文假設(shè)不同事件流上的事件實(shí)例是相互獨(dú)立的。在單一事件流里,一些在SEQ模式中的原始事件具有馬爾可夫性。這意味著下一個(gè)事件發(fā)生的概率僅取決于序列中的當(dāng)前事件,和歷史事件無關(guān)。如一輛汽車在i+1時(shí)刻的位置和它在i時(shí)刻的位置有關(guān),而和i時(shí)刻之前的位置無關(guān)。一個(gè)具有馬爾可夫性的事件序列被稱為馬爾可夫鏈。像Kawashima等[9]做的工作那樣,使用條件概率表(CPT)來處理?xiàng)l件概率。除了這些馬爾可夫鏈的事件,可能還存在一些相互獨(dú)立的原始事件。對于一個(gè)SEQ事件E=SEQ(e1, e2,…,en),原始事件劃分為2個(gè)集合S和T。S包含相互獨(dú)立的事件,T包含一個(gè)或多個(gè)馬爾可夫鏈。SEQ事件的概率可以根據(jù)式(1)計(jì)算。
其中,Si是非獨(dú)立集合 S中的某一條馬爾可夫鏈,e1表示馬爾可夫鏈中的第一個(gè)事件,Pr(em+1|em)表示連續(xù)發(fā)生的事件是前后相關(guān)的,它的值可以在CPT中獲得。
事件概率模型中PEPA的輸入是由概率原始事件組成的事件流。閱讀器所處區(qū)域用大寫字母表示,時(shí)標(biāo)用數(shù)字表示。如圖2所示的閱讀器在不同區(qū)域和時(shí)刻讀取到由同一個(gè)設(shè)備產(chǎn)生的概率流。
圖2 概率原始事件流
EPA的輸出是概率復(fù)雜事件。例如表1是經(jīng)過處理后得到的某個(gè)車輛可能的行駛路徑,其中省略了復(fù)雜事件的合成規(guī)則和時(shí)間跨度,保留了2個(gè)主要的屬性:符合合成規(guī)則的概率原始事件串和復(fù)雜事件的概率。根據(jù)EPA的輸出,PA可以選擇更可靠的結(jié)果作為輸入,這樣在一定程度上可以避免因數(shù)據(jù)錯(cuò)誤而導(dǎo)致錯(cuò)誤的狀態(tài)預(yù)測。
表1 概率復(fù)雜事件輸出流
不確定性事件流的具體處理流程如圖3所示。
圖3 不確定性事件流處理方法
當(dāng)查詢被提交的時(shí)候,會注冊相應(yīng)的復(fù)雜事件。所謂注冊就是在查詢映射表中建立查詢索引,同時(shí)建立用于流處理的匹配樹。將交通網(wǎng)絡(luò)劃分為若干子區(qū)域,根據(jù)子區(qū)域和不同的時(shí)間段(如早晚高峰期)對應(yīng)的已注冊查詢來構(gòu)建查詢映射表。當(dāng)某一時(shí)段內(nèi)某個(gè)區(qū)域發(fā)生事件的時(shí)候,就可以通過查詢映射表迅速地找到已注冊查詢。如構(gòu)造一個(gè)如表 2所示的查詢映射表。如果從輸入流中讀取到8:00區(qū)域A在發(fā)生事件,那么應(yīng)該將原始事件發(fā)送到查詢1對應(yīng)的匹配樹處理。
表2 查詢映射表
當(dāng)處理實(shí)時(shí)數(shù)據(jù)流時(shí),每個(gè)登記的查詢都會建立一個(gè)匹配樹。基本的建立規(guī)則在文獻(xiàn)[21]描述。葉子節(jié)點(diǎn)是區(qū)域節(jié)點(diǎn),中間節(jié)點(diǎn)是復(fù)合規(guī)則節(jié)點(diǎn)。在每一個(gè)節(jié)點(diǎn)的下方都會建立一個(gè)緩沖鏈表用以緩沖歷史事件。如圖4所示是查詢1的匹配樹構(gòu)造結(jié)果,其中,SEQ節(jié)點(diǎn)是由B、C區(qū)域事件節(jié)點(diǎn)組成,TSEQ節(jié)點(diǎn)是由復(fù)合節(jié)點(diǎn)SEQ和A區(qū)域事件節(jié)點(diǎn)組成。
在執(zhí)行查詢時(shí),首先獲得概率簡單事件流中的每一個(gè)區(qū)域事件的相關(guān)查詢集合。接著遍歷每一個(gè)查詢,把區(qū)域事件輸送到查詢對應(yīng)匹配樹中,按照自底向上的方式進(jìn)行匹配。如果當(dāng)前節(jié)點(diǎn)匹配成功就會在本節(jié)點(diǎn)的緩沖鏈表中加入匹配成功的記錄,接著發(fā)送更新消息給父節(jié)點(diǎn)。父節(jié)點(diǎn)接到更新消息就會對所有子節(jié)點(diǎn)的緩沖鏈表進(jìn)行匹配,若滿足規(guī)則會在自己的緩沖鏈表中加入成功匹配的節(jié)點(diǎn)記錄,然后繼續(xù)向上級節(jié)點(diǎn)發(fā)送更新消息。如此迭代直到?jīng)]有節(jié)點(diǎn)再接收到更新消息為止。最后對匹配樹的根節(jié)點(diǎn)緩沖鏈表進(jìn)行查看,如果有成功匹配記錄,則自頂向下輸出所有過程并返回結(jié)果集合,否則直接結(jié)束。具體過程如算法1所示。
圖4 查詢1的匹配樹
算法1 ISCEP算法
輸入 E為概率原始事件流,QTree為查詢匹配樹
輸出 R為概率復(fù)雜事件模式的實(shí)例集合
方法:
算法中幾個(gè)函數(shù)含義如下。
Add():事件匹配成功后,將記錄加入鏈表。記錄包含新計(jì)算的復(fù)雜事件概率、產(chǎn)生此記錄的事件集合和時(shí)標(biāo)。
Update():節(jié)點(diǎn)緩沖鏈表有新記錄加入后,用此函數(shù)向節(jié)點(diǎn)的父節(jié)點(diǎn)發(fā)送更新消息。
Match():對子節(jié)點(diǎn)緩沖隊(duì)列中的記錄按照時(shí)標(biāo)順序由后往前進(jìn)行遍歷,對時(shí)標(biāo)限制和運(yùn)算符限制進(jìn)行匹配。
Output(QTree):根節(jié)點(diǎn)緩沖鏈表有記錄時(shí)候調(diào)用。因?yàn)槌晒ζヅ溆涗洶苯幼庸?jié)點(diǎn)的信息,所以按照自頂向下的方式檢索直到葉子節(jié)點(diǎn),就能得到組成當(dāng)前查詢結(jié)果的所有信息。
設(shè)已注冊的查詢有N個(gè),表示為Q1,…,QN,劃分的區(qū)域有M個(gè),表示為D=D1,…,DM,函數(shù)f(Di)表示區(qū)域Di對應(yīng)的查詢的集合。設(shè)一個(gè)新的查詢Q對應(yīng)K個(gè)區(qū)域,表示為DQ=DQ1,…,DQK,其中,DQi∈D。在沒有優(yōu)化的情況下,Q的查詢代價(jià)表示為,QDi表示對應(yīng)到區(qū)域 DQi上的子查詢。如果某個(gè)區(qū)域DQi上的子查詢和已注冊查詢匹配,則其代價(jià)可表示為find(f(DQi))+ merge(f(DQi)),其中,merge(f(DQi))為合并子查詢結(jié)果的代價(jià),而由于建立了索引,find(f(DQi))可忽略不計(jì)。設(shè)已注冊查詢匹配率為 α,則查詢 Q的區(qū)域匹配數(shù)為對應(yīng)的區(qū)域集合為D'Q。則Q的查詢代價(jià)變?yōu)?。由于merge合并結(jié)果的代價(jià)遠(yuǎn)小于直接查詢,因此當(dāng)匹配率α的值比較大的時(shí)候,算法的查詢性能會有大幅度的提高。
針對CPS復(fù)雜事件處理系統(tǒng)的特點(diǎn),傳統(tǒng)馬爾可夫決策過程擴(kuò)展如下。
定義 3 主動式復(fù)雜事件系統(tǒng)并發(fā)馬爾可夫決策過程。一個(gè)主動式復(fù)雜事件系統(tǒng)并發(fā)馬爾可夫決策過程定義為。其中,I為處理動作的agent有限集合,S為狀態(tài)的有限集合,包含一個(gè)特殊的初始狀態(tài)為動作聯(lián)合,其中,Ai為來自第 i個(gè) agent的動作。P: S×A×S→[0, 1]為狀態(tài)轉(zhuǎn)換函數(shù),P(s,α,s')代表在狀態(tài) s執(zhí)行響應(yīng) α后轉(zhuǎn)換到狀態(tài) s'的概率,R:S→Re為報(bào)酬函數(shù)(Re為實(shí)數(shù))。對每個(gè)為CPS的復(fù)雜事件,復(fù)雜事件的變化影響系統(tǒng)的狀態(tài)。C為上下文,在不同的上下文中,agent會采用不同的動作為復(fù)雜事件對系統(tǒng)狀態(tài)的影響。
系統(tǒng)從原始狀態(tài)開始,根據(jù)合適的策略選擇一個(gè)或多個(gè)動作,由動作agent執(zhí)行。動作agent的操作引起 CPS原始事件的變化,原始事件的變化引起復(fù)雜事件的變化,根據(jù)對復(fù)雜事件的綜合分析,確定系統(tǒng)轉(zhuǎn)移到下一個(gè)狀態(tài)的概率。馬爾可夫決策過程的關(guān)鍵是尋找一個(gè)策略π :S→A,它把每個(gè)狀態(tài)映射到一個(gè)動作集合。最優(yōu)的策略使報(bào)酬的總和達(dá)到最大。策略選擇步驟基于式(2)。
策略更新步驟基于式(3)。
其中,γlt;1為衰減因子。經(jīng)典的MDP中動作是順序執(zhí)行的,最近有些研究提到并發(fā)MDP按功能對不同的動作進(jìn)行分組,這些分組后的動作并發(fā)執(zhí)行且相互沒有影響。MDP模型針對一個(gè)狀態(tài)有多個(gè)并發(fā)執(zhí)行的動作,它們在不同的位置執(zhí)行相同的功能來協(xié)同把系統(tǒng)轉(zhuǎn)向期望的狀態(tài)。本文中的MDP方法基于以下觀察結(jié)果。
1) 觀察結(jié)果1。大型CPS中的對象可以根據(jù)上下文進(jìn)行劃分,每個(gè)組對應(yīng)一個(gè)子狀態(tài),而不單獨(dú)考慮每個(gè)對象的狀態(tài)。
例如在考慮交通擁堵問題時(shí),每個(gè)路口的車流量(或擁堵情況)是子狀態(tài),所有路口的擁堵狀況構(gòu)成系統(tǒng)的總體狀態(tài)。
2) 觀察結(jié)果2。大型CPS中某個(gè)位置對應(yīng)的子狀態(tài)依賴于其鄰居位置的狀態(tài)(此處的鄰居是指在一定范圍內(nèi)和當(dāng)前節(jié)點(diǎn)相連的其他節(jié)點(diǎn))。
例如某個(gè)路口的擁堵狀態(tài)依賴于前一個(gè)時(shí)間段其鄰居路口的擁堵狀態(tài)。
3) 觀察結(jié)果3。在大型CPS中,可以把狀態(tài)節(jié)點(diǎn)分為一般節(jié)點(diǎn)和重點(diǎn)節(jié)點(diǎn)。重點(diǎn)節(jié)點(diǎn)是在主動式處理中要控制其狀態(tài)的節(jié)點(diǎn)。如圖5所示,可以根據(jù)重點(diǎn)節(jié)點(diǎn)對交通網(wǎng)絡(luò)進(jìn)行劃分。對單個(gè)子網(wǎng)中重點(diǎn)節(jié)點(diǎn)的狀態(tài)轉(zhuǎn)換,只需調(diào)整其子網(wǎng)內(nèi)鄰居節(jié)點(diǎn)的狀態(tài)來解決。
交通網(wǎng)絡(luò)可以如圖5劃分成若干組。一個(gè)或多個(gè)擁堵程度高(由PA預(yù)測)的節(jié)點(diǎn)作為一個(gè)組的中心,所有到中心的距離小于閾值D的節(jié)點(diǎn)劃分為一個(gè)組。假設(shè)有 n個(gè)節(jié)點(diǎn):J={j1,j2,…,jn},本文把交通流量按擁堵程度分成 k個(gè)級別:CS={c1,c2,…,ck}。MDP中的狀態(tài)可以表示為st=lt;st1,st2,…, stngt;,其中,sti∈S 是節(jié)點(diǎn) ji在 t時(shí)刻處于擁堵狀態(tài),si∈CS。策略πit={ait1,ait2,…,aitn},其中,動作aitk的作用是在t時(shí)刻引導(dǎo)一些車輛改變它們的路徑(如可以通過導(dǎo)航軟件提示等方法),從而減少k組的擁堵程度。本文假設(shè)在交通網(wǎng)絡(luò)中分組時(shí),任意 2組中心間的距離足夠大,這意味著子動作間不存在依賴。因此上述分組的數(shù)據(jù)可以用分布式服務(wù)器處理,通過并行來提高系統(tǒng)性能。通過網(wǎng)絡(luò)劃分,把一個(gè)狀態(tài)數(shù)為n的MDP轉(zhuǎn)換成多個(gè)子MDP,每個(gè)子MDP的聯(lián)合狀態(tài)數(shù)遠(yuǎn)小于n,從而大大降低了問題規(guī)模。
圖5 基于重點(diǎn)節(jié)點(diǎn)的網(wǎng)絡(luò)劃分
算法2 并行調(diào)度方案
輸入 已劃分的交通網(wǎng)絡(luò)分組V={v1,v2,…,vk},分布式系統(tǒng)中服務(wù)器節(jié)點(diǎn)C={c1,c2,…,cn}
輸出 網(wǎng)絡(luò)分組在處理節(jié)點(diǎn)中的分配方案
方法:
其中,分布式系統(tǒng)為同構(gòu)體系結(jié)構(gòu),并且服務(wù)器節(jié)點(diǎn)數(shù)目n小于子網(wǎng)分組數(shù)k,vk.s是分組vk中所有節(jié)點(diǎn)擁堵程度總和。根據(jù)vk.s的值對V進(jìn)行排序,放入堆 MaxHeap中。遍歷 MaxHeap,通過Assign(cn,vk)將當(dāng)前分組任務(wù)分配給 cost最小的服務(wù)器,隨后對服務(wù)器的cost值進(jìn)行修正。此算法實(shí)現(xiàn)了分布式服務(wù)器的負(fù)載均衡,相對于單節(jié)點(diǎn),處理性能會有大幅度的提高。
根據(jù)觀察結(jié)果,在前述MDP模型中加入新的變量G,代表子狀態(tài)節(jié)點(diǎn)的網(wǎng)絡(luò)結(jié)構(gòu)。G=(V,E),其中,V為節(jié)點(diǎn)的集合,E為邊的集合。一個(gè)邊(i,j)存在表示節(jié)點(diǎn)i影響節(jié)點(diǎn)j的狀態(tài)。
定義4 鄰居狀態(tài)節(jié)點(diǎn)。對于任意狀態(tài)節(jié)點(diǎn)i∈V,其鄰居狀態(tài)節(jié)點(diǎn)N(i)={j∈V |di,j≤k},其中,di,j為i和j之間的路徑長度,k是閾值。定義4表明一個(gè)節(jié)點(diǎn)的狀態(tài)只受鄰居節(jié)點(diǎn)狀態(tài)的影響。
通過定義鄰居節(jié)點(diǎn),和重點(diǎn)節(jié)點(diǎn)相關(guān)的聯(lián)合狀態(tài)數(shù)進(jìn)一步縮小,從而進(jìn)一步降低問題的規(guī)模。
定義5 狀態(tài)轉(zhuǎn)換分解。設(shè)有n個(gè)狀態(tài)的子節(jié)點(diǎn),對應(yīng)的動作也分為n個(gè)子動作,則根據(jù)前面的觀察結(jié)果,有
其中,s表示當(dāng)前狀態(tài),s'表示下一個(gè)狀態(tài)。
定義6 報(bào)酬分解。設(shè)有n個(gè)狀態(tài)的子節(jié)點(diǎn),對應(yīng)的動作也分為n個(gè)子動作,則根據(jù)前面的假設(shè),有
定義6意味著整體報(bào)酬被分解為若干個(gè)組的報(bào)酬。每一組的報(bào)酬又進(jìn)一步分解為所有鄰居節(jié)點(diǎn)的報(bào)酬。這樣每個(gè)節(jié)點(diǎn)的報(bào)酬可以采用單機(jī)多處理器的并行方式來計(jì)算。
在交通CPS中,當(dāng)一個(gè)動作ai被執(zhí)行,節(jié)點(diǎn)i處的交通流量為
其中,Pa_out(ji)是嘗試減少節(jié)點(diǎn)i處流量的節(jié)點(diǎn)集,Pa_in(ji)是嘗試增加節(jié)點(diǎn) i處流量的節(jié)點(diǎn)集,αki是Sk流向節(jié)點(diǎn)i的比例。假設(shè)并不是所有車輛都會按本文的策略來改變行駛路線,那么 βki是 Sk受動作ai影響的比例,P(jk)是在節(jié)點(diǎn)k處的車輛根據(jù)ai改變路線的概率,它可以從歷史數(shù)據(jù)得到。整體的狀態(tài)事務(wù)概率可以用P(j)和?i計(jì)算。
根據(jù)定義 6,可以定義平均流量和其中一組gi的方差為
本文的目標(biāo)是把所有擁堵狀態(tài)最小化。因此,定義歐式距離子動作的報(bào)酬函數(shù)和整體報(bào)酬函數(shù)為
最終,最優(yōu)策略可以按式(10)計(jì)算。
SUMO是一個(gè)開源的微觀道路交通仿真模擬器[22],它可以檢測通過相應(yīng)區(qū)域車輛的數(shù)量和每輛車的位置。如圖6所示,本文用JADE(Java agent development framework)框架對 SUMO進(jìn)行擴(kuò)展來支持多agent系統(tǒng),同時(shí)使用 SUMO的TraCI接口來獲得感應(yīng)圈變量和車輛位置的變量,用它們來模擬RFID和GPS閱讀器?;谶@個(gè)框架,agent控制器用智能算法控制車輛在 SUMO中移動,從中獲取事件數(shù)據(jù)和執(zhí)行相應(yīng)的動作。在實(shí)驗(yàn)中,在地圖里設(shè)置了80個(gè)路口和8萬輛汽車。為了更接近真實(shí)的交通系統(tǒng),定義了一組規(guī)則:每輛車都有一個(gè)家的位置和辦公室位置,車輛Vi在家和辦公室之間運(yùn)行的概率是 pi。這些車輛還有相應(yīng)概率去超市、醫(yī)院等其他地點(diǎn)。此實(shí)驗(yàn)?zāi)P涂梢詽M足 PCEP對交通網(wǎng)絡(luò)的劃分、狀態(tài)和動作的分解等操作,并且可以并行的生成決策方法。本文把4臺至強(qiáng)E3處理器和16 GB內(nèi)存的服務(wù)器當(dāng)作數(shù)據(jù)處理服務(wù)器。另一臺有4 GB內(nèi)存的PC運(yùn)行SUMO。
圖6 系統(tǒng)實(shí)現(xiàn)框架
首先來評估ISCEP關(guān)于多查詢的處理性能。本文對經(jīng)典的RCEDA方法[23]進(jìn)行了類似本文提出的概率計(jì)算方式的拓展,稱之為 PRCEDA(probability RCEDA)。通過PRCEDA與本文提出的ISCEP進(jìn)行比較。假定所有的注冊查詢都是二叉樹且Query Depth不超過4層,從圖7中可以觀察到,隨著注冊查詢數(shù)目的加大,ISCEP的時(shí)間消耗增長幅度相比 PRECDA要小很多。因?yàn)镻RECDA以輪詢方式查詢匹配樹,隨著注冊查詢數(shù)目的增加,查詢效率會越來越低。使用查詢映射表的ISCEP輪詢次數(shù)要少很多,所以查詢效率下降較少。
圖7 查詢處理性能
下一個(gè)實(shí)驗(yàn)評估該系統(tǒng)的平均擁堵度,結(jié)果如圖8所示。本文把擁堵程度劃分為11個(gè)等級,其中,“0”表示無擁堵,“10”表示擁堵程度最高。從圖中可以觀察到使用本文方法比讓汽車自主行駛時(shí)平均擁堵度明顯降低。原因是本文方法發(fā)現(xiàn)節(jié)點(diǎn)可能出現(xiàn)擁堵時(shí),會主動引導(dǎo)后續(xù)車輛向輕負(fù)荷節(jié)點(diǎn)行駛。同時(shí),當(dāng)車輛總體擁堵程度比較高的時(shí)候,本文方法帶來的擁堵下降也更多。原因是當(dāng)總體車流量比較大時(shí),系統(tǒng)執(zhí)行的動作會影響更多的車輛,帶來更多的擁堵下降。
圖8 擁堵程度比較
最后的實(shí)驗(yàn)中采用不同數(shù)量的服務(wù)器和不同大小的數(shù)據(jù)來測試本文方法的性能。重點(diǎn)節(jié)點(diǎn)的數(shù)量固定在20,其結(jié)果如圖9所示。結(jié)果表明,本文方法通過網(wǎng)絡(luò)的劃分、鄰居節(jié)點(diǎn)定義及狀態(tài)和動作分解,可以在較大規(guī)模的MDP問題中在可接受的時(shí)間內(nèi)形成決策。從圖中可以看到,生成決策的平均消耗時(shí)間隨著車輛數(shù)量的增加成正比。原因在于車輛數(shù)量的增加需要執(zhí)行更多的子動作,因此增加了算法的復(fù)雜性。通過本文方法還可以發(fā)現(xiàn)隨著車輛數(shù)量的增加,4個(gè)服務(wù)器的性能更好,而且生成決策所需的時(shí)間增長相對較慢。這說明系統(tǒng)具備更好的可擴(kuò)展性,在一定范圍內(nèi)可以通過增加新的計(jì)算資源來支持更大規(guī)模的交通網(wǎng)絡(luò)。
圖9 不同服務(wù)器數(shù)量的性能
本文提出了一種對 CPS設(shè)備產(chǎn)生的不確定性事件流進(jìn)行處理分析,然后在基于ADBN的預(yù)測結(jié)果上引入并行MDP模型來控制大規(guī)模交通擁堵的主動式復(fù)雜事件處理方法。實(shí)驗(yàn)評估表明,該方法能夠有效地減少節(jié)點(diǎn)的擁堵程度,并且具有良好的可擴(kuò)展性。但在并行MDP中,子狀態(tài)和子響應(yīng)的空間隨著節(jié)點(diǎn)和車輛的增加依然會變得巨大。今后的工作重點(diǎn)在于研究如何進(jìn)一步減少子狀態(tài)的空間數(shù)量,從而提高系統(tǒng)的整體性能。
[1] LUCKHAM D C. The power of events: an introduction to complex event processing in distributed enterprise systems[M]. Boston, Addison Wesley, 2002.
[2] ENGEL Y, ETZION O. Towards proactive event-driven computing[C]//The Fifth ACM International Conference on Distributed Event-Based Systems. 2011: 125-136.
[3] ETZION O, NIBLETT P. Event processing in action[M]. Manning Publications, 2010.
[4] WU E, DIAO Y, RIZVI S. High-performance complex event processing over streams[C]//2006 ACM SIGMOD International Conference on Management of Data. 2006: 27-29.
[5] AGRAWAL J, DIAO Y, GYLLSTROM D. Efficient pattern matching over event streams[C]//SIGMOD Conference. 2008: 147-160.
[6] ZHANG H, DIAO Y, IMMERMAN N. Recognizing patterns in streams with imprecise timestamps[J]. PVLDB, 2010, 3(1): 244-255.
[7] XU C, LIN S, LEI W. Complex event detection in probabilistic stream[C]//The 12th International Asia-Pacific Web Conference. 2010:361-363.
[8] KAWASHIMA H, KITAGAWA H, LI X. Complex event processing over uncertain data streams[C]//The Fifth International Conference on P2P, Parallel, Grid, Cloud and Internet Computing. 2010: 521-526.
[9] KAWASHIMA H, KITAGAWA H, LI X. Complex event processing over uncertain data streams[C]//3PGCIC. 2010: 521-526.
[10] WELBOURNE E, KHOUSSAINOVA N, LETCHNER J, et a1. Cascadia: a system for specifying, detecting, and managing RFID events[C]//The 6th International Conference on Mobile Systems, Applications, and Services. 2008: 281-294.
[11] DOLEV S, KOPEETSKY M, SHAMIR A. RFID authentication efficient proactive information security within computational security[J].Theory of Computing Systems, 2011, 48(1):132-149.
[12] KUNZ T, ALHALIMI R. Energy-efficient proactive routing in MANET: energy metrics accuracy[J]. Ad Hoc Networks, 2010, 8(7):755-766.
[13] ENGEL Y, ETZION O, FELDMAN Z. A basic model for proactive event-driven computing[C]//The 6th ACM International Conference on Distributed Event-Based Systems (DEBS'12). 2012: 107-118.
[14] ENGEL Y, ETZION O. Towards proactive event-driven computing[C]//The Fifth ACM International Conference on Distributed Event- Based Systems, DEBS. 2011: 125-136.
[15] JIA Q S. On state aggregation to approximate complex value functions in large-scale Markov decision processes[J]. IEEE Transactions on Automatic Control, 2011, 56(2):333-344.
[16] SUN T, ZHAO Q C, LUH P B. Incremental value iteration for time aggregated Markov decision processes[J]. IEEE Transactions on Automatic Control, 2007, 52(11):2177-2182.
[17] POWELL W B. Approximate dynamic programming: solving the curse of dimensionality[M]. New York, Wiley-Inter Science, 2007.
[18] JIA Q S, ZHAO Q C. Strategy optimization for controlled Markov process with descriptive complexity constraint[J]. Science in China,2009, 52(11):1993-2005.
[19] WANG Y H, CAO K N, ZHANG X M. Complex event processing over distributed probabilistic event streams computers and mathematics with applications[J]. Computers amp; Mathematics with Applications,2012, 66(10): 1808-1821.
[20] PASCALE A, NICOLI M. Adaptive Bayesian network for traffic flow prediction[C]//Statistical Signal Processing Workshop (SSP). IEEE,2011: 177-180.
[21] 王永恒, 楊圣洪. 分布式 RFID數(shù)據(jù)流的復(fù)合事件檢測方法[J]. 計(jì)算機(jī)應(yīng)用研究, 2011, 28(11): 4177-4179.WANG Y H, YANG S H. Complex event detection method for distributed RFID systems[J]. Application Research of Computer, 2011,28(11): 4177-4179.
[22] BEHRISCH M, BIEKER L, ERDMANN J, et al. Sumo - simulation of urban mobility: an overview[C]//The Third International Conference on Advances in System Simulation. Barcelona, Spain, 2011: 63-68.
[23] WANG F, LIU S, LIU P, et al. Bridging physical and virtual worlds:complex event processing for RFID data streams[C]//The 10th International Conference on Extending Database Technology (EDBT'2006).2006: 588-607.
Research of proactive complex event processing method
GENG Shao-feng1,2, WANG Yong-heng1, LI Ren-fa1, ZHANG Jia2
(1. College of Information Science and Engineering, Hunan University, Changsha 410082, China;2. College of Computer Engineering, Jimei University, Xiamen 361021, China)
Based on the preliminary analysis results of the indeterminate event stream that generated by the sensors and control purpose equipment of CPS, the adaptive dynamic Bayesian network and parallel Markov decision process model were used to support the proactive complex event processing. In order to resolve the vast state space issue of Markov decision process for large CPS, states partition and reward decomposition methods were proposed to parallel the decision making process. The experimental result based on the simulation of traffic network shows that proposed method can process event stream effectively and has favorable scalability.
CPS, big data, complex event processing,Markov decision process
s: The National Natural Science Foundation of China(No.61173036, No.61371116), Foundation of Fujian Educational Committee (No.JA14186)
TP391
A
10.11959/j.issn.1000-436x.2016183
2016-04-07;
2016-06-26
國家自然科學(xué)基金資助項(xiàng)目(No.61173036,No.61371116);福建省教育廳基金資助項(xiàng)目(No.JA14186)
耿少峰(1981-),男,河南臺前人,湖南大學(xué)博士生,主要研究方向?yàn)?CPS、復(fù)雜事件處理等。
王永恒(1973-),男,河北霸州人,湖南大學(xué)副教授,主要研究方向?yàn)槲锫?lián)網(wǎng)、數(shù)據(jù)挖掘等。
李仁發(fā)(1957-),男,湖南宜章人,湖南大學(xué)教授、博士生導(dǎo)師,主要研究方向?yàn)镃PS、嵌入式系統(tǒng)和無線傳感網(wǎng)等。
張佳(1980-),女,河南新鄉(xiāng)人,集美大學(xué)講師,主要研究方向?yàn)槲锫?lián)網(wǎng)、嵌入式系統(tǒng)等。