朱 抗,何成昭,梁文超,王 超,胡衛(wèi)民
(株洲中車時(shí)代電氣股份有限公司,湖南 株洲 412001)
工業(yè)互聯(lián)網(wǎng)是基于工業(yè)設(shè)備的實(shí)時(shí)、高效數(shù)據(jù)采集體系,其通過端到端的數(shù)據(jù)集成與建模分析,最終實(shí)現(xiàn)設(shè)備的智能化控制[1]。自2012年國際工業(yè)巨頭GE公司提出工業(yè)互聯(lián)網(wǎng)的概念并發(fā)表了工業(yè)互聯(lián)網(wǎng)白皮書以來,工業(yè)互聯(lián)網(wǎng)相關(guān)的概念和技術(shù)開始快速普及。國外,GE公司推出工業(yè)互聯(lián)網(wǎng)操作系統(tǒng)Predix,西門子公司推出工業(yè)互聯(lián)網(wǎng)平臺MindSphere,ABB公司推出了ABB Ability數(shù)字化解決方案[2-3];國內(nèi),華為技術(shù)有限公司、三一集團(tuán)、海爾集團(tuán)、中國航天科工集團(tuán)有限公司等分別推出了FusionPlant、樹根互聯(lián)、COSMOPlat及CMSS等工業(yè)互聯(lián)網(wǎng)平臺,這使大量工業(yè)設(shè)備具備了與工業(yè)互聯(lián)網(wǎng)平臺互聯(lián)的基礎(chǔ)[4-5]。
隨著工業(yè)互聯(lián)網(wǎng)上大量設(shè)備的接入,傳回系統(tǒng)后臺的數(shù)據(jù)呈現(xiàn)爆發(fā)式增長的態(tài)勢,不僅數(shù)據(jù)體量大,而且數(shù)據(jù)種類多[6]。工業(yè)數(shù)據(jù)的爆發(fā)式增長需要高效的平臺來管理數(shù)據(jù)以及大容量的數(shù)據(jù)存儲介質(zhì)來實(shí)現(xiàn)海量數(shù)據(jù)高可靠的管理和存儲[7]。大量設(shè)備的運(yùn)行數(shù)據(jù)以及環(huán)境數(shù)據(jù)源源不斷地向遠(yuǎn)程服務(wù)端傳輸,給遠(yuǎn)程服務(wù)端帶來高并發(fā)的壓力,會引發(fā)服務(wù)器資源耗盡、數(shù)據(jù)丟失等問題;同時(shí)遠(yuǎn)程服務(wù)端在進(jìn)行數(shù)據(jù)收集、多種協(xié)議解析及實(shí)時(shí)數(shù)據(jù)處理的過程中,對數(shù)據(jù)處理的實(shí)時(shí)性要求高,否則將影響數(shù)據(jù)的持久化以及應(yīng)用層的用戶體驗(yàn)。為了滿足大量數(shù)據(jù)實(shí)時(shí)處理的需要,研究具備高并發(fā)、低時(shí)延特性的網(wǎng)絡(luò)服務(wù)程序就顯得格外重要[8]。為此,本文提出一種工業(yè)互聯(lián)網(wǎng)高并發(fā)流式數(shù)據(jù)處理方法及應(yīng)用方案,其首先利用Netty通信框架來解決海量數(shù)據(jù)回傳到遠(yuǎn)程服務(wù)端的高并發(fā)問題,然后利用Kafka消息隊(duì)列將數(shù)據(jù)解析、實(shí)時(shí)分析等耗時(shí)操作交由集群進(jìn)行分布式運(yùn)算,從而保證數(shù)據(jù)的持久化、實(shí)時(shí)處理操作的短響應(yīng)時(shí)間以及高吞吐量。
Netty由于具備并發(fā)性能高、傳輸速度快及便于封裝的優(yōu)點(diǎn),已成為高性能通信領(lǐng)域的首選框架[9]。為了實(shí)時(shí)處理Netty通信框架(圖1)中的高并發(fā)數(shù)據(jù),可以使用Kafka消息中間件以異步的方式將消息接收和寫入數(shù)據(jù)庫這種耗時(shí)操作進(jìn)線解耦,從而避免程序阻塞情況的發(fā)生[10]。
圖1 Netty整體框架Fig.1 Overall framework of Netty
Netty是基于NIO模型和epoll模型的異步通信框架,在開發(fā)具備良好可維護(hù)性、高性能、高擴(kuò)展性的網(wǎng)絡(luò)協(xié)議服務(wù)器方面具有很大的優(yōu)勢[11]。
NIO以塊的方式在緩沖區(qū)中處理數(shù)據(jù),極大地增強(qiáng)了數(shù)據(jù)操作的靈活性。NIO的非阻塞IO特性決定其允許線程利用操作間隙進(jìn)行其他事務(wù)處理,從而減少了服務(wù)器創(chuàng)建線程的數(shù)量,節(jié)省了服務(wù)器性能開銷。利用一個(gè)線程處理多個(gè)客戶端連接時(shí),會用到選擇器(selector)。選擇器能夠監(jiān)聽多個(gè)注冊的通道上是否有事件發(fā)生,如有事件發(fā)生,便獲取事件,然后針對每個(gè)事件進(jìn)行相應(yīng)的處理,這樣就不必為每個(gè)連接都創(chuàng)建一個(gè)線程,且只有真正有讀寫事件發(fā)生時(shí)才會進(jìn)行讀寫,極大地減少了系統(tǒng)開銷。
當(dāng)文件描述符的內(nèi)核緩沖區(qū)非空的時(shí)候,epoll模型發(fā)出可讀信號進(jìn)行通知;當(dāng)寫緩沖區(qū)不滿的時(shí)候,則發(fā)出可寫信號通知。這種主動通知機(jī)制避免了對大量并發(fā)連接進(jìn)行輪詢,減少了不必要的系統(tǒng)開銷。
Kafka是一種高吞吐量、持久化、分布式的消息隊(duì)列系統(tǒng),主要架構(gòu)涉及工作流程、存儲機(jī)制,以及生產(chǎn)者和消費(fèi)者,是當(dāng)前處理大數(shù)據(jù)計(jì)算的一個(gè)非常重要的組件,用于解決應(yīng)用解耦、異步通信及流量控制等問題(圖2)[12]。
圖2 Kafka整體框架Fig.2 Overall framework of Kafka
Kafka本質(zhì)上是一個(gè)消息隊(duì)列。消息隊(duì)列允許程序獨(dú)立地?cái)U(kuò)展或修改隊(duì)列兩邊的處理過程,從而實(shí)現(xiàn)解耦。系統(tǒng)可恢復(fù)性強(qiáng)體現(xiàn)在,即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。消息隊(duì)列的緩沖機(jī)制有助于解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的問題。異步通信機(jī)制允許程序?qū)⑾⒎湃腙?duì)列,但是不必立即處理它。
Kafka處理的消息來自任意多個(gè)被稱為生產(chǎn)者(producer)的進(jìn)程,消息數(shù)據(jù)通過生產(chǎn)者被發(fā)布到不同主題(topic)下的分區(qū)(partition)。在分區(qū)內(nèi)部,消息被加上索引和時(shí)間戳存儲在一起,而其他被稱為消費(fèi)者(Consumer)的進(jìn)程則可以從不同分區(qū)訂閱消息。
對于涉及眾多不同行業(yè)工業(yè)設(shè)備且設(shè)備數(shù)據(jù)回傳方式存在多樣化的工業(yè)互聯(lián)網(wǎng)平臺,其用于海量回傳數(shù)據(jù)的多種通信協(xié)議需兼容性良好,網(wǎng)絡(luò)層應(yīng)具備高并發(fā)能力,可通過消息隊(duì)列靈活、高效地處理數(shù)據(jù);平臺需結(jié)合大數(shù)據(jù)技術(shù)對設(shè)備運(yùn)行工況進(jìn)行分析,了解關(guān)鍵器件在不同設(shè)備、不同地域、不同時(shí)間維度上的運(yùn)行狀況,以指導(dǎo)后續(xù)產(chǎn)品的系統(tǒng)及控制策略設(shè)計(jì)等。
工業(yè)互聯(lián)網(wǎng)平臺接入的設(shè)備類型有軋機(jī)、礦用卡車、鐵路供電裝置、風(fēng)場設(shè)備及暖通設(shè)備等,實(shí)時(shí)回傳的行業(yè)數(shù)據(jù)類型包括UDP協(xié)議數(shù)據(jù)、json格式數(shù)據(jù)等,通過這些數(shù)據(jù)可以實(shí)時(shí)監(jiān)控設(shè)備的運(yùn)行狀態(tài)。圖3示出一種多行業(yè)的數(shù)據(jù)回傳方式。由于設(shè)備安裝地點(diǎn)廣域、分散,為了在一套系統(tǒng)中匯總整合不同地域的設(shè)備運(yùn)行數(shù)據(jù),需將大量設(shè)備運(yùn)行數(shù)據(jù)、環(huán)境數(shù)據(jù)等匯總到一個(gè)數(shù)據(jù)中心,數(shù)據(jù)傳輸方式可借助無線網(wǎng)絡(luò)、VPN等多種方式進(jìn)行傳輸[13]。
圖3 多行業(yè)的數(shù)據(jù)回傳方式Fig.3 Data return mode of multi-industry
基于Netty的工業(yè)大數(shù)據(jù)平臺主要包含數(shù)據(jù)采集、數(shù)據(jù)傳輸、消息隊(duì)列、數(shù)據(jù)處理、數(shù)據(jù)推送及存儲等功能。其中,數(shù)據(jù)采集對象涉及風(fēng)場數(shù)據(jù)、軋機(jī)數(shù)據(jù)、空調(diào)數(shù)據(jù)及鐵路供電數(shù)據(jù)等,也可根據(jù)需要擴(kuò)展到更多的行業(yè)領(lǐng)域數(shù)據(jù);數(shù)據(jù)類型包括實(shí)時(shí)數(shù)據(jù)、文件數(shù)據(jù)及網(wǎng)絡(luò)數(shù)據(jù)等;數(shù)據(jù)通信部分涵蓋VPN通道傳輸、公網(wǎng)傳輸和4G傳輸?shù)榷喾N傳輸方式;數(shù)據(jù)處理部分采用分布式消息隊(duì)列Kafka結(jié)合Spark Streaming流式處理技術(shù)保證數(shù)據(jù)的高并發(fā)處理[14];數(shù)據(jù)存儲部分采用分布式存儲系統(tǒng)Hbase進(jìn)行海量數(shù)據(jù)的存儲;數(shù)據(jù)展示部分采用rest接口和WebSocket技術(shù)實(shí)現(xiàn)歷史數(shù)據(jù)的請求和實(shí)時(shí)數(shù)據(jù)推送。圖4示出數(shù)據(jù)處理流程設(shè)計(jì)。
圖4 數(shù)據(jù)處理流程設(shè)計(jì)Fig.4 Design of data processing flow
對于數(shù)據(jù)流的傳輸,Netty提供了ChannelHandler組件,它基于網(wǎng)絡(luò)事件觸發(fā),作為處理所有入站和出站數(shù)據(jù)的應(yīng)用程序邏輯的容器。應(yīng)用程序中的數(shù)據(jù)在Netty中的傳輸如圖5所示。對于應(yīng)用程序而言,接收并處理傳輸?shù)絅etty框架的數(shù)據(jù)需要ChannelInboundHandler子接口,應(yīng)用程序的業(yè)務(wù)邏輯通常要駐留在一個(gè)或者多個(gè)ChannelInboundHandler中,接收事件并執(zhí)行它們所實(shí)現(xiàn)的處理邏輯,并將數(shù)據(jù)傳遞給鏈中的下一個(gè)ChannelHandler。
圖5 包含入站和出站ChannelHandler的ChannelPipelineFig.5 ChannelPipeline containing inbound and outbound Channelhandlers
ChannelPipeline為ChannelHandler鏈提供了容器,并定義用于在該鏈上傳播的入站和出站事件流的應(yīng)用程序編程接口 (application programming interface,API)。當(dāng)通道(channel)被創(chuàng)建時(shí),它會被自動地分配到其專屬的ChannelPipeline。Netty的EventLoop是協(xié)同設(shè)計(jì)的一部分,它采用了2個(gè)基本的API:并發(fā)和網(wǎng)絡(luò)編程,而服務(wù)于Channel的I/O和事件的EventLoop包含在EventLoopGroup中。圖6示出EventLoopGroup的工作模式,它負(fù)責(zé)為每個(gè)新創(chuàng)建的Channel創(chuàng)建一個(gè)EventLoop,使用事件循環(huán)的方式進(jìn)行分配以獲取一個(gè)均衡的分布。
圖6 非阻塞傳輸?shù)木€程分配方式Fig.6 Thread allocation for non-blocking transmission
通過Netty服務(wù)器,以UXP協(xié)議對數(shù)據(jù)進(jìn)行解析,并最終將各行業(yè)數(shù)據(jù)歸一化為相同結(jié)構(gòu)的數(shù)據(jù)包,流程如圖7所示。經(jīng)過數(shù)據(jù)歸一化流程后,每個(gè)行業(yè)數(shù)據(jù)都將以相同結(jié)構(gòu)的Kafka實(shí)例發(fā)送到消息隊(duì)列中,然后在Spark程序中進(jìn)行解析,實(shí)現(xiàn)了行業(yè)數(shù)據(jù)與后臺程序之間的解耦。這將極大減小后臺程序之間的行業(yè)區(qū)別,給數(shù)據(jù)持久化和應(yīng)用層程序的開發(fā)帶來便利。
圖7 基于Netty和Kafka的網(wǎng)絡(luò)處理流程Fig.7 Network processing flow based on Netty and Kafka
為了驗(yàn)證所設(shè)計(jì)系統(tǒng)的性能,挑選了運(yùn)行在全國各地3個(gè)行業(yè)的528臺設(shè)備,以3種不同數(shù)據(jù)回傳方式向測試集群上傳需要處理的數(shù)據(jù),測試集群總共4個(gè)節(jié)點(diǎn)。測試場景如下:風(fēng)場接入25臺設(shè)備,回傳設(shè)備運(yùn)行數(shù)據(jù)、風(fēng)場遙信數(shù)據(jù)和風(fēng)場遙測數(shù)據(jù),每10 min將數(shù)據(jù)存為.dat文件并通過ftp回傳到測試集群,然后由前端程序進(jìn)行解析并發(fā)往Netty服務(wù)器;接入空調(diào)500臺,數(shù)據(jù)以json格式提供,由前端程序進(jìn)行請求并轉(zhuǎn)發(fā)給Netty服務(wù)器;接入軋機(jī)3臺,以公網(wǎng)傳輸?shù)姆绞酵ㄟ^上位機(jī)程序?qū)?shù)據(jù)包直接發(fā)往Netty服務(wù)器,數(shù)據(jù)回傳時(shí)間為3 s。
通過Netty服務(wù)器程序,分別在1次連接1 000次數(shù)據(jù)請求和1次只進(jìn)行1次數(shù)據(jù)請求但是循環(huán)1 000次兩種工況下進(jìn)行通信效率測試,設(shè)置并發(fā)數(shù)分別為128和256,得到的結(jié)果如圖8所示。
從測試結(jié)果可以看出,Netty服務(wù)器對于高并發(fā)數(shù)據(jù)處理的效率,在4萬次請求解析過程中響應(yīng)時(shí)間不超過400 ms。每次創(chuàng)建連接和重用連接的平均響應(yīng)時(shí)間最大差值也僅為70 ms,這就給工業(yè)領(lǐng)域多行業(yè)設(shè)備的接入帶來極大優(yōu)勢,可以不斷地創(chuàng)建新連接進(jìn)行數(shù)據(jù)回傳,性能損耗對應(yīng)用層幾乎沒有影響。
為了測試大量數(shù)據(jù)通過消息隊(duì)列進(jìn)行數(shù)據(jù)解析、數(shù)據(jù)持久化等耗時(shí)操作的性能,分別對消息隊(duì)列每秒所能處理的數(shù)據(jù)幀數(shù)、平均耗時(shí)及處理延遲時(shí)間等指標(biāo)進(jìn)行驗(yàn)證。當(dāng)前接入Kafka broker節(jié)點(diǎn)數(shù)為4個(gè),實(shí)驗(yàn)將對Kafka消息隊(duì)列的數(shù)據(jù)處理情況進(jìn)行監(jiān)控。由圖9可以得知,平均每秒處理的數(shù)據(jù)流為92.64幀,即可完成所有數(shù)據(jù)的消費(fèi)。由圖10可以得知,每筆數(shù)據(jù)從接入到處理再到存入數(shù)據(jù)庫平均所耗費(fèi)的時(shí)間為169 ms。由圖11可以得知,數(shù)據(jù)處理的平均延遲時(shí)間為171 ms。可見所設(shè)計(jì)系統(tǒng)在處理高并發(fā)流式數(shù)據(jù)的過程中,數(shù)據(jù)處理平均耗時(shí)和平均延遲時(shí)間均能滿足系統(tǒng)需求。
圖9 數(shù)據(jù)后臺平均處理數(shù)據(jù)流幀數(shù)Fig.9 Average frame number of data stream processed on server
圖10 數(shù)據(jù)后臺平均處理數(shù)據(jù)流耗時(shí)Fig.10 Average processing time of data stream on server
圖11 數(shù)據(jù)后臺平均處理數(shù)據(jù)流延時(shí)Fig.11 Average processing delay of data stream on server
本文提出一種工業(yè)互聯(lián)網(wǎng)高并發(fā)流式數(shù)據(jù)處理技術(shù)及應(yīng)用方案,并詳細(xì)介紹了多行業(yè)數(shù)據(jù)接入平臺的數(shù)據(jù)接入方式和數(shù)據(jù)處理方式、基于Netty的高并發(fā)數(shù)據(jù)處理機(jī)制和Kafka消息隊(duì)列。使用該方案的工業(yè)互聯(lián)網(wǎng)大數(shù)據(jù)平臺能滿足廣域、分散的大量設(shè)備的數(shù)據(jù)接入,并且支持多種數(shù)據(jù)格式以及多種傳輸協(xié)議,解決了不同行業(yè)數(shù)據(jù)的一致性問題,在高并發(fā)的情況下依然能保持毫秒級的處理速度。
后續(xù)可以考慮優(yōu)化消息中間件中Consumer和Partition的數(shù)量配置,一方面降低對系統(tǒng)硬件資源的需求,另一方面可以給應(yīng)用層軟件提供更加靈活的數(shù)據(jù)接口。