袁海飛
(徐州徐工挖掘機(jī)械有限公司信息化管理部 江蘇省徐州市 221000)
工業(yè)數(shù)據(jù)的存儲管理是工業(yè)信息化應(yīng)用、推進(jìn)智能制造的前提和基礎(chǔ)[1],然而海量的生產(chǎn)設(shè)備聯(lián)網(wǎng)數(shù)據(jù)、物聯(lián)網(wǎng)數(shù)據(jù)、實(shí)時監(jiān)控數(shù)據(jù)為工業(yè)數(shù)據(jù)的存儲管理帶來了效率問題,同時也對設(shè)備監(jiān)控數(shù)據(jù)存儲計算的實(shí)時性、高效性提出了更高的要求[2]。
在數(shù)據(jù)存儲方面,現(xiàn)有的設(shè)備監(jiān)控系統(tǒng)[3-4]往往將數(shù)據(jù)存儲在如MySQL等結(jié)構(gòu)化數(shù)據(jù)庫中,頻繁的讀寫請求給數(shù)據(jù)庫帶來了極大壓力。有的應(yīng)用研究[4]采用“一主多從、讀寫分離”的存儲架構(gòu),通過一個主數(shù)據(jù)庫接收寫請求,多個從數(shù)據(jù)庫同步數(shù)據(jù)并處理讀請求,這種方式緩解了數(shù)據(jù)庫的壓力,但仍存在復(fù)雜表需要垂直切分或水平切分的情況,在數(shù)據(jù)分析或查詢過程中表現(xiàn)欠佳。
大數(shù)據(jù)平臺Hadoop的出現(xiàn)為設(shè)備聯(lián)網(wǎng)監(jiān)控數(shù)據(jù)管理提供了新的方向[5-8],Hadoop平臺中的存儲組件HBase是一種分布式的列式數(shù)據(jù)庫,針對設(shè)備監(jiān)控數(shù)據(jù)存儲具有結(jié)構(gòu)靈活、可拓展、存儲效率高等優(yōu)勢,被廣泛地應(yīng)用在數(shù)據(jù)存儲和分析過程中[5-6]。而基于HBase存儲的時序數(shù)據(jù)庫OpenTSDB支持以毫秒精度、每秒數(shù)百萬次寫入,在設(shè)備監(jiān)控數(shù)據(jù)方面具有良好的表現(xiàn)[8]。
然而,不同于普通應(yīng)用系統(tǒng),設(shè)備監(jiān)控系統(tǒng)因其特殊的應(yīng)用場景會產(chǎn)生大量的實(shí)時數(shù)據(jù)[2],如設(shè)備、儀表參數(shù)、定位、指標(biāo)等。這些實(shí)時增量不斷增長的時序數(shù)據(jù)為數(shù)據(jù)存儲的可擴(kuò)展性提出了要求。此外,在數(shù)萬臺機(jī)器毫秒級監(jiān)控的場景中,服務(wù)器每秒需要處理GB級的數(shù)據(jù),傳統(tǒng)通過負(fù)載均衡進(jìn)行實(shí)時計算的處理方式已經(jīng)達(dá)到瓶頸。
為了解決以上問題,本文提出了一種基于分布式實(shí)時計算架構(gòu)的生產(chǎn)設(shè)備數(shù)據(jù)分析平臺。首先,基于Hadoop平臺構(gòu)建數(shù)據(jù)存儲計算集群;然后,利用OpenTSDB對設(shè)備實(shí)時監(jiān)測數(shù)據(jù)進(jìn)行時序化存儲管理;最后,基于Kafka和Flink對實(shí)時設(shè)備監(jiān)測數(shù)據(jù)進(jìn)行實(shí)時計算分析,實(shí)現(xiàn)高并發(fā)設(shè)備監(jiān)控場景下的低延遲響應(yīng)。
OpenTSDB[8]是一個可擴(kuò)展時間序列數(shù)據(jù)庫,能夠在不丟失粒度的情況下存儲和提供大量時間序列數(shù)據(jù)。OpenTSDB可以基于Hadoop平臺運(yùn)行,支持以毫秒精度、每秒數(shù)百萬次寫入,通過添加節(jié)點(diǎn)來增加數(shù)據(jù)庫容量。OpenTSDB的底層由HBase進(jìn)行存儲,由此可以提供高可用、可擴(kuò)展、高效查詢等數(shù)據(jù)操作。OpenTSDB還提供GUI系統(tǒng),可以根據(jù)用戶篩選條件展示數(shù)據(jù)直方圖。同時,OpenTSDB采用HTTP API的形式提供數(shù)據(jù)接口,方便異構(gòu)系統(tǒng)的數(shù)據(jù)通信以及如Grafana等數(shù)據(jù)可視化組件進(jìn)行數(shù)據(jù)監(jiān)控展示。
Apache Flink[9]是一個框架和分布式處理引擎,用于對無界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計算。Flink基于Flink流式執(zhí)行模型(Streaming execution model),能夠支持流處理和批處理兩種應(yīng)用類型。流處理和批處理所提供的服務(wù)等級協(xié)議完全不相同,流處理一般需要支持低延遲、Exactly-once保證,而批處理需要支持高吞吐、高效處理,所以在實(shí)現(xiàn)的時候通常是分別給出兩套實(shí)現(xiàn)方法,或者通過一個獨(dú)立的開源框架來實(shí)現(xiàn)其中每一種處理方案。如:實(shí)現(xiàn)批處理的開源方案MapReduce、Spark;實(shí)現(xiàn)流處理的開源方案Storm;微批處理方案Spark Streaming。與傳統(tǒng)方案不同,F(xiàn)link在實(shí)現(xiàn)流處理和批處理時,將二者統(tǒng)一起來,把批處理被作為一種特殊的流處理,即有界的數(shù)據(jù)流。這種批、流一體的架構(gòu)使得Flink在執(zhí)行實(shí)時數(shù)據(jù)計算時具有極低的延遲。
基于分布式實(shí)時計算架構(gòu)的生產(chǎn)設(shè)備數(shù)據(jù)分析平臺總體架構(gòu)共包括:數(shù)據(jù)采集層、數(shù)據(jù)存儲層、資源管理層、數(shù)據(jù)計算層和應(yīng)用層。如圖1所示為系統(tǒng)架構(gòu)圖。
系統(tǒng)的數(shù)據(jù)源主要包括:設(shè)備實(shí)時監(jiān)測上傳數(shù)據(jù)、用戶操作日志流數(shù)據(jù)及通過API調(diào)用的相關(guān)業(yè)務(wù)數(shù)據(jù)。此外,還可以通過ETL導(dǎo)入數(shù)據(jù)作為系統(tǒng)的數(shù)據(jù)源。
系統(tǒng)的數(shù)據(jù)存儲根據(jù)數(shù)據(jù)類型和應(yīng)用場景分為基礎(chǔ)業(yè)務(wù)庫、時序數(shù)據(jù)庫和內(nèi)存數(shù)據(jù)庫。基礎(chǔ)業(yè)務(wù)庫通過MySQL存放系統(tǒng)業(yè)務(wù)結(jié)構(gòu)化信息,時序數(shù)據(jù)庫存儲生產(chǎn)設(shè)備監(jiān)控數(shù)據(jù)。此外,服務(wù)器將經(jīng)常訪問的數(shù)據(jù)緩存在內(nèi)存數(shù)據(jù)庫Redis中,從而提高訪問速度和計算效率。
系統(tǒng)通過由Yarn進(jìn)行資源管理,負(fù)責(zé)在有數(shù)據(jù)計算請求時根據(jù)集群狀況分配計算資源和計算節(jié)點(diǎn),從而提供MapReduce、Spark、Flink等組件的計算環(huán)境。
系統(tǒng)對于并發(fā)產(chǎn)生的設(shè)備監(jiān)測數(shù)據(jù)、訂單業(yè)務(wù)數(shù)據(jù)等,通過Flink進(jìn)行實(shí)時計算,實(shí)現(xiàn)異常判斷、實(shí)時統(tǒng)計和數(shù)據(jù)存儲寫入功能,并由Phoenix進(jìn)行HBase中數(shù)據(jù)的結(jié)構(gòu)化查詢計算。
系統(tǒng)通過Web的形式提供用戶交互界面,實(shí)現(xiàn)對設(shè)備實(shí)時監(jiān)測、歷史狀態(tài)查詢、統(tǒng)計報表、設(shè)備管理、系統(tǒng)管理和日志管理等。
圖1:系統(tǒng)總體架構(gòu)圖
時序數(shù)據(jù)庫OpenTSDB的存儲管理沒有提供Java的SDK進(jìn)行方法調(diào)用,而實(shí)基于HTTP請求的方式進(jìn)行操作。系統(tǒng)將設(shè)備實(shí)時監(jiān)測時序化數(shù)據(jù)存儲在OpenTSDB中,其主要存儲結(jié)構(gòu)設(shè)計如下。
OpenTSDB底層由HBase負(fù)責(zé)按照時間戳作為Rowkey進(jìn)行存儲,每條數(shù)據(jù)記錄包括監(jiān)控項(xiàng)名稱metric、時間戳(秒/毫秒級)、監(jiān)控項(xiàng)值和監(jiān)控項(xiàng)相關(guān)信息(tags)。OpenTSDB是面向列式存儲的數(shù)據(jù)庫,其監(jiān)控項(xiàng)信息tags可記錄多條也可不記錄,這些tags可供用戶進(jìn)行數(shù)據(jù)后期查詢和篩選使用。
對高并發(fā)產(chǎn)生的設(shè)備監(jiān)控數(shù)據(jù),系統(tǒng)通過基于Kafka和Flink進(jìn)行設(shè)備數(shù)據(jù)的實(shí)時計算處理,實(shí)現(xiàn)異常判斷、實(shí)時統(tǒng)計、數(shù)據(jù)寫入等操作,具體步驟如下:
(1)系統(tǒng)基于消息隊列Kafka對設(shè)備數(shù)據(jù)進(jìn)行傳輸,達(dá)到高并發(fā)、低延遲和消峰的作用。首先通過命令行創(chuàng)建Kafka消息訂閱的topic,表示一條設(shè)備監(jiān)控的數(shù)據(jù)記錄,設(shè)計topic名為“devinfo”,定義副本數(shù)2個,分區(qū)數(shù)9個。多副本的設(shè)計提高了Kafka的可用性,多分區(qū)的設(shè)計使得消費(fèi)者能夠?qū)Χ鄠€分區(qū)同時處理,實(shí)現(xiàn)數(shù)據(jù)負(fù)載均衡,提高實(shí)時處理效率。
bin/kafka-topics.sh --zookeeper node01:2181 --create --replicationfactor 3 --partitions 9 --topic devinfo
(2)設(shè)備監(jiān)控數(shù)據(jù)匯聚在設(shè)備通信網(wǎng)關(guān),由網(wǎng)關(guān)進(jìn)行協(xié)議格式解析,然后構(gòu)建Kafka生產(chǎn)者KafkaProducer,將設(shè)備監(jiān)控數(shù)據(jù)發(fā)送到Kafka broker的“devinfo”這個topic下。同時,利用回調(diào)函數(shù)監(jiān)測是否發(fā)送成功,異常則觸發(fā)報警。因?yàn)镵afka多分區(qū)并行消費(fèi)的設(shè)計,同一分區(qū)下的數(shù)據(jù)會順序消費(fèi),但不同分區(qū)數(shù)據(jù)的處理會產(chǎn)生數(shù)據(jù)錯亂的情況。因此,網(wǎng)關(guān)生產(chǎn)者根據(jù)消息設(shè)備id作為分區(qū)的key進(jìn)行分區(qū),將同一設(shè)備數(shù)據(jù)分到同一個Kafka分區(qū)中,從而保證同一個設(shè)備的消息順序性。
表1:分布式節(jié)點(diǎn)配置情況
表2:OpenTSDB寫入性能測試
圖2:OpenTSDB時序數(shù)據(jù)查詢
(3)基于實(shí)時計算引擎Flink創(chuàng)建Kafka的消費(fèi)者線程池,實(shí)時處理設(shè)備數(shù)據(jù)記錄并將其寫入OpenTSDB。
首先,配置Flink流式處理環(huán)境,設(shè)置Flink定期執(zhí)行CheckPoint將數(shù)據(jù)持久化到內(nèi)存中,設(shè)置周期設(shè)為1s,設(shè)置檢查點(diǎn)模式為Exactly-once,即有且僅有一次。同時,若執(zhí)行CheckPoint時間超過60s,則丟棄該檢查點(diǎn)。
其次,通過Kafka Flink Connector API實(shí)現(xiàn)Flink消費(fèi)Kafka處理,配置Kafka的相關(guān)信息,如:Zookeeper的集群、Kafka的broker集群以及Kafka消息者組。
然后,配置Kafka消息的
最后,添加配置的數(shù)據(jù)源作為Flink流式環(huán)境的source,執(zhí)行DataStream流的map過程對每一條消息進(jìn)行實(shí)時處理,如:異常判斷、實(shí)時統(tǒng)計等,最后分別調(diào)用OpenTSDB API執(zhí)行數(shù)據(jù)寫入操作。
服務(wù)器端首先搭建部署Hadoop集群,包括HDFS、Yarn、Zookeeper、Kafka、HBase等組件的部署、監(jiān)控管理。此外,還部署了Flink計算組件、MySQL主備節(jié)點(diǎn)等。Hadoop集群應(yīng)用環(huán)境選用1個主節(jié)點(diǎn)和3個計算節(jié)點(diǎn),各節(jié)點(diǎn)配置情況如表1所示。
在性能測試方面針對OpenTSDB的數(shù)據(jù)寫入性能進(jìn)行實(shí)驗(yàn)。實(shí)驗(yàn)使用多線程寫入的方式模擬網(wǎng)關(guān)節(jié)點(diǎn),并通過Flink計算寫入數(shù)據(jù)庫,模擬100臺設(shè)備,每臺設(shè)備監(jiān)測數(shù)據(jù)為50個字段,其中30個字段為String型格式,20個為Double型格式,采樣頻率為每秒一次,網(wǎng)關(guān)節(jié)點(diǎn)每10s一次性寫入數(shù)據(jù)庫。持續(xù)模擬1個小時,共計60*60/10*50*100=180萬個數(shù)據(jù)點(diǎn),記錄平均每批次(10s)數(shù)據(jù)寫入平均耗時并計算吞吐量如表2所示。
OpenTSDB提供了一個Web應(yīng)用實(shí)現(xiàn)數(shù)據(jù)的簡單查詢和篩選,用戶可以根據(jù)監(jiān)控項(xiàng)名稱和查詢時間范圍獲取到對應(yīng)監(jiān)控值的直方圖,通過監(jiān)控項(xiàng)的tag可以對結(jié)果進(jìn)行篩選。如圖2所示為監(jiān)控設(shè)備PLC的電流變化曲線圖。然而,此界面所查詢的數(shù)據(jù)為秒級,對于毫秒級的時序數(shù)據(jù)雖然已經(jīng)存儲在數(shù)據(jù)庫中,但仍需要通過Http API的方式進(jìn)行查詢。
本文設(shè)計并實(shí)現(xiàn)基于分布式實(shí)時計算架構(gòu)的生產(chǎn)設(shè)備數(shù)據(jù)分析平臺,對實(shí)時監(jiān)控數(shù)據(jù)庫的存儲管理和實(shí)時分析計算進(jìn)行優(yōu)化。首先,構(gòu)建基于Hadoop的數(shù)據(jù)集群平臺;然后,基于OpenTSDB實(shí)現(xiàn)對設(shè)備監(jiān)控數(shù)據(jù)進(jìn)行分布式、可擴(kuò)展的時間序列數(shù)據(jù)管理;最后,基于Kafka和Flink對實(shí)時設(shè)備監(jiān)測數(shù)據(jù)進(jìn)行實(shí)時計算分析,實(shí)現(xiàn)高并發(fā)設(shè)備監(jiān)控場景下的低延遲響應(yīng)。實(shí)驗(yàn)對OpenTSDB的寫入性能進(jìn)行測試,結(jié)果表明本系統(tǒng)能夠有效進(jìn)行設(shè)備監(jiān)控系統(tǒng)的數(shù)據(jù)存儲和實(shí)時計算過程。