魏占辰,劉曉宇 ,黃秋蘭,孫功星
1.中國科學(xué)院 高能物理研究所,北京 100049
2.中國科學(xué)院大學(xué),北京 100049
隨著信息技術(shù)的快速發(fā)展,數(shù)據(jù)量呈現(xiàn)爆炸式的增長,海量數(shù)據(jù)給存儲和處理帶來了極大的挑戰(zhàn),越來越多的應(yīng)用朝著分布式系統(tǒng)的方向發(fā)展,因此以Hadoop[1]、Spark[2]為代表的大數(shù)據(jù)處理框架應(yīng)運而生。Spark具有良好的數(shù)據(jù)處理效率,保證了系統(tǒng)穩(wěn)定性、可擴展性和可用性,還提供交互式編程接口,非常適合迭代式、交互式或?qū)崟r數(shù)據(jù)分析的場景,具有廣泛的適用性,能夠適應(yīng)各類領(lǐng)域的大數(shù)據(jù)應(yīng)用。
雖然Spark等分布式系統(tǒng)能夠極大提升數(shù)據(jù)處理效率和系統(tǒng)吞吐量,但不可避免地帶來額外的性能損耗,這些損耗包括分布式任務(wù)派發(fā)、調(diào)度、結(jié)果收集,以及算法自身在分布式場景的開銷。如果不能有效地降低額外損耗,它們可能成為分布式計算中影響性能提升的瓶頸。在部分迭代密集型計算的場景中,例如高能物理數(shù)據(jù)分析中的分波分析(Partial Wave Analysis,PWA)計算,由于每輪迭代過程只有極少量參數(shù)發(fā)生變化,迭代次數(shù)多,迭代任務(wù)短,因此在現(xiàn)有的Spark 機制下,框架帶來的額外損耗成為了計算過程中的主要性能瓶頸點。目前學(xué)術(shù)界對于Spark 優(yōu)化方法的研究,主要集中在開發(fā)原則優(yōu)化、內(nèi)存優(yōu)化、配置參數(shù)優(yōu)化、調(diào)度優(yōu)化和Shuffle 過程優(yōu)化這5 個方面[3],具體研究內(nèi)容有數(shù)據(jù)填充和拉取策略研究[4],內(nèi)存數(shù)據(jù)緩存和替換策略的研究[5],優(yōu)化數(shù)據(jù)存儲機制的研究[6],以及Shuffle過程中的數(shù)據(jù)讀寫過程優(yōu)化[7]、文件寫入機制優(yōu)化[8]、調(diào)度算法優(yōu)化[9]、數(shù)據(jù)壓縮算法決策[10]等研究。對于迭代計算的研究,則主要有為增量型數(shù)據(jù)設(shè)計的增量式迭代計算模型[11],以及利用數(shù)據(jù)的分散性和局部性進(jìn)行分級、分區(qū)域的迭代計算方法[12]等。文獻(xiàn)[13]提出了一個Spark 性能預(yù)測模型,文獻(xiàn)[14]分析了CPU、磁盤和網(wǎng)絡(luò)等計算資源對Spark 性能的影響,但是它們均未對執(zhí)行效率及Spark框架自身進(jìn)行深入分析。上述文獻(xiàn)對于迭代密集型應(yīng)用沒有給出一個有效的優(yōu)化策略和解決方案。
在系統(tǒng)性分析Spark 的核心機制后,本文歸納并總結(jié)了Spark 應(yīng)用在執(zhí)行過程中的額外消耗,據(jù)此提出一種分析Spark 執(zhí)行效率的公式。通過該公式可以分析Spark 應(yīng)用的性能瓶頸點,并針對迭代密集型應(yīng)用提出相應(yīng)優(yōu)化策略。經(jīng)過測試,該公式可以準(zhǔn)確分析性能瓶頸,優(yōu)化策略能夠極大提高計算效率。
在Spark中,引入了一個可并行化處理的、帶有容錯性的新型分布式數(shù)據(jù)模型——彈性分布式數(shù)據(jù)集[15](Resilient Distributed Dataset,RDD),用戶所提交的程序或啟動的交互式Shell稱為一個Application,包含若干RDD運算。RDD通過劃分分區(qū)(Partition)進(jìn)行數(shù)據(jù)分片和計算并行化,通過兩類算子Transformation 和Action完成復(fù)雜運算。每個Action 算子會觸發(fā)一個真實的運算過程,稱為Job。每個Transformation算子會產(chǎn)生一個新的RDD,據(jù)此建立與父RDD的依賴關(guān)系。RDD的依賴關(guān)系有兩種,分別是依賴固定父RDD 分區(qū)數(shù)量的窄依賴關(guān)系和依賴全部父RDD 分區(qū)并進(jìn)行數(shù)據(jù)混洗(Shuffle)的寬依賴關(guān)系。由于Shuffle 的后續(xù)數(shù)據(jù)處理過程必須等待Shuffle完成后才能計算,因此Spark在寬依賴關(guān)系處進(jìn)行切分,劃分出不同的Stage,以Shuffle連接,形成有向無環(huán)圖(Direct Acyclic Graph,DAG),圖1為一個典型的Spark 單個Job 的計算流程圖,即以RDD的依賴關(guān)系形成的有向無環(huán)圖。
圖1 Spark計算流程圖
在Stage 內(nèi)部全部為窄依賴關(guān)系,因此每個分區(qū)的數(shù)據(jù)可以獨立計算,Spark將按照RDD的依賴順序形成一條計算流水線,稱為Task;在Shuffle 處,Spark 將前一個Stage 產(chǎn)生的數(shù)據(jù)排序并序列化寫入磁盤,交由后續(xù)Stage進(jìn)行處理,Shuffle過程是Spark內(nèi)最容易影響性能的瓶頸點。
在一個Application 內(nèi),執(zhí)行用戶數(shù)據(jù)處理邏輯、分析RDD依賴關(guān)系、生成并派發(fā)Task的組件稱為Driver,存儲數(shù)據(jù)、接收并執(zhí)行Task的組件稱為Executor。用戶在Driver 中提交的Job 均會由DAGScheduler 分析RDD依賴關(guān)系,并在各Stage 內(nèi)生成一組TaskSet,交由Task-Scheduler 調(diào)度并序列化為二進(jìn)制數(shù)據(jù),由Scheduler-Backend分發(fā)到Executor執(zhí)行,每組Task的數(shù)量與RDD的分區(qū)數(shù)量一致,代表了計算的并行度,圖2是Spark任務(wù)調(diào)度運行的流程圖。
廣播變量(Broadcast)用于Application 內(nèi)的數(shù)據(jù)共享,它在每個節(jié)點內(nèi)只保存一份,因此廣播變量產(chǎn)生的數(shù)據(jù)副本的數(shù)量與節(jié)點數(shù)一致。累加器(Accumulator)是一種分布式變量,它在Task中進(jìn)行數(shù)值更改,最后在Driver中聚合這些修改,因此它的副本數(shù)量與Task數(shù)量一致。
根據(jù)2.1節(jié)的分析,可知Spark應(yīng)用在運行時不可避免地會產(chǎn)生額外的消耗,包括分布式系統(tǒng)自身的消耗和實現(xiàn)分布式算法所引入的額外消耗。因此,需要建立一個用于分析分布式計算額外消耗和執(zhí)行效率的模型,為有針對性地優(yōu)化Spark分布式程序提供相關(guān)理論基礎(chǔ)。
圖2 Spark任務(wù)調(diào)度運行流程圖
定義1(有效計算時間)一個算法或任務(wù)為得到相應(yīng)結(jié)果而進(jìn)行的必要的數(shù)據(jù)計算、處理和分析所用的時間。有效計算時間是衡量一個算法的性能和復(fù)雜度的重要指標(biāo)。
定義2(分布式并行計算代價)分布式并行計算代價為Spark在執(zhí)行某一任務(wù)、處理某一數(shù)據(jù)時,因框架自身和算法所需而產(chǎn)生的額外消耗。由于這些代價需要額外CPU 時間處理,因此本文以處理分布式并行計算代價的時間(即計算代價時間)作為其評估指標(biāo)。
定義3(有效計算比)一個算法執(zhí)行的總時間包含了有效計算時間和計算代價時間,有效計算時間與總時間的比值即為有效計算比。有效計算比越大,表明該數(shù)據(jù)處理過程的效率越高,越容易達(dá)到理想的并行加速比。
下面根據(jù)Spark 應(yīng)用執(zhí)行流程確定其有效計算時間、計算代價時間和有效計算比。根據(jù)定義1、定義2和定義3,在某個Job中,假設(shè)有m個Stage,則有(m-1)個Shuffle,則其組成可定義為Job={stage1,stage2,…,stagem}∪{shuffle1,shuffle2,…,shufflem-1} ,使用了v個Executor。由于每個Stage會產(chǎn)生與輸入RDD的分區(qū)數(shù)量一致的Task,若stagej輸入RDD的分區(qū)數(shù)為nj,則其任務(wù)集為Taskj={task1,j,task2,j,…,tasknj,j}。對于taski,j,設(shè)其序列化時間為di,j,傳輸?shù)絜xecutor 的時間為ei,j,反序列化的時間為fi,j,由于Spark 的任務(wù)序列化和發(fā)送工作在Driver 節(jié)點串行執(zhí)行,由此可以得到Taskj的任務(wù)準(zhǔn)備時間Dj為:
在原有串行算法改為并行后,需要有額外的初始數(shù)據(jù)分配、額外的結(jié)果收集以及輔助算子才能完成數(shù)據(jù)處理過程。設(shè)初始數(shù)據(jù)分配時間為M,結(jié)果數(shù)據(jù)集為Result={result1,result2,…,resultnm} ,resulti的序列化時間、傳輸時間和反序列化時間分別為gi、hi、li,由于Driver接收數(shù)據(jù)及反序列化過程為串行,因此可以得到結(jié)果收集的時間R為:
使用輔助運算功能(如Accumulator、Broadcast)的計算代價與實際使用情況密切相關(guān),并且具有一定的節(jié)點內(nèi)共享特性,因此要根據(jù)實際情況測算算法帶來的計算代價E。若一個 Job 中,Accumulator 和 Broadcast 均為1 個,且它們的平均傳輸時間分別為a、b,并行度為u,executor數(shù)量為v時,可以得到一個關(guān)于E的計算公式為:
除此之外,由于Spark 在Stage 之間要產(chǎn)生Shuffle,設(shè)shufflej的計算代價為sj,由此可以得到一個Job中總的計算代價C為:
若taski,j的有效計算時間為ti,j,不難得出對于任務(wù)集Taskj的有效計算時間Tj和整個Job 的有效計算時間V為:
由公式(4)、(6)可以得出一個Job 的有效計算比K為:
根據(jù)公式(1)~(7),可以根據(jù)實際情況推導(dǎo)出整個Spark 應(yīng)用的有效計算時間、計算代價時間和有效計算比,因此本文不再贅述。
由此可以看出,計算代價受到框架自身、分布式并行算法以及原始數(shù)據(jù)分布等多方面的影響。在大多數(shù)Spark大數(shù)據(jù)應(yīng)用中,由于原始數(shù)據(jù)難以預(yù)測,因此很容易造成數(shù)據(jù)傾斜,將會造成大量的Shuffle。由公式(3)、(7)可知,巨大的Shuffle 過程會極大提升計算代價,嚴(yán)重拖慢Spark應(yīng)用的性能,降低整個應(yīng)用的有效計算比。
在Spark 效率分析公式的基礎(chǔ)上,可以根據(jù)實際情況進(jìn)行有針對性的優(yōu)化,從而提高數(shù)據(jù)分析效率。在高能物理中使用的分波分析方法是一類Spark迭代密集型應(yīng)用,下面以此為例具體介紹優(yōu)化過程。
分波分析是一種觀察高能物理實驗中產(chǎn)生的輕強子之間的共振態(tài)結(jié)構(gòu)的數(shù)據(jù)分析方法,它能夠精確測量共振態(tài)參數(shù)以及其產(chǎn)生衰變的性質(zhì)[16]。分波分析需要在高統(tǒng)計量樣本數(shù)據(jù)上進(jìn)行數(shù)值擬合的計算,其核心過程為使用最大似然法估計待定參數(shù),需要反復(fù)迭代以求得最優(yōu)參數(shù),因此分波分析是一類典型的大數(shù)據(jù)科學(xué)計算。
由于樣本數(shù)據(jù)的計算過程是獨立的,因此可以很容易地將該過程并行化,將數(shù)據(jù)劃分成若干區(qū)塊放入不同的RDD分區(qū)中,在各部分計算完成后由Spark框架匯總結(jié)果,并決定是否進(jìn)行下一輪迭代計算。
由Spark 分波分析計算的執(zhí)行流程可知,該類計算在Spark中每進(jìn)行一次迭代就會產(chǎn)生一個Job,每次迭代的任務(wù)完全一致,只有若干參數(shù)進(jìn)行更新,沒有Shuffle過程,并且每一個Job僅包含一個Stage。由于輸入數(shù)據(jù)不發(fā)生變化,因此除第一次計算外,其他迭代計算過程均沒有數(shù)據(jù)的初始分配時間。若總迭代次數(shù)為p,忽略數(shù)據(jù)分配時間,則可得到第i次迭代的計算代價時間ci以及總計算代價C為:
有效計算時間和有效計算比的計算方法與公式(6)和公式(7)一致,由此可以得到,計算代價時間與迭代次數(shù)正相關(guān),降低迭代過程中每一輪的計算代價以及消除計算代價與迭代次數(shù)的相關(guān)性是優(yōu)化該類問題的關(guān)鍵。
由于每次迭代計算過程不變,因此可以采用將多個迭代計算的Job 化簡為一個Job,即將Task 只分發(fā)一次的策略,消除冗余Task分發(fā),從而達(dá)到將計算代價與迭代次數(shù)的相關(guān)性消除的目的,為計算過程帶來極大的性能提升。
為實現(xiàn)上述策略,本文基于Spark 現(xiàn)有運行機制設(shè)計并實現(xiàn)了一個迭代控制服務(wù)模塊(Iteration Control Service,ICS),將參數(shù)更新分發(fā)、迭代流程控制以及結(jié)果收集過程交由該模塊負(fù)責(zé),從而與Spark 原有任務(wù)和數(shù)據(jù)分發(fā)機制進(jìn)行一定分離,達(dá)到降低計算代價的目的。ICS由Master和Worker兩部分組成,其中Master為迭代主控制模塊,負(fù)責(zé)控制Worker 計算、分發(fā)迭代參數(shù)、收集結(jié)果;Worker為迭代計算模塊,執(zhí)行具體計算任務(wù),并緩存每輪迭代計算結(jié)果以確保數(shù)據(jù)同步和任務(wù)容錯性。由此可以很容易得出基于ICS 機制優(yōu)化過后的分波分析總計算代價C′為:
由公式(10)可以看出,使用ICS消除了任務(wù)分發(fā)的冗余,使計算代價僅與參數(shù)更新和結(jié)果收集相關(guān),而ICS模塊將允許使用者將這兩部分自行控制,根據(jù)數(shù)據(jù)分析的實際情況,進(jìn)行細(xì)粒度的優(yōu)化與控制。
為評估本文提出的Spark迭代計算優(yōu)化方法的實際效果,本文設(shè)計了一組實驗來進(jìn)行性能對比。
本文的實驗環(huán)境是一個5 節(jié)點組成的集群,使用Spark Standalone模式進(jìn)行任務(wù)調(diào)度。除Spark外,還部署了Hadoop 和Alluxio 以完成分波分析的計算工作。Standalone Master、NameNode 和 AlluxioMaster 部署于主節(jié)點中,Standalone Worker、DataNode和AlluxioWorker部署于4個從節(jié)點中,實驗環(huán)境的詳細(xì)情況如表1所示。
表1 測試集群軟硬件環(huán)境
為分析RDD 迭代分波分析計算的計算代價情況,實驗選擇了約6 GB 樣本數(shù)據(jù)作為目標(biāo)分析數(shù)據(jù),完成該樣本的分析需要迭代237次,實驗過程使用全部資源進(jìn)行計算,即并行度為48。
由于分波分析過程的迭代次數(shù)與實際數(shù)據(jù)的擬合過程相關(guān),無法對其進(jìn)行精確控制,因此為測試不同迭代次數(shù)情況下的Spark 分布式并行分波分析的運行性能,在現(xiàn)有基礎(chǔ)上將迭代次數(shù)修改為直接由用戶指定,以此仿真實際情況中不同迭代次數(shù)的運算過程。
本文首先測試了選取的分波分析樣例在優(yōu)化前后的性能對比以及與串行程序的對比,評估指標(biāo)選取了執(zhí)行時間和有效計算比。執(zhí)行時間越短、有效計算比越高表明計算任務(wù)的性能和效率越好。測試結(jié)果如表2所示。
表2 分波分析測試結(jié)果對比
從測試結(jié)果中可看出,優(yōu)化前的Spark 分波分析程序相比于原有的串行程序,執(zhí)行時間縮短了約80.2%;優(yōu)化后的Spark 分波分析程序相比于串行程序,執(zhí)行時間縮短了約93.7%;與優(yōu)化前相比,執(zhí)行時間縮短了約68.2%,單次迭代的計算代價減小了約80.5%,有效計算比提升了約0.373。由上述數(shù)據(jù)可以看出,Spark效率分析公式和依據(jù)該公式制定的優(yōu)化策略能夠有效提升高能物理分波分析程序的性能和執(zhí)行效率。
為測試本文提出的優(yōu)化策略在不同迭代次數(shù)和不同子任務(wù)長度下對于執(zhí)行效率和有效計算比的提升情況,在仿真條件下分別測試了程序的有效計算比和執(zhí)行性能,其中執(zhí)行性能以執(zhí)行時間評估。本文設(shè)計了三組仿真作業(yè),所有仿真作業(yè)均人為指定迭代次數(shù),其中仿真作業(yè)1的迭代子任務(wù)與真實計算過程一致,仿真作業(yè)2和仿真作業(yè)3的迭代子任務(wù)會將計算過程反復(fù)執(zhí)行若干次,從而延長有效計算時間,經(jīng)測算仿真作業(yè)2的單次有效計算時間約為400 ms,仿真作業(yè)3約為1 000 ms。圖3和圖4展示了仿真作業(yè)1在迭代次數(shù)為100、200、400、800、1 600、3 200 和 6 400 時的有效計算比和執(zhí)行時間,為便于圖形展示,執(zhí)行時間取對數(shù)后作為圖4的縱坐標(biāo)。
圖3 優(yōu)化前后仿真作業(yè)1有效計算比對比圖
圖4 優(yōu)化前后仿真作業(yè)1執(zhí)行時間對比圖
從圖3 中可以看出,優(yōu)化前的仿真作業(yè)1 有效計算比基本保持不變;而優(yōu)化之后由于減少了任務(wù)的分發(fā)次數(shù),提高了參數(shù)分發(fā)和結(jié)果收集過程的效率,使得有效計算比得到約0.066~0.520的提升。從圖4中可以看出,優(yōu)化前的仿真作業(yè)1與串行版本相比,執(zhí)行時間縮短了約83.4%~90.6%,而優(yōu)化后與串行相比縮短了約91.4%~97.2%,與優(yōu)化前相比縮短了約48.3%~69.4%。
圖5、圖6 和圖7 展示了仿真作業(yè)2 和仿真作業(yè)3 在迭代次數(shù)為100、200、400、800和1 600時有效計算比和執(zhí)行時間的對比情況,為便于圖形展示,執(zhí)行時間取對數(shù)后作為圖6、圖7的縱坐標(biāo)。
圖5 優(yōu)化前后仿真作業(yè)2、3有效計算比對比圖
圖6 優(yōu)化前后仿真作業(yè)2執(zhí)行時間對比圖
圖7 優(yōu)化前后仿真作業(yè)3執(zhí)行時間對比圖
由上述測試可以看出在單次迭代的有效計算時間較長時,本文提出的優(yōu)化策略也能夠在一定程度上提高迭代計算的執(zhí)行效率和有效計算比。在有效計算比方面,優(yōu)化后的仿真作業(yè)2 提升了約0.038~0.073,優(yōu)化后的仿真作業(yè)3提升了約0.027~0.067。在執(zhí)行時間方面,優(yōu)化前的仿真作業(yè)2相比于串行程序縮短了約96%,優(yōu)化后與優(yōu)化前相比縮短了約16.2%~20.6%,與串行程序相比縮短了約97%。優(yōu)化前的仿真作業(yè)3 相比于串行程序,執(zhí)行時間縮短了約96.5%,優(yōu)化后相比于串行程序,執(zhí)行時間縮短了約97%,相比于優(yōu)化前,執(zhí)行時間縮短了約6.6%~12.9%。
Spark是當(dāng)前應(yīng)用最廣泛的并行計算框架和模型之一,本文主要致力于Spark 運行迭代密集型應(yīng)用的性能優(yōu)化研究。通過深入研究Spark 分布式任務(wù)執(zhí)行流程,提出有效計算時間、計算代價和有效計算比等概念,以此構(gòu)建Spark 效率分析公式,為精確分析Spark 應(yīng)用的效率提供理論支持。在此基礎(chǔ)上,本文還提出了一個針對迭代密集型應(yīng)用(例如高能物理中的分波分析方法)的優(yōu)化策略,將Spark多次任務(wù)分發(fā)過程簡化為一次,并優(yōu)化了參數(shù)更新和結(jié)果收集等過程,從而減少了計算代價,提升了有效計算比,使執(zhí)行效率得到大幅提升,在實際應(yīng)用中取得了較好的優(yōu)化效果。本文在實現(xiàn)迭代密集型應(yīng)用的優(yōu)化策略時設(shè)計了迭代控制服務(wù)模塊,該模塊擴展了標(biāo)準(zhǔn)Spark 的功能,保證了良好的編程接口和可擴展性,能夠為類似應(yīng)用提供參考。
Spark效率分析公式能夠為開發(fā)者合理安排并行算法并行度和有針對性優(yōu)化分布式并行應(yīng)用程序提供理論依據(jù)和參考。在未來的工作中,將進(jìn)一步研究Spark的性能瓶頸點,繼續(xù)完善和挖掘Spark的性能優(yōu)化方法,提升整個系統(tǒng)的效率和吞吐率,擴大Spark在多個領(lǐng)域,特別是科學(xué)計算領(lǐng)域的應(yīng)用。