馬 彬,李玉濤,許 琪
(1.江蘇省氣象信息中心,江蘇 南京 210005;2.江蘇省氣侯中心,江蘇 南京 210005)
隨著氣象觀測設(shè)備信息化程度的大幅提高,地面自動(dòng)氣象站的時(shí)空密度不斷增加,氣象自動(dòng)站已實(shí)現(xiàn)了分鐘加密觀測,產(chǎn)生的觀測數(shù)據(jù)量也呈指數(shù)級(jí)增長[1-3],具有數(shù)據(jù)種類多、數(shù)據(jù)規(guī)模大、數(shù)據(jù)實(shí)時(shí)性強(qiáng)以及價(jià)值密度低等特點(diǎn)[4]。同時(shí),自動(dòng)站也已成為監(jiān)視天氣變化、決策服務(wù)輔助支持的重要手段,為氣象預(yù)報(bào)、氣象防災(zāi)減災(zāi)、氣候預(yù)測與生態(tài)環(huán)境評(píng)估等提供十分重要的基礎(chǔ)數(shù)據(jù)支撐[5-6]。為進(jìn)一步推動(dòng)氣象事業(yè)高質(zhì)量發(fā)展,中國氣象局提出以氣象信息化推動(dòng)氣象現(xiàn)代化的發(fā)展戰(zhàn)略,而作為氣象服務(wù)最為核心業(yè)務(wù)應(yīng)用之一的自動(dòng)站數(shù)據(jù),也將面臨重大的挑戰(zhàn),對(duì)數(shù)據(jù)的實(shí)時(shí)采集處理、數(shù)據(jù)質(zhì)量、數(shù)據(jù)存儲(chǔ)及大規(guī)模查詢等要求也越來越高,需要在秒級(jí)甚至更短時(shí)間內(nèi)完成數(shù)據(jù)的全流程處理,從而提高響應(yīng)效率,發(fā)揮更重要的應(yīng)用價(jià)值。
Spark作為主流的開源分布式計(jì)算框架,具有可擴(kuò)展、高吞吐量和可容錯(cuò)等特點(diǎn)[7]。Spark Streaming則是Spark框架的實(shí)時(shí)流處理組件[8],采用了一種新的離散流處理模型,進(jìn)行計(jì)算處理時(shí),將數(shù)據(jù)流以時(shí)間片為單位進(jìn)行切割形成彈性分布式數(shù)據(jù)集RDD(Resilient Distributed Dataset),而RDD提供了共享內(nèi)存式的并行運(yùn)算,因此Spark在批處理、迭代計(jì)算、交互式查詢和流處理等多種計(jì)算模式方面具備高時(shí)效的處理能力[9]。HBase是Hadoop Database的簡稱,屬于NoSQL[10-11],是一個(gè)KeyValue類型的分布式存儲(chǔ)數(shù)據(jù)庫,具有海量存儲(chǔ)、高并發(fā)、高可靠以及可伸縮等特點(diǎn)[12],適應(yīng)于氣象數(shù)據(jù)的存儲(chǔ)管理應(yīng)用,能夠有效地解決氣象大數(shù)據(jù)的存儲(chǔ)和檢索響應(yīng)不足等問題[13]。
該文重點(diǎn)研究了基于Spark Streaming的氣象自動(dòng)站數(shù)據(jù)實(shí)時(shí)流處理與分布式存儲(chǔ)應(yīng)用技術(shù),通過對(duì)實(shí)時(shí)氣壓、溫度、降水、風(fēng)速、濕度等氣象要素的基本質(zhì)控算法設(shè)計(jì),實(shí)現(xiàn)了自動(dòng)站數(shù)據(jù)流式采集、解碼、基本質(zhì)控和入庫的分布式實(shí)時(shí)數(shù)據(jù)全流程處理功能。
目前,現(xiàn)有的氣象自動(dòng)站數(shù)據(jù)經(jīng)基層臺(tái)站設(shè)備采集后,通過寬帶網(wǎng)統(tǒng)一傳輸至省級(jí)落地入庫,通過氣象資料業(yè)務(wù)系統(tǒng)對(duì)數(shù)據(jù)進(jìn)行質(zhì)控后再共享或分發(fā)給各類業(yè)務(wù)應(yīng)用,其傳輸流程環(huán)節(jié)多,易導(dǎo)致數(shù)據(jù)處理不及時(shí)、交互響應(yīng)慢、統(tǒng)計(jì)時(shí)效差等問題。此外,由于省級(jí)部署的一些決策、服務(wù)等業(yè)務(wù)平臺(tái)開發(fā)早,大都依托于傳統(tǒng)的關(guān)系型數(shù)據(jù)庫進(jìn)行海量數(shù)據(jù)的存儲(chǔ),在大批量多并發(fā)查詢情況下,通常存在著數(shù)據(jù)檢索能力不足、檢索性能下降等問題。并且省級(jí)開發(fā)的業(yè)務(wù)系統(tǒng)大都應(yīng)用比較深入,覆蓋業(yè)務(wù)范圍廣,升級(jí)改造存在一定困難,極大地影響了氣象數(shù)據(jù)的應(yīng)用服務(wù)體驗(yàn)。因此,為滿足大規(guī)模自動(dòng)站數(shù)據(jù)能夠在秒級(jí)完成與用戶的交互響應(yīng),就對(duì)數(shù)據(jù)處理流程節(jié)點(diǎn)的集約化設(shè)計(jì)和高質(zhì)量、高可靠的數(shù)據(jù)存儲(chǔ)及檢索功能提出了更高的要求。
Spark Streaming技術(shù)的應(yīng)用研究,為實(shí)現(xiàn)高效實(shí)時(shí)數(shù)據(jù)處理提供了技術(shù)支撐[14]。開展基于數(shù)據(jù)流式采集、傳輸、質(zhì)控、存儲(chǔ)為一體的氣象自動(dòng)站數(shù)據(jù)全流程設(shè)計(jì)及應(yīng)用研究,能夠從根本上解決氣象自動(dòng)站數(shù)據(jù)落地環(huán)節(jié)多、任務(wù)處理耦合緊、處理系統(tǒng)部署分散等問題;進(jìn)一步提升氣象大數(shù)據(jù)的傳輸效率與數(shù)據(jù)質(zhì)量;改善業(yè)務(wù)平臺(tái)在多并發(fā),長序列數(shù)據(jù)檢索使用時(shí)的問題。在當(dāng)前氣象信息化任務(wù)高效推動(dòng)的背景下,對(duì)氣象數(shù)據(jù)以流式實(shí)現(xiàn)全流程的處理具有迫切的應(yīng)用研究需求。
系統(tǒng)總體架構(gòu)主要分為4層:數(shù)據(jù)層、處理層、邏輯層和應(yīng)用層,如圖1所示。
圖1 系統(tǒng)總體架構(gòu)
數(shù)據(jù)層主要分為源數(shù)據(jù)和處理后的分布式數(shù)據(jù)存儲(chǔ),源數(shù)據(jù)通過處理層相應(yīng)的Spark Streaming組件實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)流處理功能,F(xiàn)lume為源數(shù)據(jù)文件的解碼采集模塊,獲取源數(shù)據(jù)流后將數(shù)據(jù)暫存到Kafka消息中間件,由Spark Streaming調(diào)用Kafka中的數(shù)據(jù)做實(shí)時(shí)處理,再將所需處理的數(shù)據(jù)根據(jù)邏輯層相應(yīng)的算法及表結(jié)構(gòu)設(shè)計(jì)通過接口存儲(chǔ)到分布式數(shù)據(jù)庫中。邏輯層設(shè)計(jì)的功能主要包括溫度、壓強(qiáng)、降水等氣象自動(dòng)站數(shù)據(jù)基本要素的查詢以及檢索。應(yīng)用層分為用戶應(yīng)用和業(yè)務(wù)應(yīng)用兩類,用戶應(yīng)用指為科研人員提供數(shù)據(jù)服務(wù),主要為客戶端及科研用的虛擬化服務(wù)器等,業(yè)務(wù)應(yīng)用則指根據(jù)各業(yè)務(wù)場景的應(yīng)用需求開發(fā)的平臺(tái)和系統(tǒng)等。
2.2.1 數(shù)據(jù)實(shí)時(shí)采集技術(shù)分析
Flume[15]是一個(gè)分布式、高可靠、具備可定制化能力的日志采集傳輸系統(tǒng)[16],其數(shù)據(jù)流由事件(Event)貫穿始終,Event代表一個(gè)完整數(shù)據(jù)的最小單元,是事務(wù)的基本單位。這些Event攜帶日志數(shù)據(jù)并且?guī)в蓄^信息,由Agent外部的Source通過特定的格式化后生成,然后再發(fā)送到指定的目的地(Sink)進(jìn)行下一步操作。為確保數(shù)據(jù)能夠成功傳輸,通常Source會(huì)把Event推送至一個(gè)緩沖區(qū)(Channel)中,待確保前一個(gè)Event已由Sink處理完后,Channel再清空自己的緩存數(shù)據(jù)。Sink則負(fù)責(zé)持久化日志或者把事件推送到外部其他的Source。
Flume以Agent為最小的獨(dú)立運(yùn)行單位,每個(gè)Agent由Source、Channel和Sink組件構(gòu)成,F(xiàn)lume結(jié)構(gòu)如圖2所示。
圖2 Flume Agent結(jié)構(gòu)
Source:負(fù)責(zé)接收數(shù)據(jù)或通過特定機(jī)制生成數(shù)據(jù),然后以Flume的Event格式傳遞給一個(gè)或者多個(gè)Channel,F(xiàn)lume提供多種數(shù)據(jù)接收的方式,但Source必須至少和一個(gè)Channel相關(guān)聯(lián)。
Channel:位于Source和Sink間的一種存儲(chǔ)容器,用于緩存Source推送進(jìn)來的數(shù)據(jù),起著鏈接橋梁的作用。Channel將從Source接收到的Event緩存起來,直到它們被Sink消費(fèi)完成。同時(shí),它支持一個(gè)完整的事務(wù),可提供順序保證,這樣就確保了數(shù)據(jù)在收發(fā)時(shí)的一致性,并且可以和任意數(shù)量的Source和Sink工作。
Sink:負(fù)責(zé)將數(shù)據(jù)傳輸?shù)较乱惶蜃罱K目的地,任務(wù)成功結(jié)束后將數(shù)據(jù)從Channel移除。典型的Sink類型為:存儲(chǔ)數(shù)據(jù)到目的的終端Sink,如HDFS、HBase;自動(dòng)消耗的Sinks,如Null Sink;以及用于Agent間通信的IPC Sink,如Avro。
Event:Flume數(shù)據(jù)傳輸?shù)幕締卧?。一行文本?nèi)容會(huì)被反序列化成一個(gè)Event。
2.2.2 數(shù)據(jù)實(shí)時(shí)處理技術(shù)分析
數(shù)據(jù)的實(shí)時(shí)處理主要是對(duì)采集的氣象自動(dòng)站臟數(shù)據(jù)進(jìn)行清洗。數(shù)據(jù)格式錯(cuò)誤、數(shù)值錯(cuò)誤等多種原因?qū)е碌呐K數(shù)據(jù),若不經(jīng)過清洗就直接解析傳入到架構(gòu)組件中,會(huì)產(chǎn)生極大的成本和時(shí)間代價(jià)[17]。數(shù)據(jù)實(shí)時(shí)處理依據(jù)數(shù)據(jù)實(shí)時(shí)傳輸運(yùn)行框架環(huán)境,采用Kafka和Spark Streaming實(shí)現(xiàn)。Kafka是一個(gè)分布式、高吞吐、基于發(fā)布訂閱的消息系統(tǒng)[18],具有持久化、高吞吐、分布式、多客戶端支持以及實(shí)時(shí)等特點(diǎn),適用于離線和在線的消息消費(fèi)。利用Kafka技術(shù)可在廉價(jià)的PC Server上搭建起大規(guī)模消息系統(tǒng),從而大幅提升數(shù)據(jù)實(shí)時(shí)處理能力,其結(jié)構(gòu)如圖3所示。
圖3 Kafka結(jié)構(gòu)
Broker:在Kafka集群上一個(gè)服務(wù)器稱為一個(gè)Broker。
Topic:每條發(fā)布到Kafka集群的消息都有一個(gè)類別,稱為Topic。
Consumer:向Topic訂閱,并且接受發(fā)布到這些Topic的消息。
Producer:負(fù)責(zé)發(fā)布消息到Kafka Broker。
數(shù)據(jù)與處理:主要對(duì)數(shù)據(jù)的合法性進(jìn)行檢測,包括界限值、奇異值,以及數(shù)據(jù)內(nèi)部一致性等,該文主要檢測各字段是否與氣象行業(yè)數(shù)據(jù)質(zhì)控體系相符,如要素?cái)?shù)值、日期等格式。
2.2.3 數(shù)據(jù)分布式存儲(chǔ)技術(shù)分析
經(jīng)過Spark Streaming 流式處理后的數(shù)據(jù)將存儲(chǔ)到HBase[19]。HBase的實(shí)現(xiàn)包括三個(gè)主要功能組件,即庫函數(shù),一個(gè)Master主服務(wù)器和許多個(gè)RegionServer。主服務(wù)器Master負(fù)責(zé)管理和維護(hù)HBase表的分區(qū)信息,維護(hù)RegionServer列表、分配Region和負(fù)載均衡。RegionServer存儲(chǔ)和維護(hù)分配給自己的Region,用來處理來自客戶端的讀寫請(qǐng)求??蛻舳藙t不依賴于Master,它是通過請(qǐng)求ZooKeeper獲取存儲(chǔ)了Region和RegionServer映射關(guān)系的元數(shù)據(jù)表信息,然后直接從RegionServer獲取數(shù)據(jù)。ZooKeeper是一個(gè)分布式應(yīng)用程序協(xié)調(diào)服務(wù)[20-21],提供統(tǒng)一命名服務(wù)、配置管理和分布式鎖等基礎(chǔ)服務(wù)[22],具有集群管理、Master選舉、分布式協(xié)調(diào)通知和分布式隊(duì)列等功能[23]。基于以上所述的工作模式,實(shí)現(xiàn)了HBase快速響應(yīng)的特點(diǎn),其結(jié)構(gòu)如圖4所示。
圖4 HBase結(jié)構(gòu)
在HBase的使用中,RowKey的設(shè)計(jì)極為重要。HBase按照RowKey的順序去遍歷所有可能的數(shù)據(jù),然后再依次匹配相應(yīng)列的值,直到獲取所需的數(shù)據(jù)。若RowKey設(shè)計(jì)不合理,會(huì)造成單個(gè)Region訪問壓力過大,難以有效發(fā)揮其處理性能。因此,設(shè)計(jì)的RowKey要確保其具有唯一性,然后充分利用其有序性,均勻地分布在各個(gè)HBase節(jié)點(diǎn)上。
2.3.1 數(shù)據(jù)實(shí)時(shí)采集設(shè)計(jì)
將自動(dòng)站文件目錄設(shè)置為Flume監(jiān)控目錄,F(xiàn)lume將收到的源文件實(shí)時(shí)解析為數(shù)據(jù)流發(fā)送到Spark Streaming的監(jiān)控輸入目錄。Flume是分布式的,可以同步處理到達(dá)的多個(gè)文件,同時(shí)它也提供了許多可調(diào)的故障恢復(fù)和容錯(cuò)機(jī)制,當(dāng)某個(gè)節(jié)點(diǎn)出現(xiàn)故障時(shí),數(shù)據(jù)能夠被傳送到其他節(jié)點(diǎn)上而不會(huì)丟失,從而保證數(shù)據(jù)的完整性。以下是源文件以氣壓要素進(jìn)行解析數(shù)據(jù)流的代碼樣例。
static List
String[] tmp = null;
FileReader fr = new FileReader(FileName);
BufferedReader br = new BufferedReader(fr);
while ((line = br.readLine()) != null) {
tmp = line.split(" ");
for (int i = 0; i < tmp.length; i++) {
tmp_list.add(tmp[i]);
}
}
p_list.add(tmp_list.get(1));//獲取氣壓標(biāo)識(shí)段所有元素?cái)?shù)據(jù)
String[] p Tmp = null;
p Tmp = p_list.get(0).split(" ");
String station_P = p Tmp[1];//臺(tái)站氣壓數(shù)據(jù)
String sea_P = p Tmp[2];//海平面氣壓數(shù)據(jù)
PressureClass2 pc2 = new PressureClass2();
pc2.setStation_P(station_P);//封裝臺(tái)站氣壓數(shù)據(jù)
pc2.setSea_P(sea_P); //封裝海平面氣壓數(shù)據(jù)
2.3.2 數(shù)據(jù)實(shí)時(shí)處理設(shè)計(jì)
數(shù)據(jù)在前端由Flume收集起來,通過Kafka來做緩存和容災(zāi),最后由Spark Streaming來做實(shí)時(shí)處理。為減小代碼間的耦合性,設(shè)計(jì)中將自動(dòng)站各要素的質(zhì)控算法代碼單獨(dú)放入一個(gè)特定的Streaming來對(duì)數(shù)據(jù)流進(jìn)行實(shí)時(shí)清洗,再將Kafka的Topic中清洗完成后的數(shù)據(jù)放入另一個(gè)Topic中供后續(xù)的業(yè)務(wù)來處理。相關(guān)代碼示意如下:
String topics = "weatherTopic";
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("name");
JavaSparkContext sparkContext = new JavaSparkContext(conf);
sparkContext.setLogLevel("WARN");
JavaStreamingContext ssc = new JavaStreamingContext(sparkContext, Durations.seconds(1));//初始化StringContext
Collection
Map
kafkaParams.put("metadata.broker.list", "ip:port");//消費(fèi)端設(shè)置
kafkaParams.put("bootstrap.servers", " ip:port ");
2.3.3 數(shù)據(jù)質(zhì)控算法設(shè)計(jì)
氣象自動(dòng)站源文件數(shù)據(jù)要素多達(dá)100余種,設(shè)計(jì)中針對(duì)常用的基本氣象數(shù)據(jù)要素,如氣壓、溫度、降水、風(fēng)速、濕度等,依據(jù)氣象行業(yè)標(biāo)準(zhǔn)質(zhì)控技術(shù)規(guī)程中對(duì)質(zhì)控對(duì)象的分類,進(jìn)行流數(shù)據(jù)的基本質(zhì)控算法設(shè)計(jì)。本質(zhì)控算法設(shè)計(jì)分為分鐘數(shù)據(jù)質(zhì)控和小時(shí)數(shù)據(jù)質(zhì)控兩類,質(zhì)控方法則選用基本的格式檢查法、界限值檢查法和內(nèi)部一致性檢查法,相關(guān)規(guī)則如下所述。
規(guī)則1 在QC方法質(zhì)控碼(QCcode)中,為每個(gè)QC方法設(shè)置7級(jí)質(zhì)控碼,取值范圍為-3~3,其中0表示正確,±1表示可疑,±2表示警告,±3表示數(shù)據(jù)錯(cuò)誤。QC碼的符號(hào)表示疑誤數(shù)據(jù)偏離真值的方向,負(fù)號(hào)表示疑誤數(shù)據(jù)偏小,正號(hào)表示疑誤數(shù)據(jù)偏大,即數(shù)據(jù)質(zhì)量隨著控制碼數(shù)值絕對(duì)值的增加而降低。為方便使用,提出各質(zhì)控碼的符號(hào)表示形式,用f(e)表示數(shù)據(jù)格式檢查法的QC碼。f1(e)表示界限值檢查法的QC碼,f2(e)表示范圍值檢查法的QC碼,f3(e)表示內(nèi)部一致性檢查法的QC碼,其中e表示質(zhì)控的要素,如P表示氣壓,T表示溫度,U表示相對(duì)濕度,F(xiàn)表示風(fēng)速,R表示降水。
規(guī)則2 在數(shù)據(jù)格式檢查法中,按照地面自動(dòng)站氣象要素資料(國家站)格式說明。本站氣壓、海平面氣壓、最高本站氣壓、最低本站氣壓字段長度均為5 Byte;臺(tái)站氣溫、最高氣溫、最低氣溫、分鐘相對(duì)濕度、最小相對(duì)濕度、2分鐘平均風(fēng)速、10分鐘平均風(fēng)速、最大風(fēng)速、小時(shí)降水量、每1小時(shí)極大風(fēng)速、過去6小時(shí)極大風(fēng)速、過去12小時(shí)極大風(fēng)速均為4 Byte。
規(guī)則3 在界限值檢查法中,各氣象要素的界限值閾值范圍參考?xì)庀笥^測規(guī)范進(jìn)行設(shè)定,如 氣壓數(shù)據(jù)定義范圍為[500,1 200],溫度為[-55,55],濕度為[0,100],小時(shí)降水為[0,600],風(fēng)速為[0,150]。
規(guī)則4 在內(nèi)部一致性檢查法中,定義氣壓、溫度和相對(duì)濕度的第60分鐘的數(shù)據(jù)同小時(shí)正點(diǎn)數(shù)據(jù)不一致即為警告數(shù)據(jù),即E59≠En。此外,對(duì)于同一文件內(nèi)的數(shù)據(jù),當(dāng)前時(shí)刻的氣壓、溫度要素值應(yīng)介于最小值與最大值之間,即Emin≤En≤Emax,但需提出的是,當(dāng)前的溫度值(用Tn表示)應(yīng)不小于當(dāng)前的露點(diǎn)溫度值(用Td表示),即Td≤Tn。對(duì)于特殊的要素,如相對(duì)濕度,其當(dāng)前時(shí)刻的值不應(yīng)小于最小相對(duì)濕度值,即Emin≤En,本定義中的E均表示質(zhì)控的要素。具體算法說明如表1和表2所示。
表1 分鐘數(shù)據(jù)質(zhì)控算法設(shè)計(jì)說明
表2 小時(shí)數(shù)據(jù)質(zhì)控算法設(shè)計(jì)說明
2.3.4 數(shù)據(jù)分布式存儲(chǔ)表結(jié)構(gòu)設(shè)計(jì)
在該系統(tǒng)數(shù)據(jù)庫的表結(jié)構(gòu)設(shè)計(jì)中,需充分考慮源文件數(shù)據(jù)中所含基本信息要素的唯一性,利用MD5方法將行主鍵站號(hào)散列化,方便將所有數(shù)據(jù)散列到不同的Region上,從而有助于提高數(shù)據(jù)的查詢響應(yīng)效率。因此將行主鍵設(shè)計(jì)采用MD5(站號(hào))+源文件內(nèi)觀測時(shí)間組合的方法存儲(chǔ)Spark Streaming實(shí)時(shí)質(zhì)控后的要素?cái)?shù)據(jù),將質(zhì)控后的數(shù)據(jù)存儲(chǔ)在一個(gè)定義為aws_qcalldata的列族中,具體設(shè)計(jì)如表3所示。
表3 自動(dòng)站表結(jié)構(gòu)
MD5算法是在MD4算法基礎(chǔ)上由美國密碼學(xué)家羅納德·李維斯特(Ronald Linn Rivest)設(shè)計(jì),通過該算法能夠?qū)⑷我忾L度的文本轉(zhuǎn)換為一個(gè)固定長度(128位)的散列值,成為一個(gè)不可逆的字符串,從而有效保證了數(shù)據(jù)的安全性和信息傳輸?shù)耐暾訹24-26]。功能實(shí)現(xiàn)代碼樣例如下:
public static String getSaltMD5(String password) {
Random random = new Random();
StringBuilder sBuilder = new StringBuilder(16);
sBuilder.append(random.nextInt(99999999)).append(random.nextInt(99999999));
int len = sBuilder.length();
if (len < 16) {
for (int i = 0; i < 16 - len; i++) {
sBuilder.append("0");
}
}
String salt = sBuilder.toString();
password = md5Hex(password + salt);
char[] cs = new char[48];
for (int i = 0; i < 48; i += 3) {
cs[i] = password.charAt(i / 3 * 2);
char c = salt.charAt(i / 3);
cs[i + 1] = c;
cs[i + 2] = password.charAt(i / 3 * 2 + 1);
}
return String.valueOf(cs);
}
bc.setId(MD5Utils.getSaltMD5(stationNum)+ObservTime);//basic info 字段引入MD5算法
系統(tǒng)測試運(yùn)行環(huán)境采用4臺(tái)虛擬化服務(wù)器做集群,其具體部署情況如表4所示。
表4 測試環(huán)境部署架構(gòu)
系統(tǒng)運(yùn)行環(huán)境基于Spark 2.1.3版本,在程序中引入,數(shù)據(jù)庫集群由MPPDB 6.5.1.5構(gòu)建,F(xiàn)lume版本基于1.7.0,kafka為1.0.0,以及jdk 1.8。
江蘇共有國家基本氣象觀測自動(dòng)站70余個(gè),每個(gè)文件表示該臺(tái)站在某時(shí)刻所采集的氣象數(shù)據(jù)信息,文件內(nèi)容包含了百余類氣象要素?cái)?shù)據(jù)。測試中則采用實(shí)時(shí)業(yè)務(wù)應(yīng)用的國家基本氣象觀測站共享目錄中文本文件作為源數(shù)據(jù),將數(shù)據(jù)滾動(dòng)復(fù)制到應(yīng)用服務(wù)器Flume實(shí)時(shí)監(jiān)聽目錄data/listen_data中,使用Spark Streaming將Kakfa中的流數(shù)據(jù)寫入MPPDB,針對(duì)單個(gè)源文件及多個(gè)源文件從目錄監(jiān)聽、解析、實(shí)時(shí)質(zhì)控至數(shù)據(jù)入庫的全流程耗時(shí)分別進(jìn)行5次統(tǒng)計(jì),取平均耗時(shí)作為測試結(jié)果,具體數(shù)據(jù)如表5和表6所示。
表5 單文件數(shù)據(jù)流處理性能測試結(jié)果 ms
分析表5和表6可知,單站文件從源文件解析至入庫全流程平均耗時(shí)在4 s以內(nèi);多個(gè)文件同時(shí)傳輸時(shí),每個(gè)文件入庫的全流程平均耗時(shí)約1.46 s,能夠達(dá)到秒級(jí)數(shù)據(jù)處理能力,與現(xiàn)有的氣象自動(dòng)站數(shù)據(jù)在1 min內(nèi)到達(dá)預(yù)報(bào)員桌面的要求相比,完全滿足實(shí)時(shí)業(yè)務(wù)的應(yīng)用需求。
表6 多文件數(shù)據(jù)流處理性能測試結(jié)果
在多場景數(shù)據(jù)查詢性能測試中,根據(jù)常用的業(yè)務(wù)查詢需求,將多個(gè)場景的SQL寫入至一個(gè)查詢文件,同時(shí)對(duì)MPPDB數(shù)據(jù)庫的數(shù)據(jù)翻倍到TB量級(jí)后進(jìn)行批量查詢,查三次取平均值。
測試結(jié)果如表7所示。
表7 多場景數(shù)據(jù)查詢性能測試結(jié)果
測試結(jié)果表明,在不同場景的查詢條件下,該系統(tǒng)的點(diǎn)查詢響應(yīng)為毫秒級(jí),加權(quán)查詢?yōu)槊爰?jí),能夠有效地支撐實(shí)時(shí)業(yè)務(wù)中對(duì)氣象自動(dòng)站數(shù)據(jù)的查詢應(yīng)用。
從業(yè)務(wù)實(shí)際應(yīng)用需求出發(fā),基于Spark Streaming的流式計(jì)算框架,開展以數(shù)據(jù)流式采集、傳輸、質(zhì)控、存儲(chǔ)為一體的氣象自動(dòng)站數(shù)據(jù)全流程設(shè)計(jì)及應(yīng)用研究,通過模擬業(yè)務(wù)中使用的情景進(jìn)行性能測試,驗(yàn)證該系統(tǒng)的可行性和適用性。通過對(duì)測試結(jié)果的分析,表明該系統(tǒng)能夠有效地提升氣象自動(dòng)站數(shù)據(jù)的實(shí)時(shí)處理和查詢能力,與現(xiàn)有的數(shù)據(jù)處理系統(tǒng)相比具有以下優(yōu)點(diǎn):
(1) 將自動(dòng)站數(shù)據(jù)常用的氣象要素質(zhì)控算法設(shè)計(jì)并融入流處理組件中,實(shí)現(xiàn)基于數(shù)據(jù)流式采集、傳輸、質(zhì)控、存儲(chǔ)為一體的氣象自動(dòng)站數(shù)據(jù)全流程處理功能,減少了數(shù)據(jù)落地處理節(jié)點(diǎn),進(jìn)一步提高了數(shù)據(jù)處理的時(shí)效性和可用性。
(2)實(shí)現(xiàn)數(shù)據(jù)加密算法設(shè)計(jì)并應(yīng)用在分布式數(shù)據(jù)庫中,在提高數(shù)據(jù)存儲(chǔ)和檢索效率的同時(shí),極大地提升了數(shù)據(jù)的安全性。
(3)系統(tǒng)的設(shè)計(jì)開發(fā)從實(shí)際業(yè)務(wù)應(yīng)用需求出發(fā),且部署靈活,可作為省級(jí)自動(dòng)站質(zhì)控?cái)?shù)據(jù)存儲(chǔ)的實(shí)時(shí)備份,為氣象業(yè)務(wù)提供更加可靠的數(shù)據(jù)保障。
但此系統(tǒng)在應(yīng)用研究中也存在一些不足,如搭建系統(tǒng)環(huán)境的虛擬計(jì)算資源比較缺乏,數(shù)據(jù)庫和應(yīng)用服務(wù)器配置低于部署要求的最低配置標(biāo)準(zhǔn)。同時(shí),在集群應(yīng)用的規(guī)劃上,為節(jié)約資源,將主應(yīng)用和中間件部署在同一臺(tái)虛擬服務(wù)器上,一定程度上都影響了數(shù)據(jù)實(shí)時(shí)處理的性能。在后續(xù)的應(yīng)用研究中,將把文件所有氣象要素?cái)?shù)據(jù)的實(shí)時(shí)質(zhì)控處理納入設(shè)計(jì)工作,進(jìn)一步完善數(shù)據(jù)質(zhì)控算法,搭建資源充足的系統(tǒng)環(huán)境,優(yōu)化Spark Streaming的作業(yè)動(dòng)態(tài)調(diào)度配置以及調(diào)優(yōu)系統(tǒng)集群各組件的一致性、容錯(cuò)性和高可用性等,開展更為深入的業(yè)務(wù)應(yīng)用研究。