徐 江,張鴻宇,李軍懷,馮連強,王懷軍
(1.中國重型機械研究院股份公司,陜西 西安710032;2.西安理工大學(xué) 計算機科學(xué)與工程學(xué)院,陜西 西安710048)
當前,工業(yè)生產(chǎn)體系伴隨著數(shù)字化技術(shù)與移動互聯(lián)網(wǎng)的蓬勃發(fā)展,掀起了一場萬物互聯(lián)智慧化的新興革命,信息技術(shù)正在飛快的與工業(yè)生產(chǎn)中的基礎(chǔ)設(shè)施和管理系統(tǒng)相融合,將傳統(tǒng)的工業(yè)體系提升到更高的水平。德國“工業(yè)4.0”[1]與我國的“中國制造2025”[2],標志著工業(yè)生產(chǎn)制造將從自動化時代全面轉(zhuǎn)向信息化與智慧化的時代。在智慧化轉(zhuǎn)型的洪流中,傳統(tǒng)工業(yè)利用新一代的信息與工業(yè)互聯(lián)網(wǎng)技術(shù),采集的巨大結(jié)構(gòu)化的工業(yè)時序數(shù)據(jù)[3],深入挖掘與運用制造企業(yè)信息資源,以業(yè)務(wù)與數(shù)據(jù)作為雙驅(qū)動,逐步轉(zhuǎn)型為智能制造[4],提供更為全面、及時、科學(xué)、智慧的工業(yè)服務(wù)。物聯(lián)網(wǎng)技術(shù)的發(fā)展速度遠遠超過了對工業(yè)企業(yè)管理與生產(chǎn)模式的影響速度;物聯(lián)網(wǎng)(The Internet of Things, IOT)是工業(yè)信息化發(fā)展的基礎(chǔ)技術(shù),并依托于無線網(wǎng)絡(luò)、移動設(shè)備、SOC、傳感器等多種技術(shù)的進步,在集成度、靈敏性以及成本控制等方面愈發(fā)成熟[5]。利用RFID、工業(yè)現(xiàn)場總線、藍牙、紅外、探測器以及多種信號傳感器等采集設(shè)備與短距離傳輸技術(shù),采集并傳輸整個制造設(shè)備的生產(chǎn)數(shù)據(jù),并通過物聯(lián)網(wǎng)中的感知層網(wǎng)絡(luò)將這些數(shù)據(jù)傳遞到采集控制柜中,之后通過實時數(shù)據(jù)庫,上傳到監(jiān)測云平臺,經(jīng)由流數(shù)據(jù)處理平臺進行運算分析。因此,物聯(lián)網(wǎng)技術(shù)使得許多行業(yè)領(lǐng)域產(chǎn)生了以“海量”和“高速”為特征的數(shù)據(jù)。
在工業(yè)生產(chǎn)中,設(shè)備的生產(chǎn)數(shù)據(jù)以及各種傳感器采集到的信息數(shù)據(jù)會持續(xù)生成,且以數(shù)據(jù)記錄的形式發(fā)送。數(shù)據(jù)序列的量級會隨著時間增長而無限增大,這樣的動態(tài)數(shù)據(jù)集合被稱為流數(shù)據(jù)[6]。和傳統(tǒng)的靜態(tài)數(shù)據(jù)不同,流數(shù)據(jù)需要按照記錄或者根據(jù)滑動時間窗按順序進行遞增式處理,借助處理的結(jié)果,管理人員可以及時的了解業(yè)務(wù)活動的情況,從而迅速應(yīng)對新情況做出響應(yīng)。在流數(shù)據(jù)處理方面,數(shù)據(jù)流管理系統(tǒng)(DataStreamManagementSystem,DSMS)是近年來國內(nèi)外學(xué)者的一個研究熱點,流處理引擎(StreamProcessingEngine,SPE)[7,8]是進行實時流數(shù)據(jù)處理的一種主流方法。傳統(tǒng)的數(shù)據(jù)處理結(jié)構(gòu)對于流式數(shù)據(jù)的處理能力較差,對批式數(shù)據(jù)的分析[9]效果也不佳。尤其對于狀態(tài)監(jiān)測數(shù)據(jù)類型復(fù)雜、業(yè)務(wù)需求多變以及上層應(yīng)用集成等方面的需求[10],傳統(tǒng)SPE的解決能力有限。
設(shè)備在制造過程中時刻產(chǎn)生大量的數(shù)據(jù),其中一些數(shù)據(jù)需要進行及時處理分析,實時的將處理過后的數(shù)據(jù)上傳到監(jiān)控界面供生產(chǎn)管理人員進行監(jiān)控。監(jiān)測流數(shù)據(jù)以元(指定數(shù)量的數(shù)據(jù)包)為單位,以時間為序列,按照順序抵達處理平臺,可以被定義為
Dflow=[d1,d2,d3,…,dn]T
(1)
式中,Dflow為數(shù)據(jù)集合矩陣,di表示矩陣中單個維度下的數(shù)據(jù)元集合。一般情況下,Dflow遵循順序訪問或指定次數(shù)內(nèi)的訪問?;瑒哟翱谀P蛯Νh(huán)境中的流數(shù)據(jù)進行處理。對持續(xù)性的流動數(shù)據(jù)施加滑動窗口,有利于更好地適應(yīng)流數(shù)據(jù)延續(xù)性,實時性的特點。
對持續(xù)性的流動數(shù)據(jù)施加滑動窗口,有利于更好地適應(yīng)流數(shù)據(jù)延續(xù)性,實時性的特點。通常情況下,滑動窗口可分為三種類型,分別是基于數(shù)據(jù)元組、基于分區(qū)以及基于時間[11]。
(2)基于分區(qū)的滑動窗口包含整數(shù)N與流數(shù)據(jù)S的一組屬性參數(shù)(a1,a2,a3,…,ak)。根據(jù)屬性參數(shù)的需求,在邏輯上對S劃分為不同的子流,在每個子流上獨立計算大小為N的基于數(shù)據(jù)元組的滑動窗口,之后將這些窗口的結(jié)果進行整合產(chǎn)生輸出。
(3)基于時間的滑動窗口在流數(shù)據(jù)S中加入了時間周期T以及T與指定時間的關(guān)系函數(shù)R。在內(nèi),R()包含了S中所有屬于[t-T,]范圍內(nèi)的數(shù)據(jù)元組。借助滑動T實現(xiàn)對數(shù)據(jù)流的處理。
針對復(fù)雜重型裝備監(jiān)測數(shù)據(jù)規(guī)模大、類型多、實時性要求較高的需求,本文提出了基于滑動窗口的流數(shù)據(jù)并行處理方法。結(jié)合基于分區(qū)與基于時間兩種思想,構(gòu)建滑動窗口模型。在此模型中,數(shù)據(jù)被劃分為數(shù)個子流,按照數(shù)據(jù)類型對應(yīng)的處理邏輯進行并行處理操作,最后通過融合子流結(jié)果的方式獲得處理結(jié)果,為系統(tǒng)實時監(jiān)控等功能提供高質(zhì)量的數(shù)據(jù)管理。
本文結(jié)合基于分區(qū)與基于時間兩種滑動窗口思想,構(gòu)建單位時間周期下,融合子流處理結(jié)果的滑動窗口模型。在模型中,首先定義基本窗口與滑動窗口。基本窗口可以被定義為在[t-T,]內(nèi)抵達的流數(shù)據(jù)所組成的時間窗口。其中,時間跨度為
Wspan=T
(2)
而連續(xù)時間內(nèi)的基本窗口序列可以組建成為一個滑動窗口,可以表示為
Wslide=[Wk-n+i,Wk-n+i,Wk-n+i+1,…,Wk]
(3)
式中,Wk為第k個基本窗口抵達后的時間窗口,n為滑動窗口可容納的基本窗口數(shù)量。時間窗口的跨度可表示為
Wslide-span=nT
(4)
滑動窗口處理模型如圖2所示。
圖2 滑動窗口處理
并行實時運算模式是實現(xiàn)實時監(jiān)控與報警的有效途徑?;跁r間角度與不同的業(yè)務(wù)需求,并行處理滑動窗口模型結(jié)構(gòu)如圖3所示。
圖3 并行處理滑動窗口模型結(jié)構(gòu)
圖3中小方格是模型中的基本窗口,對應(yīng)一個監(jiān)測點數(shù)據(jù)計算單元,默認設(shè)置為1 s, 則整個窗口大小為覆蓋基本窗口的長度n。復(fù)雜重型裝備監(jiān)測中的狀態(tài)參數(shù)監(jiān)測數(shù)據(jù)會以數(shù)據(jù)流的形式陸續(xù)通過窗口,n個計算單元并行進行分析計算,每1 s會輸出n個參數(shù)的監(jiān)測分析結(jié)果。最后根據(jù)需求,按照不同的時間周期將處理后的基本窗口進行合并,統(tǒng)計每秒輸出的處理結(jié)果,并將其輸出。
監(jiān)測數(shù)據(jù)在處理的過程中需要引入滑動窗口,在基本窗口不變的情況下,根據(jù)數(shù)據(jù)實際需求,改變滑動窗口大小?;瑒哟翱诳缍茸兓?guī)則為:設(shè)置初始窗口大小為Winit,其中包含n個基本窗口Wbase。之后窗口隨數(shù)據(jù)流方向開始滑動,滑動增量為ε×Wbase,其中ε為整數(shù)且0<ε≤n,其具體值可以按照流數(shù)據(jù)元集合di的實際需求而定。數(shù)據(jù)進入窗口后,將按照本文步驟進行處理。
(1)每個基本窗口的流數(shù)據(jù)元會隨機分配到不同的PQueue(并行隊列)數(shù)據(jù)集合中。該數(shù)據(jù)集合會對流入的數(shù)據(jù)按照數(shù)據(jù)類型切割為數(shù)據(jù)片PSlice。PSlice通過配置服務(wù)獲取配置信息,其需要配置的參數(shù)如表1所示。
表1 PSlice配置參數(shù)
如圖4所示,流數(shù)據(jù)元中的數(shù)據(jù)包括多種數(shù)據(jù)類型,在單位時間窗口下數(shù)據(jù)量會彼此不同,因此切割后的PSlice的數(shù)據(jù)長度不等。每個PSlice均通過配置服務(wù)配置信息:數(shù)據(jù)類別作為當前PSlice的特征值;數(shù)據(jù)閾值用于甄別該PSlice的數(shù)據(jù)范圍;該數(shù)據(jù)類型對應(yīng)的處理流程接口。
圖4 基本窗口下PQueue中的PSlice
(2)采用輪詢機制,當有PQueue需要被分配時,將工作節(jié)點可用的線程數(shù)量按照從多到少排序。之后按照自定義的并行度依次從每個工作節(jié)點的線程池中調(diào)用可用線程,并將PQueue中當前窗口的PSlices分配給它們。如果一輪后當前窗口仍有PQueue存在未分配的PSlice,則從工作節(jié)點中再次調(diào)用下一個可用的線程,循環(huán)反復(fù)直到所有的PSlice被分配。這種調(diào)度機制不需要進程間通信,所以開銷較少,適合復(fù)雜重型裝備監(jiān)測的實際需求。
(3)每個線程均對應(yīng)一個處理邏輯單元PExcutor,該單元專注于處理同一數(shù)據(jù)類別的數(shù)據(jù),因此,PSlices被分配入PExcutor后,會被緩存在一個HashMap中,其Key值為PSlice的數(shù)據(jù)類別Tdi。PExcutor會根據(jù)Tdi獲取到該時間窗口下的所有數(shù)據(jù),之后調(diào)取處理接口對其進行批量處理。
(4)將當前窗口范圍內(nèi)的處理結(jié)果與上一窗口的數(shù)據(jù)結(jié)果進行統(tǒng)計處理。統(tǒng)計范圍為該數(shù)據(jù)類型的監(jiān)測周期Tm=ω(ε×Winit),其中ω可根據(jù)實際需求人為定義在配置服務(wù)中。
(5)按照滑動量持續(xù)滑動窗口,從而實現(xiàn)流數(shù)據(jù)持續(xù)穩(wěn)定的并行處理流程。
一個流數(shù)據(jù)元集合d在滑動窗口機制的支持下,主要有三種算法:數(shù)據(jù)接收、數(shù)據(jù)處理和滑動窗口。
3.2.1 算法1:數(shù)據(jù)接收DataAccept(D)
監(jiān)測數(shù)據(jù)會以流的形式持續(xù)不斷的輸入,選擇多進程機制保證服務(wù)器可以穩(wěn)定及時地接收數(shù)據(jù),同時可以提高接收效率。每次接收數(shù)據(jù)會進行存儲過程,為降低存儲壓力,每個進程均會對數(shù)據(jù)進行聚合,每隔一段時間后將聚合數(shù)據(jù)發(fā)送至存儲單元。存儲單元需要一邊將數(shù)據(jù)進行持久化處理,一邊將數(shù)據(jù)發(fā)送至處理單元。
算法1DataAccept(D)源代碼為
輸入:流數(shù)據(jù)D
aggregatedData = []
for t in timeRange(interval):
# 按照時間周期進行數(shù)據(jù)聚合
slice = D.get() # 獲取數(shù)據(jù)流數(shù)據(jù)
aggregatedData.extend(slice) # 聚合數(shù)據(jù)
kafka.receive(aggregatedData) # 存儲單元接收
kafka.sendToPQueue(aggregatedData)
# 發(fā)送至處理單元
3.2.2 算法2:數(shù)據(jù)處理DataProcessing(aggregatedData)
數(shù)據(jù)依照數(shù)據(jù)類型進入對應(yīng)的并行處理隊列后,進入等待狀態(tài)。狀態(tài)激活后,被處理線程調(diào)取,進行自定義處理。統(tǒng)計線程負責將處理結(jié)果進行合并統(tǒng)計,并輸出結(jié)果。
算法2DataProcessing(aggregatedData)源代碼為
輸入:聚合數(shù)據(jù)對象aggregatedData
輸出:處理結(jié)果PResult
for PQueue in PQueues:
PQueue.add(aggregatedData)
# 聚合數(shù)據(jù)輸入PQueue
PQueue.split(Td, configuration)
# 按照數(shù)據(jù)類別分割為PSlice并對其初始化
Worknodes.sort() # 對工作節(jié)點排序
for node in Worknodes:
thread = node.GetThreadPoolExecutor(degree) # 按照并行度啟動線程池
thread.run(PQueue)
combineResult(Td, interval, threads.result)
# 按照數(shù)據(jù)類型的指定周期合并處理結(jié)果
# thread 處理邏輯過程:
hashmap.add( thread.getSlice(PQueue))
# 將PSlice緩存入map中
for task in PExcutor:
# 對相同數(shù)據(jù)類型集中處理
task.execute(hashmap.getValue(PSlice.type) .union(),PSlice.processInterface)
3.2.3 算法3:滑動窗口SlideWindow
在實際算法中,若要統(tǒng)計單位時間t內(nèi)最近n秒的數(shù)據(jù),滑動窗口可劃分為n/t個slot,對窗口創(chuàng)建長度為n/t的數(shù)組。處理單元會在時間t內(nèi)通過算法2不停的將流數(shù)據(jù)進行處理。每隔時間段t會觸發(fā)滑動窗口的移動行為,即
Array[Slot3]?Array[Slot2],Array[Slot2]?Array[Slot1]
(5)
算法3SlideWindow源代碼為
Slot currentSlot # 當前激活的slot
LinkedList
long time = System.currentTimeMillis()
currentSlot = new Slot(time)
elif((time-currentSlot.timstamp )> t):
# 每單位時間t創(chuàng)建一個slot
historySlots.add(currentSlot)
currentSlot = new Slot(time)
DataProcessing() # 執(zhí)行處理過程
void maintain():
# 維護窗口,刪除已處理slot
if(historySlots.isEmpty()) return
slot = historySlots[0]
long ts = System.currentTimeMillis()
if(ts-slot.timestamp > n):
# 超過滑動距離后刪除
historySlots.remove(0)
自2010年雅虎公司公開其通用分布式流處理平臺S4[12]起,許多用途相近又各具各色的平臺相繼被提出,如Storm[13],SparkStreaming[14],Samza[15]和MillWheel[16]等。Storm最大的提升在于提供消息處理反饋機制和巧妙的利用異或計算保障記錄被完全處理,平臺采用弱中心化的結(jié)構(gòu)。借助其特性搭建本文提出的基于滑動窗口并行處理方法的實現(xiàn)平臺,PQueue、PSlice、PExcutor均可由Storm中的Blot去實現(xiàn),其模型拓撲結(jié)構(gòu)圖如圖5所示。
圖5 Storm下的并行滑動窗口模型
拓撲結(jié)構(gòu)由負責數(shù)據(jù)來源的Spout與多個不同功能的Bolt組成。數(shù)據(jù)流由KafkaSpout開始,以直接分組的形式發(fā)送到預(yù)處理Bolt中,在此執(zhí)行格式化或類型轉(zhuǎn)換等操作;預(yù)處理Bolt按照字段ID將數(shù)據(jù)發(fā)送到滑動窗口Bolt中;滑動窗口Bolt會依照滑動窗口模型的結(jié)構(gòu),先將數(shù)據(jù)流發(fā)送到模型Bolt中,利用按照業(yè)務(wù)需求自定義的模型Bolt對數(shù)據(jù)進行處理,之后會將處理結(jié)果隨機地發(fā)送到至統(tǒng)計Bolt中;統(tǒng)計Bolt進行統(tǒng)計處理后將結(jié)果轉(zhuǎn)發(fā)到存儲Bolt,由其進行數(shù)據(jù)視圖的存儲。該拓撲實現(xiàn)可分為5個步驟。
(1)創(chuàng)建拓撲構(gòu)造器TopologyBuilder;
(2)配置SpoutConfig;
(3)利用拓撲構(gòu)造其中的setSpout()與setBolt()方法創(chuàng)建KafkaSpout對象,預(yù)處理Bolt對象與滑動窗口Bolt對象,其執(zhí)行器的并發(fā)度設(shè)為1??烧{(diào)用withWindow()方法設(shè)置基本窗口的時間大小,默認設(shè)置為1 s;
(4)按照業(yè)務(wù)需求構(gòu)建模型Bolt對象,執(zhí)行器的并發(fā)度設(shè)為n;
(5)創(chuàng)建統(tǒng)計Bolt對象與存儲Bolt對象,配置最終存儲方案。
流數(shù)據(jù)并行處理窗口借助于Storm的拓撲結(jié)構(gòu)構(gòu)建后,實現(xiàn)包含n個基本窗口的滑動窗口在單位周期內(nèi)對接收數(shù)據(jù)的處理。在時間窗口獨立并行下,最終完成指定時間內(nèi)統(tǒng)計最近ns的監(jiān)測數(shù)據(jù)。
基于Lambda架構(gòu)[17],設(shè)計了流數(shù)據(jù)處理平臺,用于實時監(jiān)控的流數(shù)據(jù)需要經(jīng)過預(yù)處理、閾值判斷或聚合等多種處理流程。將數(shù)據(jù)處理分成批量處理平臺與增量處理平臺兩部分,以應(yīng)對數(shù)據(jù)的多樣化需求,流數(shù)據(jù)處理平臺結(jié)構(gòu)如圖6所示。
圖6 流數(shù)據(jù)處理平臺結(jié)構(gòu)
批量處理平臺可用于不同數(shù)據(jù)集的任意查詢,并實現(xiàn)對數(shù)據(jù)集的深入分析,例如設(shè)備狀態(tài)評估模型、能耗分析等業(yè)務(wù)功能。增量處理平臺則需要對某一數(shù)據(jù)序列增量式地更新指標、報告和匯總統(tǒng)計結(jié)果,以響應(yīng)每個到達的數(shù)據(jù)記錄,更適合為設(shè)備實時監(jiān)控和報警響應(yīng)等功能提供數(shù)據(jù)支持。
構(gòu)建混合模式的數(shù)據(jù)處理平臺,同時維護批量處理與增量處理兩個部分。數(shù)據(jù)流從實時數(shù)據(jù)庫開始,通過Flume組件對其進行采集整理,整理后的數(shù)據(jù)流按照需求標準分為批量數(shù)據(jù)流與實時數(shù)據(jù)流兩個部分,分別發(fā)送到相應(yīng)的flume-sink上。批量數(shù)據(jù)流直接存儲到批量處理平臺中的HDFS上,可利用如MapReduce等多種大數(shù)據(jù)處理方式對其進行數(shù)據(jù)分析,生成批計算視圖,數(shù)據(jù)量較小的視圖存儲到MySQL中,數(shù)據(jù)量較大的存儲在HBase上。實時數(shù)據(jù)流對應(yīng)的sink是Kafka,Kafka作為消息通道,將實時數(shù)據(jù)流的增量數(shù)據(jù)發(fā)送到Storm中的Spout,之后通過實時計算得到實時視圖,將其存儲在Redis等緩存數(shù)據(jù)庫中,方便快速訪問。
增量處理平臺用來處理增量的實時數(shù)據(jù)。增量處理平臺對數(shù)據(jù)處理后會生成實時處理視圖(Real-Time View)。為了提升處理效率,平臺會在接收到新數(shù)據(jù)后不斷刷新實時視圖,采用增量計算對數(shù)據(jù)進行分段處理,因此延遲較小,更適合實時處理。增量處理平臺的數(shù)據(jù)訪問可以定義為
RealTimeView=function(realtimeview,newdata)
(6)
面臨高度動態(tài)的實時數(shù)據(jù)信息,增量處理平臺選擇使用Strom創(chuàng)建拓撲結(jié)構(gòu)來轉(zhuǎn)換數(shù)據(jù)流,持續(xù)處理到達的數(shù)據(jù)。為避免在做實時運算的過程中因為數(shù)據(jù)擁塞而導(dǎo)致服務(wù)器掛掉,本文選擇Kafka[18]作為消息隊列,將不均勻的數(shù)據(jù)轉(zhuǎn)化成均勻的數(shù)據(jù)流,從而與Storm完善結(jié)合,實現(xiàn)穩(wěn)定的流運算。增量處理平臺的框架結(jié)構(gòu)如圖7所示。
圖7 增量處理平臺結(jié)構(gòu)
數(shù)據(jù)經(jīng)過Flume分流后,增量數(shù)據(jù)被發(fā)送至Kafka中。Kafka作為專門面向海量數(shù)據(jù)傳輸而開發(fā)的分布式消息中間件,以提供生產(chǎn)消費訂閱,流數(shù)據(jù)處理等功能被廣泛應(yīng)用[19]。在增量處理平臺中,通過Kafka創(chuàng)建用于處理增量數(shù)據(jù)的Topic,之后即可通過配置Flume的Sink數(shù)據(jù)流向,將RealTimeEvent與Event發(fā)送到該topic上。接收到增量數(shù)據(jù)后,Kafka將其發(fā)送到由Storm構(gòu)建的拓撲模型中。Storm對其進行增量處理,之后生成相應(yīng)的視圖或結(jié)果,發(fā)送到指定界面中或者緩存到Redis中供業(yè)務(wù)服務(wù)消費。
搭建1個主節(jié)點和2個從節(jié)點組成的Storm+Kafka虛擬集群,實驗數(shù)據(jù)用模擬的方式,人為輸入模擬數(shù)據(jù),將其流入到Kafka中的Spout。考慮到復(fù)雜重型裝備監(jiān)測中的數(shù)據(jù)實際監(jiān)控周期,30 s較為接近各個數(shù)據(jù)類型的單位時間周期,故滑動窗口設(shè)置為30 s,基本時間窗口設(shè)置為1 s,模型Bolt定義為閾值判斷對模擬數(shù)據(jù)進行篩選,以測試整個平臺的性能。
表2 實驗平臺
其中一臺虛擬機既作為主節(jié)點,又充當從節(jié)點,剩余三臺虛擬機為從節(jié)點,受主節(jié)點調(diào)配。
5.1.1 集群與單機對比
設(shè)置集群參與工作的節(jié)點為4,設(shè)置Spout與模型Bolt的并發(fā)度分別為1與15,分別觀察在單機與集群下進行數(shù)據(jù)閾值判斷的吞吐量,結(jié)果如圖8所示。
圖8 單機與集群吞吐量對比
從圖8可以看出,單機環(huán)境可在單位時間內(nèi)處理將近6萬條左右的數(shù)據(jù),而集群環(huán)境的處理能力則大大增加。在數(shù)據(jù)量低于5萬條左右時,單機處理要比集群性能好,隨著數(shù)據(jù)流的不斷輸入,集群的高性能便凸顯出來,不過由于涉及到機器之間的數(shù)據(jù)分發(fā),集群相比于單機在數(shù)據(jù)處理能力上存在較大的不穩(wěn)定性。本次對比實驗證明在流數(shù)據(jù)量級逐漸增大的情況下,通過集群部署的方式可以有效提高數(shù)據(jù)并行計算的能力。
5.1.2 集群節(jié)點數(shù)對比
設(shè)置全局參與進程數(shù)為4,Spout與模型Bolt的并發(fā)度仍然為1與15,分別設(shè)置集群參與工作的節(jié)點個數(shù)為2、3、4。在其他變量不變,只修改集群節(jié)點個數(shù)的前提下逐漸增加數(shù)據(jù)規(guī)模,觀察其進行數(shù)據(jù)判斷的吞吐量,結(jié)果如圖9所示。
圖9 集群節(jié)點數(shù)吞吐量對比
在數(shù)據(jù)量級分別為10萬條,30萬條與60萬條的前提下,節(jié)點數(shù)不變,增加數(shù)據(jù)量級基本不會改變系統(tǒng)的吞吐量,而吞吐量的細微變化可能與設(shè)備配置、任務(wù)分發(fā)與數(shù)據(jù)傳輸環(huán)境有關(guān)。在相同數(shù)據(jù)規(guī)模下,增加節(jié)點個數(shù)可以明顯看到系統(tǒng)吞吐量出現(xiàn)上升的趨勢。本次對比實驗證明通過增加參與運算的集群節(jié)點可以有效提升整個系統(tǒng)的并行計算能力。
5.1.3 參與進程數(shù)對比
設(shè)置集群參與工作的節(jié)點為4,設(shè)置Spout與模型Bolt的并發(fā)度仍為1與15,改變?nèi)謪⑴c工作的進程數(shù)量為2、3、4、8,觀察增加數(shù)據(jù)規(guī)模后的系統(tǒng)吞吐情況,結(jié)果如圖10所示。
圖10 進程數(shù)吞吐量對比
由圖10可知,數(shù)據(jù)量級不變的情況下,隨著進程個數(shù)的增加,系統(tǒng)的吞吐量出現(xiàn)了先上升后下降的情形。例如當數(shù)據(jù)量為10萬條時,在2進程下單位時間內(nèi)數(shù)據(jù)處理量為6 500條,進程個數(shù)增加到4為止,吞吐量一直處于上升趨勢,達到單位時間內(nèi)8 600條,而進程個數(shù)進一步增加到8時,吞吐量下降。本次對比實驗說明在一定程度上,增加參與工作進程的個數(shù)可以提高集群的并行計算能力,但是這個數(shù)量是有限制的,這個閾值與集群的工作節(jié)點數(shù)量有關(guān)。
通過3個對比實驗結(jié)果可以得出結(jié)論,在集群部署下,通過合理設(shè)置集群節(jié)點數(shù)與工作進程數(shù),可以有效的提升增量處理平臺針對流數(shù)據(jù)并行處理的計算能力。
在集群節(jié)點與進程數(shù)一定的前提下,通過修改模型Bolt的并發(fā)度,來驗證模型Bolt對于系統(tǒng)處理數(shù)據(jù)的延遲影響。本實驗對于延遲定義為數(shù)據(jù)從Spout出發(fā)到其完全處理完成所消耗的時間。對模型Bolt的并發(fā)度分別設(shè)置為5、15、30后,得到的延遲情況如圖11所示。
圖11 模型Bolt延遲性測試
由圖11可知,當并發(fā)度設(shè)置較小時,處理模塊無法有效的對當前數(shù)據(jù)及時處理,從而造成數(shù)據(jù)堆積現(xiàn)象,最終導(dǎo)致系統(tǒng)延遲急速上漲;當并發(fā)度設(shè)置為滑動窗口長度的一半時,基本上可以滿足流數(shù)據(jù)的處理需求;當并發(fā)度等于滑動窗口長度時,延遲進一步降低,但是效果提升有限,分析認為可能和設(shè)備性能瓶頸與虛擬機環(huán)境有關(guān)。結(jié)合吞吐量的測試實驗結(jié)果,說明基于滑動窗口的并行流數(shù)據(jù)處理方法可以滿足復(fù)雜重型裝備實時監(jiān)測對于數(shù)據(jù)實時性的需求。
針對復(fù)雜重型裝備制造過程中設(shè)備的監(jiān)測,本文提出了一種基于滑動窗口的流數(shù)據(jù)并行處理方法。融合了基于分區(qū)與基于時間兩種滑動窗口思想,有利于更好地適應(yīng)流數(shù)據(jù)延續(xù)性和實時性;并基于時間角度與不同的業(yè)務(wù)需求,提出了并行處理滑動窗口模型,設(shè)計了并行處理算法;借助于Storm的拓撲結(jié)構(gòu),實現(xiàn)滑動窗口在單位周期內(nèi)對接收數(shù)據(jù)的處理,基于Storm拓撲結(jié)構(gòu)和Lambda架構(gòu)設(shè)計流數(shù)據(jù)處理平臺,實現(xiàn)流數(shù)據(jù)的預(yù)處理、閾值判斷或聚合等多種處理。最后對其進行了吞吐量與延遲的測試實驗,驗證了方法與平臺的可用性。