張前進
(安徽國防科技職業(yè)學院信息工程系,安徽六安237011)
基于Storm的物聯(lián)網海量實時數據流處理研究
張前進
(安徽國防科技職業(yè)學院信息工程系,安徽六安237011)
針對物聯(lián)網中數據實時與異構的特點,設計了基于云計算的海量實時數據感知與處理模型。結合海量實時數據并行處理機制與數據處理流程,利用開源實時處理系統(tǒng)Storm,實現(xiàn)了海量實時數據流處理過程,并給出了核心實現(xiàn)方法。
物聯(lián)網;實時數據;海量數據
物聯(lián)網的概念是國際電信聯(lián)盟(International Telecommunication Union,ITU)在2005年的《ITU Internet reports 2005-the Internet of things》中提出的。ITU認為“通過對事物內嵌RFID微型芯片或者傳感器芯片,通過互聯(lián)網就能實現(xiàn)物與物、人與物、人與人之間的信息交互,從而形成一個無所不在的物聯(lián)網”[1]。隨著物聯(lián)網技術的快速發(fā)展,以及智慧城市、智能交通、智慧農業(yè)等項目的建設,傳感器的數量呈指數級增長,IDC報告中指出,到2020年全球傳感器數量將突破2 120億個[2]。如此龐大數量的傳感器每時每刻都在產生實時數據,其數據規(guī)模是海量的。在海量的物聯(lián)網數據中,一方面,數據本身是異構的,既有存儲在關系數據庫中的結構化數據,又有web形式的半結構化數據,還有音視頻等文檔形式的非機構化數據;另一方面,很多應用場景的實時數據具有時效性,如智慧農業(yè)中的天氣、環(huán)境等。針對物聯(lián)網實時數據規(guī)模與特點,本文利用分布式實時處理Storm開源架構,設計了基于物聯(lián)網的海量實時數據處理系統(tǒng)。
物聯(lián)網中除了數據是異構的,感知設備本身的數據采集協(xié)議、傳輸協(xié)議也是異構的。為了屏蔽設備的異構,在物聯(lián)網數據實時感知與處理模型中引入云服務平臺,如圖1所示。
圖1 物聯(lián)網實時數據感知與處理模型
感知層:感知層由傳感器、二維碼標簽、RFID標簽、M2M終端等感知設備組成,是物聯(lián)網體系中的重要組成部分[3],負責利用傳感設備節(jié)點采集與感知實時的環(huán)境數據。由于感知層是物聯(lián)網的基礎,其可靠性決定了物聯(lián)網系統(tǒng)的穩(wěn)定性,因此,感知層的主要任務是構建一個低成本、高可靠性的感知網絡。
傳輸層:傳輸層由接入網和物聯(lián)網關兩部分組成。接入網負責將采集到的傳感數據通過網絡進行傳輸,由無線傳感網、局域網、衛(wèi)星網等組成。因此,接入網是一種異構網絡,不同的網絡,其網絡協(xié)議、傳輸協(xié)議均不相同,要達到不同網絡間信息的互聯(lián)互通,需要將異構網絡進行融合。物聯(lián)網關能夠實現(xiàn)不同感知網絡協(xié)議與接入網絡協(xié)議間的轉換以及將數據按照統(tǒng)一格式進行封裝,從而解決異構網絡間的互聯(lián)互通問題。
處理層:處理層是物聯(lián)網海量實時數據流處理系統(tǒng)的核心部分。數據采集與處理模塊負責采集傳輸層上傳的數據,并按照業(yè)務規(guī)則對數據進行轉換處理,將無意義的離散數據與業(yè)務建立映射關系,變成有意義的業(yè)務數據。云資源服務平臺負責為應用層提供與平臺、網絡無關的統(tǒng)一數據資源服務。處理層中引入云服務平臺,為實時數據處理提供可擴展的、具有彈性的云計算服務[3],有效提高整個系統(tǒng)的數據處理能力。
應用層:應用層由智能交通、智慧農業(yè)、智慧城市等智能應用與服務組成。應用層直接面向用戶,負責為用戶提供不受時間、地點限制的智能服務。
2.1 海量實時數據的并行處理機制
相對串行計算而言,并行計算是指將一個按照順序執(zhí)行的計算任務,分解成若干個可以同時執(zhí)行的子任務加以并行執(zhí)行,從而完成整個計算任務,并行計算的主要目的是快速解決大型且復雜的計算問題[5]。
基于物聯(lián)網感知的數據具有流式數據的特點,主要表現(xiàn)為:數據是一組時間序列下針對感知對象相關狀態(tài)屬性的數據,數據間具有無相互依賴的特征[6]。同時,實時感知的物聯(lián)網數據還具有量大流速快的特點。因此,對實時感知的海量物聯(lián)網數據的處理,不僅要求系統(tǒng)具有高可靠性、高穩(wěn)定性,還要求系統(tǒng)能夠快速處理。為了滿足海量數據處理的實時性要求,采用數據并行處理技術完成數據的實時處理。
并行處理機制如圖2所示,首先根據感知數據源對象屬性特征進行并行劃分,然后根據數據分發(fā)機制將上述數據分發(fā)到不同的并行處理節(jié)點上,并行節(jié)點按照預定義的計算規(guī)則對收到的數據進行運算處理,最后將各個并行處理節(jié)點處理的中間數據匯總到合并節(jié)點上,通過運算形成為上層應用服務使用的最終數據。
圖2 海量實時數據并行處理機制
2.2 海量實時數據處理流程
海量實時數據處理流程主要包含數據采集、數據預處理、實時計算分析與存儲、UI展示等過程。具體實現(xiàn)步驟如下:
1)數據采集。通過傳輸層采集由感知層產生的各類實時數據,之后將數據發(fā)送到數據控制服務器。
2)數據預處理。數據預處理實現(xiàn)對數據的分類、清洗、格式轉換等操作。
3)實時計算分析與存儲。首先將預處理完成后的數據放入待處理消息隊列,按順序將消息放入并行計算集群完成實時計算分析。然后分兩條線處理,一條線直接進入第5步,另一條線進入第4步。
4)計算結果存儲與深度分析。這樣可以更好地為用戶提供個性化的服務。
5)UI展示。
大數據處理可以分為批處理和實時流處理兩種模式[7]。對于大數據的批處理模式,目前較為流行的是基于MapReduce與HDFS的開源框架Hadoop分布式存儲計算平臺[8]。Hadoop開源框架適合對歷史數據的集中處理,例如大規(guī)模網站訪問日志的分析、大型購物網站的網頁索引等,但它無法滿足物聯(lián)網中大規(guī)模實時感知數據的處理。大數據實時處理方面,較流行的有Storm、Spark、Samza等基于Apache的開源框架。本文基于Storm開源框架設計分布式海量實時數據處理系統(tǒng)。Storm框架最初由BackType開發(fā),2011年被Tritter公司收購,同年由Tritter在GitHub上將其開源。
3.1 Storm基本組件
Storm開源框架主要分為Nimbus和Supervisor兩種組件,這兩種組件都是快速失敗的,沒有狀態(tài)。任務狀態(tài)和心跳信息都保存在Zookeeper上,Zookeeper是Storm重點依賴的外部資源。
Nimbus是控制節(jié)點的后臺程序,負責為工作節(jié)點分配工作和發(fā)送代碼,并且監(jiān)控工作節(jié)點的工作狀態(tài)[9],全局只有一個Nimbus。
Supervisor是工作節(jié)點的后臺程序,每一個工作節(jié)點上運行一個,負責接受控制節(jié)點Nimbus分配的任務,會監(jiān)聽分配任務給它的那個控制節(jié)點,根據需要關閉或者啟動工作進程worker。
一個工作節(jié)點會運行一個或多個工作進程,每一個工作節(jié)點都會執(zhí)行一個Topology任務子集[10]。Topology是Storm框架中運行的一個封裝計算任務邏輯的實時應用程序,由Spout和Bolt構成。工作節(jié)點與控制節(jié)點的通信與協(xié)調都是通過Zookeeper來實現(xiàn)。
3.2 基于Storm的分布式海量實時數據處理系統(tǒng)
3.2.1 系統(tǒng)架構
基于物聯(lián)網的海量實時數據流處理系統(tǒng)架構如圖3所示,由數據源接入模塊、數據緩存模塊、Storm集群、Hadoop集群、關系數據庫、UI展示等部分組成。
圖3 系統(tǒng)架構圖
1)數據源接入模塊
數據源接入模塊負責為數據處理集群快速接入數據源。本文采用的是基于開源的Apache Flume日志系統(tǒng)實現(xiàn)各種實時感知數據源的快速接入,F(xiàn)lume是Cloudera提供的一個高可靠、高可用的分布式海量日志采集及聚合和傳輸系統(tǒng),還可以用于歷史數據的收集。同時,該模塊還能夠對數據進行簡單處理。
2)數據緩存模塊
對于實時數據處理,如果數據流量較大,數據處理模塊的處理能力可能無法達到,甚至引起宕機。因此,系統(tǒng)引入Kafka系統(tǒng)數據緩存模塊,Kafka系統(tǒng)是一個高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),通過O(1)的磁盤數據結構保證消息的持久性和穩(wěn)定性,其吞吐量可以支持每秒數百萬的消息。
3)Storm與Hadoop集群
Storm集群負責實時數據流的處理,由一個主節(jié)點和若干從節(jié)點組成。使用Storm的一個Spout插件storm-kafka可以持續(xù)不斷地從Kafka系統(tǒng)中讀取數據,然后通過Storm集群進行實時運算,同時將運算結果通過關系數據庫實現(xiàn)持久性保存。Hadoop集群由一個主管理節(jié)點和若干個計算節(jié)點組成,負責對歷史數據的批處理,實現(xiàn)對物聯(lián)網感知數據的深度分析與挖掘。
3.2.2 系統(tǒng)實現(xiàn)
1)數據源接入與數據緩存組件集成
Flume本身自帶了諸多數據源的sink,對于已存在的sink只需針對環(huán)境修改配置文件即可。本文通過自定義Flume中的Kafka sink實現(xiàn)兩者的集成,自定義Flume的sink需要繼承AbstractSink并實現(xiàn)Configurable接口,該接口主要包含消息處理process()方法和sink配置configure(Context arg0)方法。
?process()方法實現(xiàn)示例:
public Status process() throws EventDeliveryException{
……
byte[] body=event.getBody();
final String msg=new String(body);
final KeyedMessage
producer.send(message);
……
}
?configure(Context arg0)方法實現(xiàn)示例:
public void configure(Context arg0) {
Properties prop=new Properties();
prop.put(″zookeeper.connect″,zookeeperValue);
prop.put(″metadata.broker.list″,brokerValue);
prop.put(″serializer.class″,StringEncoder.class.getName());
producer=new Producer
}
2)ISpout與IBolt接口實現(xiàn)
Storm中的計算主要分為兩種類型,一個是數據源的處理Spout和中間數據的處理Bolt。Spout作為task運行在worker內容,主要負責數據的發(fā)送,其核心方法為nextTuple()。
?nextTuple()方法示例:
public void nextTuple() {
……
this.collector.emit(new Values(sendData));//發(fā)送數據
}
Bolt負責節(jié)點處理,既可以進行簡單的數據處理,也可以實現(xiàn)數據流的合并等復雜計算。其核心方法為excute(Tuple tuple,BasicOutputCollector collector)。
?excute(Tuple tuple,BasicOutputCollector collector)方法示例:
public void execute(Tuple tuple,BasicOutputCollector collector) {
String sentence=(String) tuple.getValue(0);
//數據處理邏輯
……
collector.emit(new Values(out));
}
物聯(lián)網感知數據的實時處理在現(xiàn)實中有較大的應用價值,例如智能交通中實時交通情況大數據分析,更加有利于公眾服務。本文設計的基于Storm的海量數據的實時處理系統(tǒng)具有高可靠、高效的特點,可以很好地應對物聯(lián)網數據異構的問題,并且可以勝任大規(guī)模的實時處理任務。
[1]高哲,翁祖泉.基于物聯(lián)網海量數據處理的實時數據庫應用研究[J].中國集成電路.2013(11):57-58.
[2]DIGNAN Larry.Internet of things:$8.9 trillion market in 2020,212 billion connected things[EB/OL].(2013-10-03)[2016-10-20].http://www.zdnet.com/article/internet-of-things-8-9-trillion-market-in-2020-212-billion-connected-things/.
[3]馬駿,郭淵博,馬建峰,等.物聯(lián)網感知層基于資源分層的多用戶訪問控制方案[J].電子學報,2014(1):28.
[4]羅劍明.制造物聯(lián)網的實時數據感知與處理模型的研究[D].廣州:廣東工業(yè)大學,2015:12.
[5]王慧.基于Hadoop的并行挖掘算法的研究[D].北京:首都師范大學,2013:3-4.
[6]趙卓峰,魏文飛,馬強.基于無共享架構的海量感知數據實時處理系統(tǒng)[J].微電子學與計算機,2012,29(9):10.
[7]王銘坤,袁少光,朱永利,等.基于Storm的海量數據實時聚類[J].計算機應用,2014,34(11):3078.
[8]覃雄派,王會舉,杜小勇,等.大數據分析:RDBMS與MapReduce的競爭與共生[J].軟件學報,2012,23(1):35-36.
[9]李川,鄂海紅,宋美娜.基于Storm的實時計算框架的研究與應用[J].軟件,2014,35(10):17.
[10]鄧立龍,徐海水.Storm實現(xiàn)的應用模型研究[J].廣東工業(yè)大學學報,2014,31(3):115.
責任編輯:楊子立
Real Time Massive Data Stream Processing in Internet of Things Based on Storm
ZHANG Qianjin
(Department of Information Engineering,Anhui Vocational College Of Defense Technology,Lu′an 237011)
According to the realtime and heterogeneous characteristics of data in the Internet of things,a realtime massive data sensing and processing model was designed on the basis of cloud computing.Based on the open source real time operating system Storm,a combination of real time massive data parallel processing mechanism and data processing flow realized real time massive data stream processing with core implementation method proposed.
Internet of things;real time data;massive data
10.3969/j.issn.1671?0436.2016.06.007
2016-11-21
安徽省高校自然科學研究重點項目(KJ2016A120)
張前進(1982— ),男,碩士,講師。
TP391
A
1671- 0436(2016)06- 0030- 04