張文彬,王春梅,王 靜,陳 托,智 佳
(1.中國(guó)科學(xué)院 國(guó)家空間科學(xué)中心,北京 100190;2.中國(guó)科學(xué)院大學(xué) 計(jì)算機(jī)科學(xué)與技術(shù)學(xué)院,北京 100049)
科學(xué)衛(wèi)星有效載荷產(chǎn)生的科學(xué)探測(cè)數(shù)據(jù)具有數(shù)據(jù)量大、參數(shù)多、處理實(shí)時(shí)性要求高的特點(diǎn),其中參數(shù)解析是有效載荷數(shù)據(jù)實(shí)時(shí)處理的關(guān)鍵環(huán)節(jié),其參數(shù)越多數(shù)據(jù)量越大,解析過(guò)程就越復(fù)雜越耗時(shí)。目前有效載荷數(shù)據(jù)的實(shí)時(shí)參數(shù)解析主要采用單機(jī)多線(xiàn)程處理方法[1],其存在吞吐率低、擴(kuò)展能力弱的不足,因此,提高有效載荷參數(shù)解析的速率具有必要性。
當(dāng)前主流的大數(shù)據(jù)流式計(jì)算框架Storm、Spark Streaming等具有低延遲、高吞吐、可擴(kuò)展等優(yōu)勢(shì)[2,3],本文結(jié)合衛(wèi)星有效載荷數(shù)據(jù)流的特點(diǎn)[4],利用大數(shù)據(jù)計(jì)算框架良好的實(shí)時(shí)處理性能和易擴(kuò)展的能力,以提高有效載荷參數(shù)解析的吞吐率[5]。其中Spark[6,7]提供的生態(tài)系統(tǒng)具備同時(shí)支持批處理、交互式查詢(xún)和流數(shù)據(jù)處理的優(yōu)勢(shì),可實(shí)現(xiàn)數(shù)據(jù)的無(wú)縫共享。Spark Streaming是Spark計(jì)算引擎內(nèi)的流式計(jì)算框架,因此,本文結(jié)合Spark Streaming和Kafka[8,9],設(shè)計(jì)并實(shí)現(xiàn)了一種有效載荷實(shí)時(shí)參數(shù)解析的處理方法,以提高有效載荷數(shù)據(jù)參數(shù)解析處理的實(shí)時(shí)性。
有效載荷數(shù)傳數(shù)據(jù),其格式遵循國(guó)際空間數(shù)據(jù)系統(tǒng)咨詢(xún)委員會(huì)(consultative committee for space data systems,CCSDS)的高級(jí)在軌系統(tǒng)(advanced orbit system,AOS)標(biāo)準(zhǔn)[10],有效載荷數(shù)傳數(shù)據(jù)結(jié)構(gòu)見(jiàn)表1。
有效載荷數(shù)傳數(shù)據(jù)的處理[11,12]步驟如圖1所示,在對(duì)CCSDS格式的數(shù)傳/遙測(cè)數(shù)據(jù)進(jìn)行AOS幀同步、解密、解擾、分包、拼接等預(yù)處理之后,形成中間數(shù)據(jù)格式,稱(chēng)為數(shù)據(jù)幀,其主要結(jié)構(gòu)見(jiàn)表2,其中,數(shù)據(jù)段部分存放各類(lèi)參數(shù)的二進(jìn)制編碼,參數(shù)解析過(guò)程即是針對(duì)數(shù)據(jù)段中的各個(gè)參數(shù)編碼進(jìn)行處理,將其按要求解析為電流、溫度等物理量。
表1 有效載荷數(shù)傳數(shù)據(jù)結(jié)構(gòu)
圖1 數(shù)據(jù)處理流程
傳輸標(biāo)簽衛(wèi)星標(biāo)識(shí)消息長(zhǎng)度幀計(jì)數(shù)衛(wèi)星時(shí)間碼狀態(tài)量計(jì)數(shù)2B2B4B6B1B2B數(shù)據(jù)段應(yīng)用數(shù)據(jù)變長(zhǎng)
為提高有效載荷數(shù)據(jù)的實(shí)時(shí)參數(shù)解析吞吐率,采用基于Spark Streaming與Kafka相結(jié)合的方法,處理流程如圖2所示,利用Kafka集群作為消息中間件實(shí)現(xiàn)數(shù)據(jù)分流,為數(shù)據(jù)接入提供保障,流式計(jì)算部分采用Spark Streaming集群作為計(jì)算平臺(tái),通過(guò)Spark Streaming獲取Kafka消息隊(duì)列的數(shù)據(jù)[13],并對(duì)參數(shù)進(jìn)行解析,然后將解析結(jié)果發(fā)送給Kafka作為數(shù)據(jù)緩沖區(qū)進(jìn)行合并,最終將計(jì)算結(jié)果發(fā)送給實(shí)現(xiàn)參數(shù)錄入的軟件。
圖2 系統(tǒng)處理流程
為避免數(shù)據(jù)源產(chǎn)生堆積,提高數(shù)據(jù)處理的速率,采用Kafka集群實(shí)現(xiàn)數(shù)據(jù)分流。Kafka是一種基于發(fā)布/訂閱的分布式消息系統(tǒng),可以在多個(gè)分布式生產(chǎn)者、消費(fèi)者并發(fā)的情況下,保證消息的有序性和負(fù)載均衡,可同時(shí)支持離線(xiàn)數(shù)據(jù)和實(shí)時(shí)數(shù)據(jù)的處理,其吞吐量可隨集群的擴(kuò)展而線(xiàn)性增加,且消息持久化的時(shí)間復(fù)雜度為O(1)[14],具有高吞吐率、高可靠性和易擴(kuò)展的優(yōu)點(diǎn)。
Kafka集群部署模式如圖3所示,在Kafka集群部署架構(gòu)中,可以存在多個(gè)Producer(生產(chǎn)者),生產(chǎn)者負(fù)責(zé)收集消息并將消息發(fā)布到Broker(代理)相應(yīng)的Topic(主題)中,Broker接收消息,并將消息在本地持久化,數(shù)據(jù)按照Topic名存儲(chǔ)在不同分類(lèi)中,一個(gè)Topic可以分成多個(gè)Partition(分區(qū)),每個(gè)Partition內(nèi)部消息強(qiáng)有序,將數(shù)據(jù)處理為多個(gè)分區(qū)的消息隊(duì)列流,用以作為中間數(shù)據(jù)源,在隊(duì)列底端存在多個(gè)Consumer(消費(fèi)者)[15]。消費(fèi)者是消息的真正使用者,從Topic中讀取隊(duì)列消息進(jìn)行處理[16]。其中Broker1、Broker2分別部署在不同服務(wù)器上,Spark Streaming的實(shí)時(shí)計(jì)算程序充當(dāng)消費(fèi)者訂閱Topic1,當(dāng)Topic1中有數(shù)據(jù),會(huì)將數(shù)據(jù)不停的從集群的指定消息隊(duì)列中發(fā)送給消費(fèi)者做參數(shù)解析處理。
圖3 Kafka集群部署模式
Spark Streaming是在Spark架構(gòu)上基于離散化數(shù)據(jù)流(discretized stream,DStream)模型擴(kuò)展的分布式流式計(jì)算框架,其中,DStream表示持續(xù)不斷的數(shù)據(jù)流,其可以是不同類(lèi)型數(shù)據(jù)源的數(shù)據(jù),包括文件流、套接字流、基于Kafka的輸入數(shù)據(jù)流等[17]。Spark Streaming可以在多達(dá)100個(gè)節(jié)點(diǎn)上運(yùn)行,實(shí)時(shí)處理吞吐率能達(dá)到秒級(jí)的延遲需求[18],可以有效實(shí)現(xiàn)高吞吐的參數(shù)解析處理,并且Spark Streaming支持節(jié)點(diǎn)的錯(cuò)誤恢復(fù),是具備容錯(cuò)機(jī)制的實(shí)時(shí)流數(shù)據(jù)的處理框架[19]。
因此,采用Spark Streaming實(shí)現(xiàn)有效載荷數(shù)據(jù)幀的實(shí)時(shí)參數(shù)解析,搭建Spark Streaming集群,設(shè)置集群中的主從節(jié)點(diǎn),其中主節(jié)點(diǎn)負(fù)責(zé)資源分配,從節(jié)點(diǎn)負(fù)責(zé)監(jiān)控本節(jié)點(diǎn)的CPU及內(nèi)存情況,接收主節(jié)點(diǎn)命令。將Spark Strea-ming 作為消費(fèi)者訂閱Kafka集群中的Topic1,當(dāng)Topic1中有數(shù)據(jù)時(shí),消費(fèi)者從消息隊(duì)列中獲取數(shù)據(jù)。Spark Strea-ming 處理進(jìn)程在獲取數(shù)據(jù)之后按照參數(shù)名、起止位置、轉(zhuǎn)換公式等結(jié)構(gòu)信息,對(duì)參數(shù)進(jìn)行解析,其處理架構(gòu)如圖4所示,主要包括如下步驟。
(1)數(shù)據(jù)分流
利用Kafka將數(shù)據(jù)源轉(zhuǎn)換為消息隊(duì)列流,按FIFO(first input first output,先進(jìn)先出)方式有序緩存于 Topic1 中的各個(gè)Partition中,等待Spark處理進(jìn)程作為消費(fèi)者消費(fèi)數(shù)據(jù)。
(2)Spark處理進(jìn)程
主節(jié)點(diǎn)為從節(jié)點(diǎn)的Executor(執(zhí)行進(jìn)程)分配內(nèi)存、CPU內(nèi)核等資源,并啟動(dòng)Executor進(jìn)程,每個(gè)從節(jié)點(diǎn)運(yùn)行若干Executor,每個(gè)Executor獨(dú)立運(yùn)行參數(shù)解析處理程序,即將數(shù)據(jù)幀按照數(shù)據(jù)格式中的參數(shù)名、起止位置、轉(zhuǎn)換公式等信息進(jìn)行解析,解析結(jié)果作為Producer發(fā)送給Kafka的Topic2。
(3)歸并
將Topic2中各個(gè)Partition的參數(shù)結(jié)果按時(shí)間先后順序進(jìn)行歸并。Kafka的Partition內(nèi)部消息強(qiáng)有序,從各Partition中獲取結(jié)果數(shù)據(jù)1~n,按時(shí)間順序排序?qū)?shù)解析結(jié)果合并,最后將參數(shù)存入數(shù)據(jù)庫(kù)。
搭建3臺(tái)虛擬機(jī)組成Spark Streaming集群,集群配置見(jiàn)表3。
測(cè)試采用表2格式的仿真數(shù)據(jù)進(jìn)行實(shí)驗(yàn),對(duì)單機(jī)多線(xiàn)程方法與基于Spark Streaming的集群方法進(jìn)行了仿真測(cè)試,令單機(jī)多線(xiàn)程方法運(yùn)行在表3中的任一臺(tái)從節(jié)點(diǎn)(Worker1或Worker2)虛擬機(jī)上,基于Spark Streaming的集群方法運(yùn)行在表3中的3臺(tái)虛擬機(jī)搭建的集群上,測(cè)試結(jié)果見(jiàn)表4,結(jié)果表明在相同的處理單元數(shù)量下,單機(jī)多線(xiàn)程處理方法的吞吐率為10.24 Mbps,基于Spark Streaming的集群方法為25.56 Mbps,相比單機(jī)多線(xiàn)程方法的數(shù)據(jù)處理吞吐率提高了150%,并且基于Spark Streaming的集群方法可以通過(guò)增加從節(jié)點(diǎn)的方式進(jìn)一步提升處理速率,具有很強(qiáng)的擴(kuò)展能力,在實(shí)時(shí)參數(shù)解析處理中更具優(yōu)勢(shì)。
本文提出并實(shí)現(xiàn)了一種基于Spark的有效載荷實(shí)時(shí)參數(shù)解析處理方法,采用了Apache Kafka和Spark Streaming相結(jié)合的處理方法,利用Kafka對(duì)有效載荷實(shí)時(shí)數(shù)據(jù)分流,Spark Streaming獲取數(shù)據(jù)并進(jìn)行實(shí)時(shí)參數(shù)解析,解決了單機(jī)多線(xiàn)程方法在吞吐率和擴(kuò)展能力上的局限性,提高了有效載荷參數(shù)解析處理的實(shí)時(shí)吞吐率,仿真結(jié)果表明,所提方法相比目前的單機(jī)多線(xiàn)程方法在相同處理單元配置下的數(shù)據(jù)吞吐率提高了150%,具有更優(yōu)的實(shí)時(shí)參數(shù)解析能力。
圖4 參數(shù)解析處理架構(gòu)
序號(hào)名稱(chēng)角色配置1Master主節(jié)點(diǎn)CPU:2.83 GHz,2核心;內(nèi)存:6 G;硬盤(pán):20 GB;操作系統(tǒng):CentOS72Worker1從節(jié)點(diǎn)CPU:2.83 GHz,4核心;內(nèi)存:4 G;硬盤(pán):20 GB;操作系統(tǒng):CentOS73Worker2從節(jié)點(diǎn)CPU:2.83 GHz,4核心;內(nèi)存:4 G;硬盤(pán):20 GB;操作系統(tǒng):CentOS7
表4 兩種方法測(cè)試結(jié)果比較