戴 偉,汪 森,李秋虹,鄧 輝 ,梅 盈,王 鋒,
(1. 昆明理工大學(xué)云南省計(jì)算機(jī)技術(shù)應(yīng)用重點(diǎn)實(shí)驗(yàn)室,云南 昆明 650051;2. 復(fù)旦大學(xué),上海 210000;3. 廣州大學(xué)天體物理中心,廣東 廣州 510006)
為了進(jìn)一步開展當(dāng)前重大科學(xué)問題如暗物質(zhì)和暗能量、黑洞和致密天體、宇宙起源、天體起源以及宇宙生命起源等研究工作的需要,科學(xué)家提出了平方千米陣列項(xiàng)目[1-2],并成為國(guó)際上即將建造的最大綜合孔徑射電望遠(yuǎn)鏡[3],得到全球天文學(xué)家的重點(diǎn)關(guān)注。平方千米陣列建在澳大利亞、南非及南部非洲8個(gè)國(guó)家的無線電寧?kù)o區(qū),其接收面積可達(dá)1 km2,頻率覆蓋50 MHz~20 GHz。平方千米陣列作為下一代射電望遠(yuǎn)鏡,具有極高的靈敏度(比央斯基甚大陣(Jansky Very Large Array, JVLA)靈敏度提高50倍,搜尋速度提高10 000倍),以千千米的基線獲得極高的空間分辨率,以納秒級(jí)的采樣獲得精細(xì)的時(shí)間結(jié)構(gòu),以10 Pb/s的速率產(chǎn)生超越全球互聯(lián)網(wǎng)總量的數(shù)據(jù)。以寬視場(chǎng)、多波束、高動(dòng)態(tài)、高分辨和大數(shù)據(jù)為核心概念的平方千米陣列將顛覆射電天文學(xué)的傳統(tǒng)研究手段,給天文學(xué)研究帶來革命性全新的理念。
平方千米陣列高分辨率、高靈敏度、高動(dòng)態(tài)范圍和寬視場(chǎng)等新特性,給數(shù)據(jù)處理帶來了前所未有的挑戰(zhàn)。平方千米陣列的巨大規(guī)模和復(fù)雜程度遠(yuǎn)遠(yuǎn)超出了現(xiàn)有射電天文望遠(yuǎn)鏡陣列,全規(guī)模運(yùn)行的平方千米陣列產(chǎn)生的海量數(shù)據(jù)需要10億億次/秒的處理能力,是目前世界上最快的超級(jí)計(jì)算機(jī)神威太湖之光處理能力(0.9億億次/秒)的10倍??紤]到計(jì)算效率和軟件執(zhí)行效率(目前天文軟件在超算平臺(tái)上的執(zhí)行效率普遍在10%,甚至更低),實(shí)際需求將大大超出這個(gè)理論估算[4]。
為了充分利用計(jì)算資源,確保數(shù)據(jù)處理流程的可靠性,近5年來,平方千米陣列的科學(xué)數(shù)據(jù)處理器(Science Data Processor, SDP)一直在研究與測(cè)試執(zhí)行框架(Execution Framework, EF)技術(shù),以期找到滿足未來發(fā)展要求、性能突出的執(zhí)行框架技術(shù)。其中,除了本文作者所在項(xiàng)目組參與的DALiuGE等相關(guān)工作[5-6]以外,也一直在研討商用執(zhí)行框架如Spark[7-8],ASK[9]的可用性。本文針對(duì)這方面的工作,細(xì)致討論了Spark執(zhí)行框架在未來平方千米陣列科學(xué)數(shù)據(jù)處理中的可用性。
Spark于2009年誕生于加州大學(xué)伯克利分校AMPLab,已經(jīng)成為Apache軟件基金會(huì)旗下的頂級(jí)開源項(xiàng)目。相對(duì)于MapReduce上的批量計(jì)算、迭代型計(jì)算以及基于Hive的SQL查詢,Spark可以帶來上百倍的性能提升。目前Spark的生態(tài)系統(tǒng)日趨完善,Spark SQL的發(fā)布、Hive on Spark項(xiàng)目的啟動(dòng)以及大量大數(shù)據(jù)公司對(duì)Spark全棧的支持,讓Spark的數(shù)據(jù)分析范式更加豐富。Spark與Hadoop的MapReduce計(jì)算框架類似,但相對(duì)MapReduce而言,Spark的特點(diǎn)更為突出,如具有可伸縮、基于內(nèi)存計(jì)算等特性,可以直接讀寫Hadoop上任何格式的數(shù)據(jù),在進(jìn)行批處理時(shí)更加高效,延遲更低。目前已經(jīng)成為輕量級(jí)大數(shù)據(jù)快速處理的統(tǒng)一平臺(tái)。這其中,彈性分布式數(shù)據(jù)集(Resilient Distributed Datasets, RDD)是Spark的核心,彈性分布式數(shù)據(jù)集是一種分布式的內(nèi)存抽象,表示一個(gè)只讀的記錄分區(qū)的集合。特別需要關(guān)注的是, Spark是基于內(nèi)存計(jì)算的大數(shù)據(jù)并行計(jì)算框架,提高了在大數(shù)據(jù)環(huán)境下數(shù)據(jù)處理的實(shí)時(shí)性,同時(shí)保證了高容錯(cuò)性和高可伸縮性,允許用戶將Spark部署在大量廉價(jià)硬件之上,形成集群。這對(duì)于當(dāng)前平方千米陣列科學(xué)數(shù)據(jù)處理來說,基于全內(nèi)存與廉價(jià)集群的方式非常有吸引力。
算法參考庫(kù)(1)https://github.com/SKA-ScienceDataProcessor/algorithm-reference-library是由射電天文科學(xué)家Tim Cornwell領(lǐng)銜開發(fā)的射電干涉陣數(shù)據(jù)處理算法驗(yàn)證庫(kù),用于為后續(xù)平方千米陣列的數(shù)據(jù)處理提供算法驗(yàn)證。目前,算法參考庫(kù)基于Python語言,已經(jīng)實(shí)現(xiàn)了射電干涉陣處理的主要算法,全部程序開源,供射電干涉陣數(shù)據(jù)處理參考使用。自2018年平方千米陣列完成主要工作包的關(guān)鍵設(shè)計(jì)評(píng)估,進(jìn)入橋接階段后,算法參考庫(kù)已經(jīng)成為系統(tǒng)學(xué)習(xí)、研究平方千米陣列數(shù)據(jù)處理的基礎(chǔ)參考庫(kù)。
針對(duì)平方千米陣列一期中頻陣可見度校準(zhǔn)問題,當(dāng)前的算法參考庫(kù)中,實(shí)現(xiàn)了一個(gè)全串行的MID1 ICAL管線,分為預(yù)測(cè)、校準(zhǔn)、反饋、去卷積4部分。MID1 ICAL管線的特點(diǎn)如下:
(1)邏輯任務(wù)同時(shí)具有數(shù)據(jù)密集型和計(jì)算密集型的特性;
(2)邏輯任務(wù)之間存在數(shù)據(jù)依賴,且任務(wù)之間通訊量巨大;
(3)需要多次迭代完成,并且兩次迭代之間數(shù)據(jù)需要大量更新;
(4)現(xiàn)有存儲(chǔ)條件無法長(zhǎng)時(shí)間保留原始數(shù)據(jù),需要在一個(gè)觀測(cè)周期內(nèi)完成管線的執(zhí)行。
為了滿足平方千米陣列后續(xù)建設(shè)工作,如何將這樣的串行代碼移植到分布計(jì)算框架下,并分析其實(shí)現(xiàn)方法和性能變化,深入了解不同算法在分布計(jì)算框架下的實(shí)現(xiàn)方式與性能評(píng)價(jià),對(duì)于后續(xù)科學(xué)數(shù)據(jù)處理有重要作用,這也正是平方千米陣列橋接階段的工作重點(diǎn),需求非常迫切。
平方千米陣列一期中頻有197個(gè)天線,最大基線長(zhǎng)度120 km。低頻有130 000個(gè)天線,最長(zhǎng)基線長(zhǎng)度40 km。預(yù)計(jì)平方千米陣列一期的數(shù)據(jù)注入速度約為2 TB/s。為進(jìn)行實(shí)驗(yàn)與性能分析,本文以中頻天線設(shè)計(jì)指標(biāo)為基礎(chǔ),定義基線數(shù)19 306個(gè),8 192個(gè)通道,每個(gè)通道寬度800 MHz,4個(gè)極化以及36個(gè)采樣時(shí)間,總計(jì)算任務(wù)28 800個(gè)。
在未來的系統(tǒng)部署中,成像管線(Imaging Pipeline)和非成像管線(Non-Imaging Pipeline)是其中的兩個(gè)重點(diǎn)。成像管線包括可見度函數(shù)接收、可見度函數(shù)預(yù)處理、實(shí)時(shí)校準(zhǔn)、快速成像、瞬變?cè)春蜻x體檢測(cè)、數(shù)據(jù)緩沖等。非成像管線包括接收脈沖星守時(shí)特性文件、脈沖星候選體接收、瞬變觀測(cè)緩沖、脈沖星守時(shí)處理等。從當(dāng)前平方千米陣列的建設(shè)看,這些管線的研制是整個(gè)數(shù)據(jù)處理系統(tǒng)的核心。本文討論的管線,是實(shí)時(shí)校準(zhǔn)管線的一個(gè)關(guān)鍵部分。
圖1給出了成像和校準(zhǔn)管線單一通道對(duì)應(yīng)的邏輯任務(wù),將這樣的邏輯任務(wù)由串行轉(zhuǎn)為分布計(jì)算,最關(guān)鍵的是將原來可以緊耦合的功能轉(zhuǎn)變?yōu)樗缮Ⅰ詈系哪K。重點(diǎn)說明Reppre_ifft和Degrid功能部分和Pharotpre_dft_sumvis功能部分。
圖1 MID1 ICAL管線單一通道對(duì)應(yīng)的邏輯任務(wù),括號(hào)內(nèi)是需要處理的數(shù)據(jù)量
Reppre_ifft和Degrid階段是根據(jù)局部天空模型,預(yù)測(cè)觀測(cè)天空的可見數(shù)據(jù),開發(fā)中利用了算法參考庫(kù)函數(shù)。這一部分并行的實(shí)現(xiàn)方法是對(duì)頻段、分片、時(shí)間片等進(jìn)行數(shù)據(jù)分片。最終對(duì)于MID1 ICAL管線,采用以單個(gè)頻段(不同頻段可以并行處理)分拆任務(wù)的方法實(shí)現(xiàn)分布并行計(jì)算,Reppre_ifft任務(wù)和Degrid任務(wù)的關(guān)系如圖2。
圖2 Degkerupd_deg階段任務(wù)依賴關(guān)系圖Fig.2 The dependency diagram in Degkerupd_deg phase
Reppre_ifft階段主要是對(duì)圖像類和Skycomponent的處理,整個(gè)過程的輸入是圖像和Skycomponent,輸出是圖像,并送入下一階段。代碼基本調(diào)用過程包括:
(1)arl/skycomponent/operations/insert_skycomponent:將skycomponent中的信息按照nchan和npol兩個(gè)軸插入到圖像,關(guān)聯(lián)函數(shù)包括:insert_function_sinc, insert_function_L, insert_function_pswf, insert_function_array;
(2)arl/fourier_transforms/fft_support/fft:對(duì)圖像進(jìn)行傅里葉變換;
(3)arl/image/operations/reproject_image:將切片后的圖像按照新的世界坐標(biāo)系統(tǒng)進(jìn)行重投影;
(4)arl/image/iterators/raster_iter:Image的切片函數(shù)
Degrid階段主要是對(duì)圖像中的數(shù)據(jù)進(jìn)行卷積,具體卷積的過程和可見度數(shù)據(jù)相關(guān),并將結(jié)果放入本來為空的Visibility類的Data屬性中。
在卷積之前,圖像先被填充到切片之前的大小,即在它的周圍填充0。然后整個(gè)矩陣再和前一步在get_kernelist中得到的griding correction function做一個(gè)點(diǎn)對(duì)點(diǎn)的乘法,對(duì)得到的結(jié)果再做快速傅里葉變換,就得到了通道化的圖像,這個(gè)通道化的圖像接下來將被卷積。
在分布執(zhí)行后,最終用以下方法合并Reppre_ifft和Degrid任務(wù)。
sc.parallelize(initset).flatMap(ix=>reppre_ifft_degrid_kernel(ix, broads_input_telescope_data, broadcast_lsm))
這里initset是六元組(beam, major_loop, frequency, time, facet, polarisation)。
為了避免RDD Pharotpre_dft_sumvis的一個(gè)
圖3 時(shí)間槽階段任務(wù)依賴關(guān)系圖Fig.3 The dependency diagram in Timeslots phase
圖4 grikerupd_rep階段任務(wù)依賴關(guān)系圖Fig.4 The dependency diagram in grikerupd_rep phase
平方千米陣列管線的每個(gè)任務(wù)輸入輸出大小以及計(jì)算量都是確定的。能夠準(zhǔn)確計(jì)算出每個(gè)任務(wù)具體的輸入輸出數(shù)據(jù)、中間數(shù)據(jù)的大小, 以及每個(gè)任務(wù)的計(jì)算量,對(duì)于分布式計(jì)算的任務(wù)調(diào)度具有指導(dǎo)意義。管線數(shù)據(jù)建模的依據(jù)是對(duì)每一個(gè)管線中的邏輯任務(wù),通過分析所采用算法的復(fù)雜度,并且計(jì)算輸入輸出的大小。
測(cè)試中采用3套不同配置的集群作為測(cè)試環(huán)境,集群1包括1個(gè)節(jié)點(diǎn),該節(jié)點(diǎn)配備1.5 TB內(nèi)存,中央處理器為80核,每個(gè)核主頻為2.2 GHz。集群2包括4個(gè)節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)配備64 GB內(nèi)存,中央處理器為8核,每個(gè)核主頻為1.8 GHz。軟件系統(tǒng)中Spark的版本為2.0,JDK版本為1.8。
本文分別對(duì)單機(jī)串行程序和基于Spark的程序進(jìn)行了測(cè)試,串行程序完全在集群1的高配環(huán)境中完成,最終的執(zhí)行時(shí)間見圖5。
圖5 性能對(duì)比示意圖,在3個(gè)進(jìn)程數(shù)下執(zhí)行的時(shí)間對(duì)比Fig.5 The contrast diagram of execution time elapse with 3 different process number
Spark是當(dāng)前工業(yè)界最流行的分布式執(zhí)行框架之一。實(shí)驗(yàn)表明,基于Spark構(gòu)建平方千米陣列的分布式執(zhí)行框架是有可能的,但Spark存在一些很實(shí)際的困難,具體說明如下:
(1)Spark考慮更多的是數(shù)據(jù)密集型,它的任務(wù)調(diào)度和資源管理目前只支持中央處理器,需要更改其任務(wù)調(diào)度和資源管理代碼來支持混合計(jì)算任務(wù)的調(diào)度。研究發(fā)現(xiàn),Spark性能瓶頸在于數(shù)據(jù)的連接操作,Spark的 “cogroup” 需要對(duì)幾個(gè)大的數(shù)據(jù)集進(jìn)行排序操作,產(chǎn)生大量的節(jié)點(diǎn)通訊。另外,彈性分布式數(shù)據(jù)集鏈過長(zhǎng),內(nèi)存不能及時(shí)釋放,Spark內(nèi)存不足時(shí),數(shù)據(jù)需要寫到磁盤,序列化與反序列化耗費(fèi)大量時(shí)間。Spark cogroup和groupByKeys需要排序操作,耗費(fèi)大量?jī)?nèi)存。從這方面看,Spark要滿足平方千米陣列的建設(shè)要求存在較多待改進(jìn)的地方。未來可以在如下幾方面繼續(xù)進(jìn)行性能優(yōu)化:
1)增大內(nèi)存。Spark內(nèi)存不足時(shí),數(shù)據(jù)需要寫到磁盤,序列化與反序列化耗費(fèi)大量時(shí)間。Spark的cogroup操作需要內(nèi)存排序,耗費(fèi)內(nèi)存資源;
2)利用分布式緩存系統(tǒng)替換cogroup操作;
3)利用分布式緩存系統(tǒng)存儲(chǔ)部分彈性分布式數(shù)據(jù)集的內(nèi)容,打破長(zhǎng)彈性分布式數(shù)據(jù)集鏈,及時(shí)釋放內(nèi)存資源;
4)利用Spark的Partitioning操作代替cogroup, Partitioning可以做到數(shù)據(jù)重組。同時(shí)設(shè)計(jì)分區(qū)函數(shù),較少連接操作。根據(jù)射電數(shù)據(jù)處理的特點(diǎn),在預(yù)測(cè)階段,設(shè)計(jì)根據(jù)頻率的分區(qū)函數(shù),需要將同一頻段的可見度數(shù)據(jù)聚集在一起,按照頻段進(jìn)行分區(qū),可以避免連接操作。在去卷積階段,設(shè)計(jì)根據(jù)分片的分區(qū)函數(shù),避免連接操作。
(2) Spark的數(shù)據(jù)劃分性能存在嚴(yán)重缺陷,數(shù)據(jù)劃分是MapReduce框架中的一個(gè)特定的phase(分階段),介于Map phase和Reduce phase之間,當(dāng)映射的輸出結(jié)果要被規(guī)約使用時(shí),輸出結(jié)果需要按key哈希,并且分發(fā)到每一個(gè)Reducer,這個(gè)過程就是數(shù)據(jù)劃分。由于數(shù)據(jù)劃分涉及磁盤的讀寫和網(wǎng)絡(luò)的傳輸,因此,數(shù)據(jù)劃分性能的高低直接影響整個(gè)程序的運(yùn)行效率。針對(duì)天文海量數(shù)據(jù)處理的要求,Spark的數(shù)據(jù)劃分的性能顯然滿足不了要求。擬采用內(nèi)存數(shù)據(jù)庫(kù)代替數(shù)據(jù)劃分的相關(guān)操作,可以進(jìn)一步提高Spark的并行計(jì)算性能。這是后續(xù)研究的重點(diǎn)。