摘要:隨著互聯網技術在工業(yè)生產中廣泛應用,工業(yè)互聯網的發(fā)展突飛猛進。在工業(yè)生產中,企業(yè)為了對工業(yè)大數據進行更好的采集、分析和預處理,利用大數據技術搭建大數據集群來完成各個生產環(huán)節(jié)?;贖adoop的高可用分布式框架已經成為很多企業(yè)在集群搭建中的首選。文章在基于高可用Hadoop組件基礎上,搭建了Hive、HBase、Spark、Flink、Kafka等大數據生態(tài)系統中一些重要組件,用于對數據的存儲、采集、抽取、清洗、預處理和分析等操作,幫助企業(yè)在生產過程中完善生產環(huán)節(jié),提高生產效率。
關鍵詞:工業(yè)大數據;Hadoop集群搭建;數據處理
中圖分類號:TP311" 文獻標志碼:A
基金項目:2023年保定市科技計劃項目;項目名稱:基于高可用集群和隨機森林算法的工業(yè)大數據分析平臺;項目編號:2311ZG018。
作者簡介:張艷敏(1985— ),女,講師,碩士;研究方向:大數據技術,軟件技術。
0" 引言
在工業(yè)生產過程中,各個生產環(huán)節(jié)產生的數據越來越多,這些數據大多是非結構化數據,傳統的關系型數據庫已無法滿足對這些數據的存儲與處理,因此,文章利用大數據技術原理搭建大數據高可用集群來實現工業(yè)大數據的采集與存儲等操作,集群中包含大數據生態(tài)系統中一些常用的組件。在已搭建的集群中通過Spark技術實現對離線數據的抽取、清洗和預處理,利用Flink技術對實時數據進行分析與存儲。整個生產過程在大數據集群環(huán)境中運轉流暢,最終達到了為企業(yè)節(jié)約成本、創(chuàng)造更多有益價值的目的。
1" 系統整體設計
根據企業(yè)實際生產場景,本文搭建了基于Hadoop HA高可用的集群[1],集群中包含Hive、HBase、Spark、Flink、Redis和MySQL等組件,實現對工業(yè)生產中設備信息(machine.csv)、設備狀態(tài)信息(showFactChangeRecordList.csv)、環(huán)境檢測信息(showFactEnvironmentData.csv)和產品加工信息(showFactProduceRecord.csv)的采集和處理。其中HDFS、Hive和HBase等組件用來存儲數據,Spark用來對離線數據進行抽取、清洗和預處理,Flink主要對實時生產數據進行計算和分析后存儲到Redis或MySQL數據庫中[2]。集群整體結構如圖1所示。
2" 離線數據處理
離線數據處理是利用Spark技術對已經存儲在數據庫中的數據進行預處理,一般用Scala語言編寫,通常包括數據抽取、清洗和指標計算等操作[3]。
2.1" 數據抽取
數據抽取包含全量抽取和增量抽?。?]。全量抽取是將源數據庫中的所有數據抽取到目標數據庫中,增量抽取是將自上次抽取后發(fā)生改變的數據從源數據庫抽取到目標數據庫中。
2.1.1" 全量抽取
在生產過程中,研究人員通常會將數據從MySQL中抽取到Hive中,方便數據更高效的處理。MySQL中包括數據庫shtd_industry,抽取shtd_industry庫中ChangeRecord表的全量數據進入Hive的ods庫,構成表changerecord,字段排序、類型不變;同時添加靜態(tài)分區(qū),分區(qū)字段為etldate,類型為String,值為當前日期(如20230702)。ChangeRecord的表結構如表1所示。
抽取操作執(zhí)行完畢后,系統可以通過hive cli執(zhí)行show partitions ods.changerecord命令查看分區(qū)結果,如圖2所示。
2.1.2" 增量抽取
在企業(yè)生產過程中,有些數據只保留最新數據,例如環(huán)境監(jiān)測表EnvironmentData(表結構如表2所示)中只保留每臺設備的最新監(jiān)測數據,在數據抽取時抽取MySQL中shtd_industry庫的EnvironmentData表的增量數據;將其輸入Hive的ods庫中構成表environmentdata,將ods.environmentdata表中inputtime作為增量字段,僅將新增的數據抽入,字段排序、類型不變;同時添加靜態(tài)分區(qū),分區(qū)字段為etldate,類型為String,值為當前日期(如20230702)。
2.2" 數據清洗
在生產過程中,系統會產生大量的“臟”數據,數據清洗就是去除這些“臟”數據,通過篩選、過濾等操作使數據變得更加干凈和準確[4]。數據清洗是數據處理過程中非常重要的一步,可以提高數據的質量和可信度,為后續(xù)的數據處理工作提供更有效安全的數據,例如對數據進行去重整合等操作。
在數據抽取中,系統將MySQL中數據抽取到Hive的ods庫中,在數據清洗中將ods庫中的changerecord全量數據抽取到dwd庫表fact_change_record中,在抽取之前須要對數據根據changeid和changemachineid進行聯合去重處理,并且添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time這4列。其中dwd_insert_user、dwd_modify_user均填寫“user1”;dwd_insert_time、dwd_modify_time均填寫當前操作時間。執(zhí)行完畢后,系統使用hive cli按照change_machine_id、change_id降序排序,查詢前1條數據,如圖3所示。
2.3" 指標計算
系統將清洗后的數據存入dwd庫,進入指標計算。指標計算是企業(yè)根據業(yè)務需求對數據進行的針對性查詢[5],通常是將多個數據源的數據集成到一個統一的數據存儲庫中。例如:系統使用Spark根據dwd層的fact_change_record表和dim_machine表統計,計算每個車間設備的月平均運行時長與所有設備的月平均運行時長對比結果(即設備狀態(tài)為“運行”,結果值為:高/低/相同),計算結果存入MySQL數據庫shtd_industry的machine_running_compare表中。dim_machine表、machine_running_compare表結構分別如表3和4所示。
3" 實時數據采集與處理
在工業(yè)生產過程中,系統除了對離線數據進行處理,還須要對實時數據進行處理。實時數據通過Flume采集后存儲在Kafka消息隊列中,再通過Flink讀取Kafka中的流數據,對數據進行實時處理與分析,將結果存儲到數據庫中。
3.1" 實時數據采集
在主節(jié)點中,系統使用Flume采集/data_log目錄下實時日志文件中的數據,將數據存入Kafka的Topic中(Topic名稱為ChangeRecord,分區(qū)數為4)[6],Flume采集ChangeRecord主題的配置如圖4所示。
3.2" 實時數據處理
當實時數據采集完畢后,系統使用Flink消費Kafka中ChangeRecord主題的數據[7],例如每隔1 min輸出最近3 min的預警次數最多的設備,將結果存入Redis,key值為“warning_last3min_everymin_out”,value值為“窗口結束時間,設備id”。本文使用redis cli以HGETALL key方式獲取warning_last3min_everymin_out值,如圖5所示。
4" 結語
文章介紹了工業(yè)大數據高可用集群搭建的整體架構,在此基礎上實現了離線數據處理以及實時數據的采集和處理。系統在離線數據處理中采用數據抽取、清洗和指標計算;在實時數據中使用Flume采集數據到Kafka中,再通過Flink技術進行計算后將結果存入Redis。整個流程來自真實的企業(yè)生產過程。本文將大數據技術應用到企業(yè)生產中,為企業(yè)生產效率提高、轉換提供了有效價值。
參考文獻
[1]劉曉莉,李滿,熊超,等.基于Hadoop搭建高可用數據倉庫的研究和實現[J].現代信息科技,2023(1):99-101.
[2]黎心怡,夏梓彤,莊嘉濠,等.基于大數據技術的實時軌道交通分析預測可視化系統的設計與實現[J].電腦知識與技術,2023(29):71-74.
[3]鄭倩倩.基于Kettle的工業(yè)數據集成與應用[D].重慶:西南大學,2023.
[4]謝文閣,佟玉軍,賈丹,等.數據清洗中重復記錄清洗算法的研究[J].軟件工程師,2015(9):61-62.
[5]何文韜.基于Spark的工業(yè)大數據能效分析平臺的設計與實現[D].大連:大連理工大學,2018.
[6]林子雨.數據采集與預處理[M].北京:人民郵電出版社,2022.
[7]林子雨,陶繼平.Flink編程基礎[M].北京:清華大學出版社,2022.
(編輯" 王雪芬)
Design and implementation of industrial big data high availability cluster construction based
on big data technology
ZHANG" Yanmin, MA" Xiaotao, YANG" Bingqian, WU" Weihong, ZHAO" Bin
(Hebei Software Institute, Baoding 071000, China)
Abstract: With the wide application of the Internet in industrial production, the development of industrial Internet is advancing rapidly. In industrial production, in order to assist enterprises in better collecting, analyzing, and preprocessing the industrial big data, it is necessary to build a big data cluster to complete various production processes using big data technology. Hadoop based highly available distributed frameworks have become the preferred choice for many enterprises in cluster construction. In the article, based on highly available Hadoop components,some important components in the big data ecosystem such as Hive, HBase,Spark, Flink,Kafka, etc." are built to store, collect, extract, clean, preprocess, and analyze data, helping enterprises improve production processes and increase production efficiency.
Key words: industrial big data; Hadoop cluster construction; data processing