張淑梅 李福興
摘要:針對城市交通產(chǎn)生大量的實時、連續(xù)數(shù)據(jù)處理的問題,提出采用Storm分布式實時計算框架解決該問題。系統(tǒng)采用Kafka軟件實時采集生產(chǎn)現(xiàn)場各類大量參數(shù)、發(fā)布-訂閱消息,經(jīng)Storm的Trident應(yīng)用軟件組件對大數(shù)據(jù)進(jìn)行過濾、拆分、分組、函數(shù)、狀態(tài)更新、狀態(tài)查詢、重新分區(qū)等操作完成大數(shù)據(jù)計算;結(jié)合分布式遠(yuǎn)程過程調(diào)用軟件(DRPC軟件)滿足并行查詢需求。經(jīng)驗證,該設(shè)計可提高原系統(tǒng)計算速度、效率和吞吐量,降低開銷,確保數(shù)據(jù)安全。
關(guān)鍵詞:城市交通;大數(shù)據(jù);分布式
中圖分類號:TP393 文獻(xiàn)標(biāo)識碼:A 文章編號:1007-9416(2019)09-0126-03
0 引言
隨著各種交通信息采集技術(shù)例如傳感器技術(shù)、地理信息系統(tǒng)、GPS采集系統(tǒng)和計算機(jī)技術(shù)被廣泛地運用到城市交通路口,采集得到的交通數(shù)據(jù)呈爆發(fā)增長。經(jīng)研究和分析發(fā)現(xiàn),這些數(shù)據(jù)無論是類型、格式、長度各不相同,可分為結(jié)構(gòu)化、非結(jié)構(gòu)化及半結(jié)構(gòu)化。本系統(tǒng)是建立在城市智慧交通物聯(lián)網(wǎng)、云計算平臺基礎(chǔ)上,構(gòu)建基于分布式流式實時數(shù)據(jù)計算技術(shù),建立城市智慧交通大數(shù)據(jù)計算系統(tǒng)。提高城市智慧交通系統(tǒng)響應(yīng)速度,輔助城市交通智慧決策。
1 實時數(shù)據(jù)流式計算相關(guān)技術(shù)
1.1 常見大數(shù)據(jù)實時流式處理架構(gòu)
在大數(shù)據(jù)實時流式處理領(lǐng)域,主流開源實時流式處理系統(tǒng)有Storm、Spark和Flink 3種。Storm工作時需要將任務(wù)設(shè)計為有向無環(huán)拓?fù)鋱D,將設(shè)計好的有向無環(huán)拓?fù)鋱D提交至系統(tǒng)集群,由系統(tǒng)集群主控節(jié)點分配任務(wù)給相應(yīng)的工作節(jié)點執(zhí)行任務(wù)。Spark是在處理前按時間戳預(yù)先將大數(shù)據(jù)分割為微批量數(shù)據(jù)流后進(jìn)行批處理作業(yè)。Flink數(shù)據(jù)處理模式與Storm類似,但具有完善的窗口功能及窗口聚合等功能,并且會主動操作窗口狀態(tài)。為了及時掌握數(shù)據(jù)處理結(jié)果,流式處理系統(tǒng)都有數(shù)據(jù)處理狀態(tài)管理功能,但這3種系統(tǒng)的狀態(tài)管理模式不同,Storm將狀態(tài)管理滾動至應(yīng)用層面或使用更高層面的抽象Trident[1]。Spark把狀態(tài)信息視作是一種微批量數(shù)據(jù)流,在處理數(shù)據(jù)時加載數(shù)據(jù)目前的狀態(tài)信息,該狀態(tài)信息通過利用Spark具有的函數(shù)操作獲得該數(shù)據(jù)處理結(jié)果,同時修改被加載過數(shù)據(jù)的狀態(tài)信息。Flink系統(tǒng)設(shè)有專門的數(shù)據(jù)狀態(tài)信息,F(xiàn)link在內(nèi)部存儲計算產(chǎn)生的中間結(jié)果,并供后續(xù)功能或算子計算結(jié)果使用。數(shù)據(jù)狀態(tài)信息可存在Flink堆內(nèi)存或堆外內(nèi)存,也可存儲介質(zhì)在第三方介質(zhì)中。
1.2 storm架構(gòu)
Storm架構(gòu)適用于處理無邊界的流式數(shù)據(jù),其架構(gòu)依賴Hadoop的Zookeeper。Storm系統(tǒng)將接收的數(shù)據(jù)直接在內(nèi)存中進(jìn)行計算,沒有數(shù)據(jù)傳輸和磁盤讀寫的延遲問題,滿足分布式流式實時計算對實時性要求高的需求。
Storm采用主從架構(gòu)模式,其系統(tǒng)有主節(jié)點(Master)和工作節(jié)點( worker)之分。主節(jié)點為系統(tǒng)的中心,在其上運行后臺服務(wù)程序(Nimbus)、運行各種Storm命令,包括激活(active) 、使失效(Deactive)、再次平衡(rebalance)以及終止(kill)命令。工作節(jié)點(worker)上運行服務(wù)程序(Supervisor),工作節(jié)點也是Spout和Bolt執(zhí)行處理邏輯的地方,通過Supervisor程序安排工作任務(wù)、下載作業(yè)副本。Supervisor監(jiān)聽執(zhí)行后臺服務(wù)程序提交的任務(wù),并可對任務(wù)線程執(zhí)行啟動、暫停和撤銷工作。一個或多個工作線程組成一個工作進(jìn)程,工作線程由每個任務(wù)節(jié)點的實例組成,是Storm的最小單元。
Storm通過Zookeeper程序協(xié)調(diào)主節(jié)點和工作結(jié)點之間的通信。任何在主結(jié)點和工作結(jié)點之間的狀態(tài)都存放在Zookeeper里。一旦其中一個任務(wù)崩潰了,當(dāng)恢復(fù)的時候,將從Zookeeper中讀取該任務(wù)之前的狀態(tài),讓Storm結(jié)點任務(wù)恢復(fù)還原至崩潰以前的狀態(tài)。
1.3 Kafka消息系統(tǒng)
Kafka是一個分布式高吞吐量消息系統(tǒng)[2],它擁有高吞吐量、易擴(kuò)展和透明的特點,非常適合處理為實現(xiàn)規(guī)模化、智能化、集群化生產(chǎn)而組成的物聯(lián)網(wǎng)大數(shù)據(jù)。
Kafka消息可靠性機(jī)制。當(dāng)一個消息被發(fā)送后,發(fā)送端將等候服務(wù)器成功接收到消息的反饋(可通過參數(shù)控制等候時間),假如消息在傳輸途中丟失或是其中某個服務(wù)器死機(jī),發(fā)送端則重新發(fā)送。服務(wù)器端記錄了補(bǔ)償值(offset),用于指向接收端下一個即將發(fā)送的信息,當(dāng)接收端收到了消息,但卻在計算過程中宕機(jī),此時接收端可以通過這個補(bǔ)償值重新找到上一個消息再進(jìn)行處理。接收端還有權(quán)限控制這個補(bǔ)償值,對持久化到服務(wù)器端的消息做任意處理,提高了消息發(fā)送可靠性,降低數(shù)據(jù)丟失率。Kafka的數(shù)據(jù)轉(zhuǎn)發(fā)方案具有允許集群中的某一節(jié)點死機(jī)而不影響整個集群工作的特點,即在一個集群中,當(dāng)備份數(shù)量為N下,并允許N-1個節(jié)點失敗。在所有這些節(jié)點中,其中有一個節(jié)點是頭節(jié)點,該節(jié)點存儲了其它備份節(jié)點列表,并維持各個備份間的狀體同步?;贙afka的特點,本次選用Kafka程序。
2 城市智慧交通大數(shù)據(jù)處理需求分析
智能交通云平臺主要包含以下幾個方面:(1)基于大數(shù)據(jù)處理技術(shù)的交通信息數(shù)據(jù)共享中心,能夠?qū)A繑?shù)據(jù)進(jìn)行有效存儲和管理的城市智慧交通數(shù)據(jù)系統(tǒng)。(2)為方便公眾出行,對于出現(xiàn)大面積交通癱瘓的情況進(jìn)行預(yù)測、避免擁堵的城市交通監(jiān)測和預(yù)警系統(tǒng)。(3)可按時段和區(qū)域統(tǒng)計車輛污染的排放情況,為改善環(huán)境、促進(jìn)綠色環(huán)保以及治理汽車尾氣的排放提供數(shù)據(jù)支持的交通污染監(jiān)測系統(tǒng)。(4)可對交通數(shù)據(jù)進(jìn)行分析,讓公交部門充分了解道路情況,適時調(diào)整公交運力,合理分配公交資源,方便公眾出行的公交管理系統(tǒng)。(5)城市智慧交通系統(tǒng)除人車路外,其智慧網(wǎng)絡(luò)主要由物聯(lián)網(wǎng)和其它設(shè)備組成,其中物聯(lián)網(wǎng)是系統(tǒng)的中樞神經(jīng)系統(tǒng)。在過程中實時產(chǎn)生的大數(shù)據(jù)連續(xù)、數(shù)量相對比較大的特點,傳統(tǒng)的數(shù)據(jù)處理系統(tǒng)無法滿足這一需求,需要構(gòu)建一套能實現(xiàn)實時數(shù)據(jù)分布式流式處理系統(tǒng)。
3 實時數(shù)據(jù)計算系統(tǒng)設(shè)計
3.1 系統(tǒng)架構(gòu)設(shè)計
根據(jù)Storm系統(tǒng)的特點,系統(tǒng)采用結(jié)構(gòu)化設(shè)計模式,其主要由實時數(shù)據(jù)采集、實時數(shù)據(jù)計算、數(shù)據(jù)存儲、實時數(shù)據(jù)分享和實時系統(tǒng)監(jiān)控部分組成。(1)實時數(shù)據(jù)采集層可以根據(jù)數(shù)據(jù)來源、類型、大小、頻率進(jìn)行預(yù)裝軟件,如Kafka軟件和Nginx軟件實現(xiàn)對這些數(shù)據(jù)的實時采集和預(yù)處理,其中Nginx軟件安裝在一臺服務(wù)器上,它將獲取的數(shù)據(jù)按照要求(如地點、時間等)進(jìn)行日志分割并生成一定格式的日志文件,它是流式數(shù)據(jù)處理平臺數(shù)據(jù)的入口和預(yù)處理系統(tǒng)。(2)實時數(shù)據(jù)計算層是基于Storm 實時流式計算技術(shù)的高可靠大數(shù)據(jù)實時計算系統(tǒng),實現(xiàn)對海量大數(shù)據(jù)的分布式、高容錯、高可靠實時的大數(shù)據(jù)進(jìn)行計算,是整個Storm系統(tǒng)的核心部分。系統(tǒng)應(yīng)用Storm系列組件進(jìn)行任務(wù)拓?fù)湓O(shè)計、系統(tǒng)開發(fā)、進(jìn)程編排、信息發(fā)布等。(3)數(shù)據(jù)存儲層是系統(tǒng)用于城市智慧交通運行過程中產(chǎn)生的各類數(shù)據(jù)存儲管理部分。在大數(shù)據(jù)分布式實時流式計算系統(tǒng)中,為了提高系統(tǒng)存取數(shù)據(jù)的速度,數(shù)據(jù)存儲根據(jù)數(shù)據(jù)應(yīng)用背景分為內(nèi)存存儲和硬盤存儲兩種模式。本次系統(tǒng)內(nèi)存儲選Redis系統(tǒng),磁盤存儲選用Hbase數(shù)據(jù)庫管理系統(tǒng)。(4)實時數(shù)據(jù)分享層是實時數(shù)據(jù)計算系統(tǒng)將大數(shù)據(jù)處理的結(jié)果對其它應(yīng)用系統(tǒng)分享的接口。本系統(tǒng)基于高效的實時數(shù)據(jù)存儲子系統(tǒng),采用統(tǒng)一的Web服務(wù)、遠(yuǎn)程服務(wù)等服務(wù)方式為外部系統(tǒng)提供實時數(shù)據(jù)訪問接口。(5)實時數(shù)據(jù)處理監(jiān)控實現(xiàn)對系統(tǒng)各部分的軟件和硬件運行狀態(tài)進(jìn)行實時監(jiān)控。實現(xiàn)對每個節(jié)點上的CPU、存儲(內(nèi)外存)、網(wǎng)絡(luò)帶寬等參數(shù)進(jìn)行實時監(jiān)測;完成對系統(tǒng)接入部分的實時數(shù)據(jù)傳輸情況的實時監(jiān)測;控制系統(tǒng)各計算節(jié)點計算任務(wù)均衡分配、計算任務(wù)的啟停等;可以對實時數(shù)據(jù)計算子系統(tǒng)和實時數(shù)據(jù)分析子系統(tǒng)等的數(shù)據(jù)存取和訪問進(jìn)行實時監(jiān)測,還能根據(jù)預(yù)先定義的報警規(guī)則發(fā)出狀態(tài)報警和預(yù)警。
3.2 城市智慧交通大數(shù)據(jù)處理
3.2.1 數(shù)據(jù)實時計算設(shè)計
數(shù)據(jù)實時計算是Storm系統(tǒng)的核心。Storm通過轉(zhuǎn)發(fā)數(shù)據(jù)功能,按類別聚類建立拓?fù)?,不斷接受現(xiàn)場的數(shù)據(jù),Storm通過主節(jié)點按事先設(shè)定的工作(worker)分配任務(wù)(Task),Spout和Bolt則按類處理大數(shù)據(jù)。
Spout接受傳輸層數(shù)據(jù)的輸入或從文件中讀入數(shù)據(jù)、監(jiān)視新文件,文件一旦被修改,Spout會重新讀入數(shù)據(jù)并覆蓋之前的元組(tuple),是數(shù)據(jù)實時計算數(shù)據(jù)的入口,將接受的數(shù)據(jù)組成tuple(元組),將tuple發(fā)射給Bolt進(jìn)行數(shù)據(jù)實時執(zhí)行流式數(shù)據(jù)合并、連接、分組、聚合等操作,將操作結(jié)果再發(fā)送至下一個Bolt,直至數(shù)據(jù)處理完畢,實現(xiàn)城市智慧交通實時監(jiān)控。
為了更進(jìn)一步提升系統(tǒng)處理大數(shù)據(jù)的能力,在完成了將數(shù)據(jù)傳輸至kafka后,系統(tǒng)選用Trident topology進(jìn)行分析計算,Trident是在storm基礎(chǔ)上,一個以實時流式計算為目標(biāo)的高度抽象。它在提供處理大吞吐量數(shù)據(jù)能力的同時,具有高速分布式查詢和有狀態(tài)流式處理的能力。
3.2.2 建立一個Trident的spout
數(shù)據(jù)采集層完成接收系統(tǒng)實時數(shù)據(jù),具體設(shè)計如下:創(chuàng)建一個新的數(shù)據(jù)流,名命為cm-spout,并且傳參數(shù)給kakfk的spout實例。將Trident topology轉(zhuǎn)換成一個storm的拓?fù)?。分別指定cmhosts和kafka的topic(主題)名,cmhost用于配置連接kafka的Zookeeper,這個spout用于通過查詢的方式動態(tài)確定kafka的分區(qū)信息。經(jīng)過這步已經(jīng)建立了一個用于發(fā)射批量城市智慧交通系統(tǒng)運行的spout。
3.2.3 分割操作及創(chuàng)建每個字段的獨立數(shù)據(jù)流
將數(shù)據(jù)進(jìn)行分割為小數(shù)據(jù)發(fā)布到系統(tǒng)集群的計算機(jī)中完成計算任務(wù),分割將以時間戳、地點、設(shè)備名為關(guān)鍵詞進(jìn)行分割,建立多個主題,用來管理不同時間、地點、設(shè)備所產(chǎn)生的數(shù)據(jù)。不同類別的數(shù)據(jù)記錄到其對應(yīng)的主題池中,而這些進(jìn)入到主題池中的數(shù)據(jù)會被Kafka寫入磁盤的日志文件中進(jìn)行持久化處理,降低Storm對實時分析處理速度的要求,等Storm有空時再處理沒來及處理的數(shù)據(jù),避免數(shù)據(jù)處理的遺漏。
3.3 實時報警
在按照數(shù)據(jù)字段名建立了新的流數(shù)據(jù)后,設(shè)計一個跟蹤函數(shù),實現(xiàn)對各數(shù)據(jù)判斷是否在合適的范圍內(nèi),如果不在合適范圍內(nèi),則向XMPP發(fā)送報警信息和該數(shù)據(jù)并將該數(shù)據(jù)做永久保存處理。
3.4 并行查詢設(shè)計
通過以上設(shè)計,系統(tǒng)已可快速高效地處理城市智慧交通生產(chǎn)的大數(shù)據(jù),但要通過查詢處理數(shù)據(jù)的結(jié)果,還需進(jìn)一步完成查詢的設(shè)計。在設(shè)計時,將應(yīng)用軟件、查詢和Storm隔離,它們之間只能通過外部方式來訪問,為了查詢拓?fù)湟垣@取所需的數(shù)據(jù),將使用DRPC(分布式遠(yuǎn)程過程調(diào)用)實現(xiàn)。DRPC是Storm中一套軟件,它接收用戶輸入,同時也作為DRPC Spout的輸入而存在。
在Storm的DRPC中,客戶端將向Storm的DRPC的服務(wù)器發(fā)出一個DRPC請求,服務(wù)器將把請求發(fā)送到相應(yīng)Storm topology來協(xié)調(diào)請求和計算,并等待該拓?fù)涞膽?yīng)答。一旦收到應(yīng)答,它將把應(yīng)答返回到請求客戶端,實現(xiàn)并行查詢,高效地查詢各項參數(shù)。
4 實驗分析
通過在學(xué)校機(jī)房選5臺PC機(jī)組建局網(wǎng),安裝系統(tǒng)進(jìn)行測試。所選軟件為Storm 0.10.0 released、Kafka2.11-0.10.0.1, Zookeeper3.4.9,Hbase1.0.3,Redis-3.2.3,PC機(jī)配置為:16G內(nèi)存,intel 酷睿i7的CPU,1T磁盤。實驗部署架構(gòu),集群各個節(jié)點的配置和功能描述如表1所示。
4.1 系統(tǒng)可靠性測試
實驗采用運行數(shù)據(jù)包模擬每1秒產(chǎn)生1000條數(shù)據(jù)記錄。當(dāng)數(shù)據(jù)源快速輸出時,計算的數(shù)據(jù)傳輸在“no-ack”(無應(yīng)答)時的丟失率和采用“ack”(有應(yīng)答)保障機(jī)制后的數(shù)據(jù)傳輸?shù)膩G失率。為確保準(zhǔn)確性,采用5次數(shù)據(jù)的平均值。從表2可以看出,采用ack后,有效降低數(shù)據(jù)丟失率。
4.2 系統(tǒng)實時計算性能
由表3可知,系統(tǒng)實時計算總量和時間的關(guān)系,進(jìn)而證明該系統(tǒng)具有很好的實時性和抗壓性。大數(shù)據(jù)實時流式分布式計算已在一些工業(yè)生產(chǎn)現(xiàn)場、電商平臺得到應(yīng)用并顯現(xiàn)出良好的效果,這也從另一方面驗證了設(shè)計的可行性。
5 結(jié)語
本文研究以城市智慧交通系統(tǒng)為對象,在已有系統(tǒng)硬件信息化、智能化建設(shè)基礎(chǔ)上,使用實時流式數(shù)據(jù)計算系統(tǒng)的Storm技術(shù),設(shè)計了城市智慧交通實時大數(shù)據(jù)的分析、計算、反饋與預(yù)警系統(tǒng)。(1)系統(tǒng)實施后,在城市智慧交通運行過程中,對其各項參數(shù)的實時自動采集、連續(xù)監(jiān)測和快速分析,相關(guān)人員可以實時獲取城市智慧交通運行的信息及預(yù)警信息。系統(tǒng)采集到的基礎(chǔ)數(shù)據(jù)可以為后續(xù)的大數(shù)據(jù)分析、處理提供支持。(2)由于涉及到生產(chǎn)實踐問題,設(shè)計驗證僅在學(xué)校用模擬數(shù)據(jù)驗證,有待生產(chǎn)現(xiàn)場的驗證。(3)本文僅對城市智慧交通系統(tǒng)大數(shù)據(jù)處理進(jìn)行了系統(tǒng)構(gòu)建,還有待更進(jìn)一步完善、不斷改善實施方案、優(yōu)化系統(tǒng)算法及與應(yīng)用系統(tǒng)更進(jìn)一步融合,提高應(yīng)用系統(tǒng)的運行速度和效率。
參考文獻(xiàn)
[1] 王潤華,毋建軍,侯佳路.分布式實時計算引擎——Storm研究[J].中國科技信息,2015(6):68-69.
[2] 程學(xué)旗,靳小龍,王元卓,等.大數(shù)據(jù)系統(tǒng)和分析技術(shù)綜述[J].軟件學(xué)報,2014,25(9):1889-1908.
[3] 朱偉,李紀(jì)云,江慧,劉柱云.基于分布式內(nèi)存數(shù)據(jù)的數(shù)據(jù)同步設(shè)計與實現(xiàn)[J].現(xiàn)代電子技術(shù),2014(37):77-79+83.
Abstract:To solve the problem of large amount of real-time and continuous data processing in urban traffic, a Storm distributed real-time computing framework is proposed to solve the problem. The system uses Kafka software to collect a large number of parameters and publish-subscribe messages on the production site in real time. Through the Trident application software component of Storm, large data are filtered, split, grouped, functions, status updates, status queries, re-partitioning and other operations to complete large data calculation.and combines distributed remote procedure call software (DRPC software) to meet the needs of parallel query. It has been proved that the design can improve the computing speed, efficiency and throughput of the original system, reduce the overhead and ensure data security.
Key words:urban transportation;Big data;distributed