趙潤發(fā),婁淵勝+,葉 楓,石 宏
(1.河海大學(xué) 計算機與信息學(xué)院,江蘇 南京 211100; 2.南京廣廈軟件有限公司 工業(yè)大數(shù)據(jù)開發(fā)部,江蘇 南京 210000)
針對工業(yè)大數(shù)據(jù)平臺的研究[1-4],文獻(xiàn)[5]采用Dubbo與NoSQL構(gòu)建了工業(yè)領(lǐng)域大數(shù)據(jù)平臺,為工業(yè)領(lǐng)域不斷增長的數(shù)據(jù)提供了解決辦法。文獻(xiàn)[6]提出了一個面向工業(yè)的數(shù)據(jù)處理系統(tǒng),其以Spark為框架,選取MySQL和HDFS為存儲介質(zhì),實現(xiàn)了工業(yè)數(shù)據(jù)的快速分析。文獻(xiàn)[7]將物聯(lián)網(wǎng)與大數(shù)據(jù)相結(jié)合,構(gòu)建了一個工廠能耗分析平臺,實現(xiàn)了能耗數(shù)據(jù)的查詢以及數(shù)據(jù)的分析。
目前工業(yè)大數(shù)據(jù)平臺已得到廣泛研究,但工業(yè)大數(shù)據(jù)平臺技術(shù)架構(gòu)不同,差異性較大,再者對于很多工業(yè)大數(shù)據(jù)平臺而言,其數(shù)據(jù)處理效率較低,預(yù)警時間較長。針對上述問題,本文研究了一種基于Flink的工業(yè)大數(shù)據(jù)平臺,主要貢獻(xiàn)如下:
(1)采用Kafka和Flink進(jìn)行集成,對數(shù)據(jù)進(jìn)行傳輸和處理,并將處理過的數(shù)據(jù)按照類型存儲至數(shù)據(jù)池中;
(2)利用Flink對工業(yè)大數(shù)據(jù)進(jìn)行預(yù)處理,提高平臺運行的準(zhǔn)確性;
(3)采用多種大數(shù)據(jù)技術(shù),實現(xiàn)工業(yè)大數(shù)據(jù)平臺的數(shù)據(jù)查詢以及預(yù)警功能,且相對于典型大數(shù)據(jù)平臺而言,速度更快、效率更高。
Apache Flink是一個分布式處理框架,可在無邊界和有邊界數(shù)據(jù)流上進(jìn)行計算[8]。Flink不僅能運行在YARN、Mesos等資源管理框架上,而且能在單獨集群中運行,適用于具有不可靠數(shù)據(jù)源、海量數(shù)據(jù)處理等場景。此平臺采用Flink的最主要原因是:工業(yè)大數(shù)據(jù)類型雜,既包括流數(shù)據(jù),又包括批數(shù)據(jù),而Flink兩者都可以處理。它適用的主要場景是流數(shù)據(jù)方面的,而批數(shù)據(jù)是“特殊的流數(shù)據(jù)”,所有任務(wù)都可以當(dāng)成流來處理[8],并且數(shù)據(jù)處理延遲性較低。其架構(gòu)[8]如圖1所示。
對于流數(shù)據(jù)應(yīng)用來說,F(xiàn)link提供DataStream API。對于批數(shù)據(jù)處理應(yīng)用來說,提供DataSet API。它支持Java和Scala語言,同時支持Kafka的輸入數(shù)據(jù)和ElasticSearch、MySQL、InfluxDB多種數(shù)據(jù)庫。Flink同時具有高度靈活的窗口操作,包括time、count等窗口操作,如:每隔多久發(fā)送數(shù)據(jù)至客戶端、每次發(fā)送數(shù)據(jù)的個數(shù)等,十分適用于工業(yè)場景。
圖1 Flink架構(gòu)
Kafka是一個基于Zookeeper的分布式消息系統(tǒng),它具有高吞吐、低延遲、可靠性好、容錯能力強的良好特性[9]。低延遲體現(xiàn)在Kafka每秒能夠處理巨量信息且延遲很低,只有幾毫秒,適用于工業(yè)生產(chǎn)過程中海量數(shù)據(jù)的處理;高吞吐率體現(xiàn)在即使應(yīng)用在廉價的商用機器上,Kafka也能進(jìn)行每秒100 K消息的傳輸。Kafka也較為可靠,傳輸?shù)臄?shù)據(jù)可以在本地磁盤持久保存,同時數(shù)據(jù)會自動進(jìn)行備份,數(shù)據(jù)丟失后仍可找到數(shù)據(jù)。Kafka容錯性較好,集群中節(jié)點是允許失敗的(如副本數(shù)量為n,則n-1個節(jié)點是允許失敗的)[9]。此平臺選用Kafka消息隊列能夠更好地解耦,也增強了平臺的擴展性,即使企業(yè)數(shù)據(jù)發(fā)生改變,不需要改變代碼和調(diào)節(jié)參數(shù)就可以輕松實現(xiàn)用戶要求。同時也保證了數(shù)據(jù)傳送的順序性和安全性。
Grafana是一個可靠性較好的可視化和度量分析工具。它具有靈活和快捷的客戶端圖表,有多種可視化指標(biāo)和面板插件,官方庫里有圖表、折線圖、文本文檔等豐富的儀表盤插件;它支持多種數(shù)據(jù)庫如:MySQL、InfluxDB、Prometheus、OpenTSDB、Elasticsearch和KairosDB等等;Grafana可通過直觀的可視化方式進(jìn)行預(yù)警并發(fā)送通知,當(dāng)獲得的數(shù)據(jù)大于用戶設(shè)定的閾值時通知Slack、DingDing、Email等;并且數(shù)據(jù)源不同,但仍可以使用在同一圖表中,數(shù)據(jù)源的來源可以根據(jù)每個查詢決定,也可以自定義數(shù)據(jù)源;Grafana同時具有豐富的注釋圖,注釋圖表能顯示不同數(shù)據(jù)源的豐富事件,當(dāng)鼠標(biāo)停留在圖表時,會以全面的標(biāo)記來顯示出元數(shù)據(jù)。
InfluxDB是一個用于處理海量數(shù)據(jù)寫入與數(shù)據(jù)查詢的時間序列數(shù)據(jù)庫,應(yīng)用于有大量時間戳數(shù)據(jù)的場景下,例如DevOps(過程、方法、系統(tǒng))監(jiān)控,物聯(lián)網(wǎng)工業(yè)數(shù)據(jù)實時分析等。它是分布式擴展的,不依賴外部任何條件。它還可以對ETL進(jìn)行后臺處理并實時監(jiān)控預(yù)警。它有類似SQL的查詢語言,可輕松方便查詢到需要的數(shù)據(jù)。不僅如此,InfluxDB連續(xù)查詢自動計算聚合數(shù)據(jù),大大提高了頻繁查詢的效率。本平臺中的數(shù)據(jù)量較大,時間戳數(shù)據(jù)較多,因此InfluxDB是工業(yè)大數(shù)據(jù)存儲的絕佳選擇。
本平臺旨在實現(xiàn)一個能滿足對工業(yè)大數(shù)據(jù)進(jìn)行存儲、集成、分析的平臺,能夠為企業(yè)多種業(yè)務(wù)提高指導(dǎo)和決策支持。其架構(gòu)如圖2所示,其主要分為5個部分,包括:數(shù)據(jù)源模塊、消息隊列模塊、數(shù)據(jù)存儲模塊、數(shù)據(jù)處理模塊、可視化模塊。
此平臺的數(shù)據(jù)源主要分為兩種,一種是靜態(tài)系統(tǒng)數(shù)據(jù),第二種是實時流數(shù)據(jù)。數(shù)據(jù)源獲取的方式主要如下:
(1)靜態(tài)系統(tǒng)數(shù)據(jù)一般是由公司專門人員去收集,如設(shè)備生產(chǎn)日期、企業(yè)名稱、設(shè)備編號等,這些數(shù)據(jù)以特定的形式整理形成一個Excel表格,能夠直接使用;
(2)大多數(shù)的企業(yè)獲取數(shù)據(jù)的方式都是通過各種傳感器,傳感器獲取到的設(shè)備的狀態(tài)、運行時間等實時數(shù)據(jù),然后將這些數(shù)據(jù)發(fā)送給此平臺的處理系統(tǒng);
(3)企業(yè)的很多數(shù)據(jù)會分布在不同地區(qū)的不同公司,所以這時候它們通常會以日志的形式存在,而Flume是一個很好的日志收集工具[10]。這個工具能夠?qū)⑦@些日志文件識別出來,并整理收集在一起,并發(fā)往此工業(yè)大數(shù)據(jù)平臺;
(4)工業(yè)生產(chǎn)中會產(chǎn)生很多業(yè)務(wù)靜態(tài)數(shù)據(jù),但它們的格式可能不是我們所需要的,此時可以使用Sqoop數(shù)據(jù)源轉(zhuǎn)換工具,將它們轉(zhuǎn)換為我們所需要的格式,然后再將這些數(shù)據(jù)發(fā)送給工業(yè)大數(shù)據(jù)平臺。
圖2 體系架構(gòu)
消息隊列主要指數(shù)據(jù)在傳輸過程中保存數(shù)據(jù)的一個容器。工業(yè)大數(shù)據(jù)類型多,數(shù)據(jù)量大,面對此場景使用消息隊列是一個極佳的選擇,因為消息隊列能夠極大地降低系統(tǒng)響應(yīng)時間、提高系統(tǒng)穩(wěn)定性,同時保證數(shù)據(jù)傳輸?shù)捻樞蛐院桶踩?,最重要的是實現(xiàn)數(shù)據(jù)的異步化,并起到解耦的作用。
此模塊選用Kafka作為消息隊列系統(tǒng),利用Flink將數(shù)據(jù)源模塊中的實時數(shù)據(jù)和批數(shù)據(jù)都暫存至消息隊列中。Flink作為生產(chǎn)者,會源源不斷地生產(chǎn)出消息,然后發(fā)送給消息隊列Kafka中,而Kafka就成為了消費者,它會不斷地從Flink中獲取到消息,從而對這些數(shù)據(jù)進(jìn)行進(jìn)一步處理。
本模塊主要采用Flink來處理實時大數(shù)據(jù)和離線批數(shù)據(jù)。根據(jù)數(shù)據(jù)類型,將此模塊又分為實時數(shù)據(jù)處理模塊和批數(shù)據(jù)處理模塊。Flink能夠同時支持批處理與流處理任務(wù),它包含兩種預(yù)先定義的函數(shù):DataStream API和DataSet API。DataStream API 包括reduce、aggregations、filter等方法。DataSet API包括distinct、Hash-Partition、window等方法。
批數(shù)據(jù)處理模塊中,此平臺會利用aggregations中的sum()、min()、max()方法對批數(shù)據(jù)進(jìn)行統(tǒng)計,求出工業(yè)數(shù)據(jù)的最大值、最小值、總和等,并在前端顯示出來。
流數(shù)據(jù)處理模塊主要是對數(shù)據(jù)進(jìn)行預(yù)處理。在工業(yè)大數(shù)據(jù)的實際生產(chǎn)過程中,由于人工失誤或者數(shù)據(jù)采集設(shè)備因生產(chǎn)環(huán)境惡劣會導(dǎo)致收集到的數(shù)據(jù)不準(zhǔn)確,這些數(shù)據(jù)如果直接存入到數(shù)據(jù)庫中不僅會降低大數(shù)據(jù)平臺查詢數(shù)據(jù)的準(zhǔn)確性,而且會大大降低平臺的運行效率。此模塊主要利用Flink來去除實際業(yè)務(wù)處理中的無效數(shù)據(jù)、重復(fù)數(shù)據(jù)以及缺失率較高的數(shù)據(jù),其預(yù)處理的流程如圖3所示。
圖3 預(yù)處理流程
數(shù)據(jù)預(yù)處理方法具體步驟如下:
(1)首先利用Flink從Kafka中獲取到數(shù)據(jù),然后通過Flink自帶的RocksDB狀態(tài)后端去重方式對工業(yè)大數(shù)據(jù)進(jìn)行去重。我們需要開啟RocksDB狀態(tài)后端并對其參數(shù)進(jìn)行配置,如數(shù)據(jù)過期的時間、是否過期的數(shù)據(jù)能再次被訪問等,接著注冊Flink定時器。我們也可以利用Flink的TTL機制,打開RocksDB狀態(tài)后端的TTL compaction filter,這樣能在后臺實現(xiàn)重復(fù)數(shù)據(jù)的自動刪除。在處理重復(fù)數(shù)據(jù)時,如果數(shù)據(jù)的key(如事件ID)對應(yīng)的狀態(tài)不存在,說明此數(shù)據(jù)沒有出現(xiàn)過,可以更新狀態(tài)并且輸出數(shù)據(jù)。反之,說明此數(shù)據(jù)已經(jīng)出現(xiàn)過,RocksDB就會將其自動刪除。同時我們可利用Flink SQL提供的distinct去重方法來統(tǒng)計重復(fù)數(shù)據(jù)的明細(xì)結(jié)果;
(2)然后對實際生產(chǎn)過程中的無效數(shù)據(jù)進(jìn)行刪除。這里利用FlinkDataStream API的Evictor()方法對WindowFunction前后的數(shù)據(jù)進(jìn)行處理。Evictor()方法包括Count-Evictor、DeltaEvictor和TimeEvictor以及自定義的Evictor。CountEvictor是在窗口中設(shè)置保持的數(shù)據(jù)數(shù)量,如:evictor(CountEvictor.of(10000)),意思是窗口中最大的數(shù)據(jù)量為10 000,若大于10 000條,則剔除。在實際生產(chǎn)過程中也會產(chǎn)生很多已過時的無效數(shù)據(jù),其不僅會影響平臺數(shù)據(jù)查詢的正確性,而且會增加平臺的資源消耗,進(jìn)而影響執(zhí)行效率,而Flink 自帶的TimeEvictor方法能將最新時間的數(shù)據(jù)篩選出來,去除過時的數(shù)據(jù)。其主要將當(dāng)前窗口中最新元素的時間減去時間間隔,然后將小于該結(jié)果的數(shù)據(jù)全部剔除。DeltaEvictor方法通過定義DeltaFunction和指定threshold(閾值),計算出窗口間數(shù)據(jù)的Delt大小,如果超過了閾值則將當(dāng)前數(shù)據(jù)元素刪除,這樣可以去除那些因為機器故障或者外部原因產(chǎn)生的差別較大的無效數(shù)據(jù)。同時也可以根據(jù)用戶的需求自定義Evictor方法來去除那些無效數(shù)據(jù);
(3)利用步驟(1)中distinct去重方式的Distinct-Accumulator 與CountAccumulator方法統(tǒng)計單條數(shù)據(jù)value值的數(shù)量,DistinctAccumulator()內(nèi)部包含一個map結(jié)構(gòu),key包含的是一條數(shù)據(jù)的屬性值,而value則是屬性值出現(xiàn)的次數(shù)。若缺少的value值過多(大于50%),直接刪除缺失數(shù)據(jù)的記錄。反之認(rèn)定數(shù)據(jù)為有效數(shù)據(jù);
(4)原數(shù)據(jù)經(jīng)過預(yù)處理后得到新數(shù)據(jù),將這些數(shù)據(jù)存儲至數(shù)據(jù)池中。
通過一系列的數(shù)據(jù)預(yù)處理,可以有效防止臟數(shù)據(jù)影響平臺的正常運行。
工業(yè)大數(shù)據(jù)異構(gòu)性較強,數(shù)據(jù)類型較為復(fù)雜,這些數(shù)據(jù)通常以不同形式存儲在不同的數(shù)據(jù)庫或者數(shù)據(jù)管理系統(tǒng)中,所以管理起來較為麻煩,因此企業(yè)急需一個平臺對數(shù)據(jù)進(jìn)行統(tǒng)一管理。而此平臺的數(shù)據(jù)源主要分為兩類,一類是實時數(shù)據(jù),另一類是工業(yè)批數(shù)據(jù),為了方便管理使用,此平臺建立一個數(shù)據(jù)池來存儲數(shù)據(jù)。實時數(shù)據(jù)存放至InfluxDB數(shù)據(jù)庫中,設(shè)備狀態(tài)、設(shè)備離線事件、設(shè)備事件等信息,靜態(tài)系統(tǒng)數(shù)據(jù)存放至MySQL數(shù)據(jù)庫中,如:企業(yè)設(shè)備、企業(yè)名稱、地址等信息。
數(shù)據(jù)存儲的過程如下:首先平臺先判斷獲取到的數(shù)據(jù)的類型,若是工業(yè)批數(shù)據(jù)會利用SQL-query去取出數(shù)據(jù)連接的URL、用戶名和密碼,然后加載SQL-JDBC去連接實例,并執(zhí)行查詢;若是工業(yè)實時數(shù)據(jù)會先加載NoSQL-query,然后讀取NoSQL連接類,讀取InfluxDB數(shù)據(jù)庫自帶配置文件,從而連接實例,并執(zhí)行查詢。
無論是聯(lián)機事務(wù)處理(OLTP),還是聯(lián)機分析處理(OLAP),都是為了用戶更好地更直觀地獲取到處理到的數(shù)據(jù)結(jié)果,因此考慮一個與用戶交互性好的前端工具是十分必要的。
本平臺采用開源的Grafana作為可視化分析工具,它不僅支持多種數(shù)據(jù)庫,如IoTDB、MySQL、InfluxDB等,還支持多種數(shù)據(jù)的展示方式,如折線圖、圖表等,以更直觀的形式顯示出數(shù)據(jù),用戶按照各自需求可快速獲取到數(shù)據(jù)且不需要關(guān)心后臺的具體運行過程。同時可以對工業(yè)設(shè)備進(jìn)行預(yù)警,它通過Slack、DingDing、Email等方式通知企業(yè)數(shù)據(jù)已達(dá)到闕值,從而實現(xiàn)設(shè)備數(shù)據(jù)的準(zhǔn)確預(yù)警。
首先,數(shù)據(jù)源模塊可采用Flume收集工業(yè)生產(chǎn)過程中產(chǎn)生的日志,或直接從傳感器中獲取到數(shù)據(jù),并由專門人員將這些數(shù)據(jù)整理為Excel格式。其次,利用Flink將整理好的數(shù)據(jù)發(fā)送至Kafka消息隊列中,保證數(shù)據(jù)傳輸?shù)陌踩院晚樞蛐?。然后利用Flink獲取到暫存至Kafka中的數(shù)據(jù)并對其進(jìn)行預(yù)處理,去除重復(fù)數(shù)據(jù)、缺失率較高的數(shù)據(jù)、無效數(shù)據(jù)等,處理好后將其存儲至不同的數(shù)據(jù)庫中,批數(shù)據(jù)存儲至MySQL中,流數(shù)據(jù)存儲至InfluxDB中。而Flink貫穿整個運行過程,對于MySQL中的數(shù)據(jù)可采用DataSet API,InfluxDB中的數(shù)據(jù)采用DataStream API??梢暬治瞿K使用Grafana組件,實現(xiàn)不同類型數(shù)據(jù)的增刪改查,同時也可以對企業(yè)數(shù)據(jù)進(jìn)行監(jiān)測,若大于預(yù)定的值可通過郵件的形式進(jìn)行預(yù)警。
此平臺集群的硬件環(huán)境包含3臺物理機,一個為主節(jié)點,其余兩個為子節(jié)點,其域名分別為Master、Slave1、Slave2,3臺機器均使用8 G內(nèi)存以及1 T的硬盤,使用的操作系統(tǒng)為Centos6.4 64位。Flink集群選擇1.9.3版本。Flink的master進(jìn)程 jobManager放在Slave1中。修改好的配置文件放置在其它節(jié)點,并在Slave2的Flink_HOME/conf/slaves目錄下添加 Master、Slave1、Slvae2,這樣可以通過主節(jié)點免密登錄啟動其它的副節(jié)點啟動。Kafka應(yīng)注意與Zookeeper 版本之間的兼容性,所以此平臺選擇了Kafka 2.2.0和Zookeeper3.4.10。前端工具Grafana選擇版本Grafana-6.7.2,此平臺選用的數(shù)據(jù)庫為MySQL-5.5和InfluxDB-1.7.3。
本文的實驗數(shù)據(jù)來自經(jīng)過數(shù)據(jù)脫敏后的3000家企業(yè)基本信息,10 000余臺設(shè)備連續(xù)3個月的運行數(shù)據(jù),脫敏簡要過程如下:利用Java代碼定義數(shù)據(jù)脫敏的工具類,涉及到具體公司名時,用*替代。同時企業(yè)id、設(shè)備id、事件id值重新編號,從而保護(hù)數(shù)據(jù)的安全性。本實驗的采用的數(shù)據(jù)主要分為兩種類型,一種是“企業(yè)名單”、“企業(yè)設(shè)備信息”、“測點名稱”等批數(shù)據(jù),此類數(shù)據(jù)量為87 000條,描述的是3000家企業(yè)的一些基本信息,如公司名、所處地區(qū)等;另一種是“設(shè)備實時事件統(tǒng)計”、“設(shè)備狀態(tài)時長統(tǒng)計”等實時數(shù)據(jù),此類數(shù)據(jù)量較大為1 250 000條,描述的是設(shè)備的實時狀態(tài)信息,如在某個時間段企業(yè)的狀態(tài)等。批數(shù)據(jù)以“企業(yè)名單”為例,包括5個字段值:企業(yè)id、客戶名稱、地址名稱、省市區(qū)、公司名,部分?jǐn)?shù)據(jù)見表1。(注:表中只展示了部分?jǐn)?shù)據(jù)集中的某個表,并不是全部數(shù)據(jù))。
表1 企業(yè)名單
實時數(shù)據(jù)以“設(shè)備實時事件”數(shù)據(jù)為例,包括4個字段值:事件id、發(fā)生時間、設(shè)備id、事件,部分?jǐn)?shù)據(jù)見表2。
表2 設(shè)備實時事件
為了驗證基于Flink的大數(shù)據(jù)平臺的有效性,對此平臺的各個模塊進(jìn)行了測試。
系統(tǒng)實現(xiàn)具體過程如下:首先將數(shù)據(jù)源中的批數(shù)據(jù)和實時數(shù)據(jù)導(dǎo)入到Kafka消息隊列中。批數(shù)據(jù)的數(shù)據(jù)量較小,導(dǎo)入時間較短,耗時3 min 10 s成功將“企業(yè)名單”的信息發(fā)送至Kafka中。而“設(shè)備實時事件統(tǒng)計”實時數(shù)據(jù)量較大,耗時較長,耗時17 min成功將實時數(shù)據(jù)導(dǎo)入到Kafka中。然后利用Flink讀取Kafka數(shù)據(jù)并進(jìn)行預(yù)處理后寫入到MySQL與InfluxDB中,預(yù)處理后的重復(fù)數(shù)據(jù)篩選結(jié)果見表3(以實時數(shù)據(jù)“設(shè)備實時事件”為例)。
表3 重復(fù)數(shù)據(jù)篩選結(jié)果
無效數(shù)據(jù)篩選結(jié)果見表4。
表4 無效數(shù)據(jù)篩選結(jié)果
空數(shù)據(jù)篩選結(jié)果見表5。
表5 空數(shù)據(jù)篩選結(jié)果
數(shù)據(jù)篩選后,開發(fā)人員可利用Navicat和InfluxDBStudio可視化工具查看數(shù)據(jù),用戶顯示界面如圖4所示。
圖4 用戶界面
用戶在瀏覽器中輸入localhost:3000進(jìn)入此平臺,首先填寫數(shù)據(jù)庫的用戶名以及密碼,創(chuàng)建用戶需要的數(shù)據(jù)庫,其次選擇數(shù)據(jù)庫顯示的儀表形式,有折線圖、表格、文本等形式。例如:用戶想查詢MySQL數(shù)據(jù)庫中的某個特定條件的批數(shù)據(jù)并以表格的形式輸出,用戶可在系統(tǒng)界面選擇Table并輸入:SELECT * FROM ′company_list′ WHERE 區(qū)=′天寧區(qū)′,便只查詢天寧區(qū)的公司名單,其實現(xiàn)效果如圖5所示。
圖5 公司名稱
同理,也可以實現(xiàn)實時數(shù)據(jù)的查詢,能快速地查詢到各個設(shè)備的實時狀態(tài)和對應(yīng)的發(fā)生時間,如圖6所示。
圖6 設(shè)備實時狀態(tài)表
同時系統(tǒng)界面也提供edit的方式,用戶只需要選擇操作的數(shù)據(jù)庫和限定條件,也可輕松查詢到數(shù)據(jù)。不僅如此,用戶也可以利用此平臺篩選出自己所需要的數(shù)據(jù),如查詢到相同設(shè)備號id的機器、同一時間內(nèi)機器的上線數(shù)量、統(tǒng)計一段時間內(nèi)出故障機器的數(shù)量、顯示預(yù)警的極值和結(jié)束時間等。
Grafana可以無縫定義告警在數(shù)據(jù)中的位置,可視化的定義閾值,并可以通過釘釘、E-mail等平臺獲取告警通知。這里我們選用E-mail的形式來關(guān)注實時設(shè)備狀態(tài)并獲得告警通知。首先在啟動Grafana前配置/etc/grafana/grafana.ini開啟smtp服務(wù),配置發(fā)送郵件的郵箱以及密碼。配置好后,通過Grafana 的Alerting功能設(shè)置發(fā)送郵件的間隔時間,實現(xiàn)對設(shè)備數(shù)據(jù)的預(yù)警(此平臺判斷機器是否出現(xiàn)故障的方式有兩種:①由于機器是24小時運作的,所以機器會一直呈現(xiàn)在線狀態(tài),若機器離線時間過長則會判定為出故障;②平臺會每隔一定時間發(fā)送機器上下線的數(shù)量給用戶,若下線的機器數(shù)量過多,則判定有機器出現(xiàn)故障),其告警如圖7所示。
圖7 告警
在文獻(xiàn)[11]中,作者介紹了當(dāng)前較為典型的Clou-dera大數(shù)據(jù)平臺,其以Hadoop技術(shù)架構(gòu)為基礎(chǔ),具有穩(wěn)定的、可擴展的企業(yè)級大數(shù)據(jù)管理平臺,它提供了很多部署案例,能夠方便管理企業(yè)生產(chǎn)過程中的多種數(shù)據(jù),且具有強大的管理和監(jiān)控工具。其中Cloudera Manager是開源的方便使用的一款產(chǎn)品,它提供Web用戶界面使得企業(yè)進(jìn)行數(shù)據(jù)管理時更加容易。而Shark[12]也是一個相對較新的開源工業(yè)大數(shù)據(jù)分析平臺,它是Spark的一個組件,可安裝在與Hadoop相同的集群上,是一個性能較好的分布式和容錯內(nèi)存分析系統(tǒng),它具有數(shù)據(jù)聯(lián)合分區(qū),容錯以及機器學(xué)習(xí)的能力,且完全兼容Hive和HiveQL,也能支持多種數(shù)據(jù)庫數(shù)據(jù)的查詢。
本平臺采用了Flink框架來構(gòu)建工業(yè)大數(shù)據(jù)平臺。首先比較Flink平臺和文獻(xiàn)[11,12]二者平臺基礎(chǔ)框架的技術(shù)特點:Spark和Flink都是運行在YARN上的,但Flink的性能是優(yōu)于Spark的,而Spark性能是大于Hadoop的,而且迭代的次數(shù)越多,F(xiàn)link 的優(yōu)勢越明顯。不僅如此Flink具有靈活的窗口,對于流數(shù)據(jù)處理起來更加方便,而工業(yè)生產(chǎn)下流數(shù)據(jù)偏多且較為復(fù)雜,因此Flink十分適用于工業(yè)場景。
其次,文獻(xiàn)[11,12]對于工業(yè)領(lǐng)域中的不同類型的數(shù)據(jù)無明確的區(qū)分,只采用單一的數(shù)據(jù)庫存儲數(shù)據(jù)。而本平臺中采用一個數(shù)據(jù)池來存儲不同數(shù)據(jù),批數(shù)據(jù)放入MySQL據(jù)庫中,流數(shù)據(jù)放入InfluxDB數(shù)據(jù)庫中,能夠更好地區(qū)分開不同類型的工業(yè)大數(shù)據(jù)。再者,本平臺利用Kafka進(jìn)行數(shù)據(jù)暫時存儲,更好地保證了數(shù)據(jù)傳輸?shù)陌踩砸约捌脚_的可擴展性。
Flink平臺和Cloudera Manager大數(shù)據(jù)平臺、Shark大數(shù)據(jù)平臺的查詢數(shù)據(jù)效率如圖8所示,行表示數(shù)據(jù)集的數(shù)量(單位個數(shù)),列表示用戶查詢數(shù)據(jù)的響應(yīng)時間(單位ms)。
圖8 數(shù)據(jù)查詢效率
從圖8中我們可以看出:當(dāng)數(shù)據(jù)集為5000條時,各個平臺的執(zhí)行效率是差不多的,基本能在幾毫秒內(nèi)響應(yīng)出來。但當(dāng)數(shù)據(jù)集數(shù)據(jù)變多時,Shark平臺和Cloudera Manager平臺數(shù)據(jù)查詢時間明顯上升,執(zhí)行效率變低,而Flink平臺在處理將近60 000條數(shù)據(jù)集時也能快速響應(yīng)。
3個平臺的吞吐量方面也進(jìn)行了比較(吞吐量即單位時間內(nèi)平臺成功傳送數(shù)據(jù)的數(shù)量),比較結(jié)果如圖9所示,本次測試吞吐量的單位為:條/s。
圖9 吞吐量比較
從圖9中可以看到,當(dāng)Kafka Data的Partition為1 時,此平臺的吞吐量是Cloudera Manager大數(shù)據(jù)平臺的3.2倍,是Shark平臺的將近1倍,而當(dāng)Partition數(shù)為8時,此平臺吞吐量為Shark平臺的將近1倍,是Cloudera Manager大數(shù)據(jù)平臺的4.6倍。總之Flink平臺的吞吐量是遠(yuǎn)遠(yuǎn)高于其它兩個平臺的,而吞吐量又極大地反應(yīng)了系統(tǒng)的負(fù)載能力。在工業(yè)大數(shù)據(jù)量大的情況下,F(xiàn)link平臺能夠更好地運作。
當(dāng)數(shù)據(jù)量變大時,延遲低也是一個企業(yè)需要考慮的地方,因此比較了3個平臺的延遲性。延遲性即數(shù)據(jù)從進(jìn)入系統(tǒng)到流出系統(tǒng)所用的時間,本次測試延遲的單位為:ms。其實驗結(jié)果如圖10所示。
圖10 延遲比較
從圖10可以看到,F(xiàn)link平臺的延遲較低,即使面對200 000條的數(shù)據(jù)量,平臺也只具有21 ms的延遲,而Shark平臺的延遲幾乎是Flink平臺的2倍,而Cloudera Manager平臺是Shark平臺的兩倍,因此Flink平臺在延遲上也有較大的優(yōu)勢。
同時,在平臺預(yù)警速度方面做了個對比:選用5000條實時數(shù)據(jù)在不同的平臺上運行,比較不同平臺進(jìn)行預(yù)警并發(fā)送郵件至用戶的時間。其實驗結(jié)果如圖11所示。
圖11 預(yù)警時間比較
從圖11中我們可以看出:5000條實時數(shù)據(jù)在此平臺進(jìn)行預(yù)警并發(fā)送郵件的速度是最快的,需要20 ms,而在Shark平臺和Cloudera Manager大數(shù)據(jù)平臺分別需要27 ms和35 ms,此工業(yè)大數(shù)據(jù)平臺預(yù)警時間更短,能夠最大地減少企業(yè)的損失。
針對工業(yè)大數(shù)據(jù)數(shù)據(jù)量大、異構(gòu)性強、及時性強的特點,引入大數(shù)據(jù)技術(shù),提出了Flink和Kafka集成的工業(yè)大數(shù)據(jù)平臺,此平臺通過集群環(huán)境能夠高效地查詢數(shù)據(jù),并能進(jìn)行設(shè)備數(shù)據(jù)的快速預(yù)警。與目前較為典型的兩款開源大數(shù)據(jù)平臺進(jìn)行比較,實驗結(jié)果表明,此平臺在數(shù)據(jù)查詢效率、吞吐量、延遲性以及預(yù)警速度方面都是優(yōu)于其它兩個典型的大數(shù)據(jù)平臺的,能夠滿足預(yù)計的設(shè)計目標(biāo)。不僅如此,此平臺不僅適用于工業(yè)領(lǐng)域,而且適用于所有時間序列數(shù)據(jù)多的場景,因此基于Flink的工業(yè)大數(shù)據(jù)平臺的研究是具有實際意義的。
在今后的工作中,還需完善此平臺的其它功能,如云平臺數(shù)據(jù)分析故障預(yù)測等。其次數(shù)據(jù)源部分的數(shù)據(jù)都是整理好的,而此平臺中并未過多介紹如何獲取數(shù)據(jù)源,因此今后還需多學(xué)習(xí)物聯(lián)網(wǎng)的知識。在企業(yè)生產(chǎn)過程中,安全性是重中之重的,雖然在傳輸過程中使用消息隊列保證數(shù)據(jù)傳輸?shù)陌踩?,但其它模塊產(chǎn)生的數(shù)據(jù)實際是不夠安全的,所以如何保障保證數(shù)據(jù)處理的安全性[13],這也是本平臺未來需要考慮的地方。