卞 琛,修位蓉,于 炯
(1廣東金融學(xué)院互聯(lián)網(wǎng)金融與信息工程學(xué)院,廣東 廣州 510521;2.廣州商學(xué)院信息技術(shù)與工程學(xué)院,廣東 廣州 511363;3新疆大學(xué)信息科學(xué)與工程學(xué)院,新疆 烏魯木齊 830046)
隨著非易失內(nèi)存的發(fā)展和高復(fù)雜度計(jì)算需求的提升,能夠較好地支持迭代計(jì)算和數(shù)據(jù)密集型應(yīng)用的內(nèi)存計(jì)算框架[1-3]已經(jīng)得到了廣泛的應(yīng)用。分布式內(nèi)存計(jì)算框架Spark[4,5]作為其中的典型代表,利用分布式集群的節(jié)點(diǎn),在內(nèi)存對(duì)數(shù)據(jù)進(jìn)行高效處理和臨時(shí)緩存,具有高性能、高可靠和低延遲的特性。傳統(tǒng)基于磁盤(pán)的分布式計(jì)算框架MapReduce[6]將應(yīng)用簡(jiǎn)單劃分為多個(gè)Map和Reduce操作,磁盤(pán)I/O的開(kāi)銷(xiāo)很大。與之不同的是,Spark將作業(yè)基于操作間的前后關(guān)系轉(zhuǎn)化成DAG圖,并根據(jù)處理的操作類(lèi)型不同(寬依賴(lài)操作或窄依賴(lài)操作)劃分成多個(gè)計(jì)算階段(stage),各階段串行處理,階段內(nèi)部由多個(gè)任務(wù)(task)并行處理。作業(yè)執(zhí)行時(shí)間取決于所有階段執(zhí)行時(shí)間之和,若能夠盡可能減少每個(gè)階段的執(zhí)行時(shí)間,則最終整個(gè)作業(yè)的執(zhí)行時(shí)間就能夠得到優(yōu)化。由于每個(gè)階段內(nèi)的任務(wù)需要同步,完成時(shí)長(zhǎng)取決于執(zhí)行時(shí)間最長(zhǎng)的任務(wù),讓每個(gè)任務(wù)都盡快完成才成達(dá)到優(yōu)化目標(biāo),因此并發(fā)任務(wù)數(shù)(并行度)和計(jì)算量分配是影響任務(wù)執(zhí)行速度的關(guān)鍵因素。
Spark框架的任務(wù)并行度是分布式內(nèi)存計(jì)算環(huán)境中各階段內(nèi)部任務(wù)執(zhí)行的并發(fā)程度。任務(wù)并行度包含物理并行度和邏輯并行度:物理并行度是集群資源限定下能夠承載的最大并發(fā)任務(wù)個(gè)數(shù),由注冊(cè)的工作節(jié)點(diǎn)數(shù)及每個(gè)節(jié)點(diǎn)貢獻(xiàn)的CPU核心數(shù)確定;而邏輯并行度為用戶(hù)/程序所預(yù)期的任務(wù)并發(fā)程度,由Spark的執(zhí)行參數(shù)確定。理想情況下,物理并行度和邏輯并行度一致時(shí)能夠獲得最佳效率,而由于存儲(chǔ)容量限制,數(shù)據(jù)量較大或分布不均衡可能會(huì)導(dǎo)致內(nèi)存空間不足或內(nèi)存溢出,反而影響執(zhí)行效率。同時(shí),邏輯并行度設(shè)置過(guò)大時(shí),也同樣會(huì)帶來(lái)調(diào)度和任務(wù)切換的額外開(kāi)銷(xiāo)。Spark的任務(wù)并行度通常在系統(tǒng)配置參數(shù)default.parallelism中設(shè)定,往往由程序員依靠經(jīng)驗(yàn)預(yù)先設(shè)定,較難契合各類(lèi)型的作業(yè)和集群環(huán)境,增加了溢寫(xiě)、容錯(cuò)甚至是程序異常的風(fēng)險(xiǎn)。同時(shí),并行度在整個(gè)作業(yè)所有階段的執(zhí)行過(guò)程中恒定不變,與作業(yè)類(lèi)型、數(shù)據(jù)特性和節(jié)點(diǎn)能力無(wú)關(guān),每個(gè)階段的數(shù)據(jù)分布和數(shù)據(jù)大小都不一樣,不能很好地適應(yīng)作業(yè)特點(diǎn),提高并發(fā)執(zhí)行效率。
在不改變分區(qū)規(guī)則的情況下,由原始數(shù)據(jù)分布和并行度參數(shù)共同決定計(jì)算數(shù)據(jù)量的分配,對(duì)作業(yè)執(zhí)行效率和集群資源利用率均產(chǎn)生重要影響,在異構(gòu)Spark集群中的表現(xiàn)尤為突出。因此,本文針對(duì)異構(gòu)Spark集群節(jié)點(diǎn)性能與數(shù)據(jù)分配的適配性問(wèn)題,研究數(shù)據(jù)分布和并行度設(shè)置對(duì)作業(yè)執(zhí)行效率的影響,建立節(jié)點(diǎn)資源模型、數(shù)據(jù)分配模型和任務(wù)執(zhí)行模型,分析數(shù)據(jù)分布、并行度參數(shù)和節(jié)點(diǎn)任務(wù)分配的耦合關(guān)系,異構(gòu)Spark集群的數(shù)據(jù)傾斜修正調(diào)度策略DSCS(Data Skew Correction Scheduling strategy),包括并行度預(yù)估算法、數(shù)據(jù)傾斜修正和異構(gòu)節(jié)點(diǎn)任務(wù)分配算法。并行度預(yù)估算法基于統(tǒng)計(jì)的輸入數(shù)據(jù)總量、可用節(jié)點(diǎn)數(shù)量、可用核心數(shù)量和可用內(nèi)存數(shù)量,對(duì)并行度和輪數(shù)進(jìn)行預(yù)估;數(shù)據(jù)傾斜修正算法對(duì)輸入數(shù)據(jù)分布傾斜度較高的階段進(jìn)行處理,盡可能均衡不同數(shù)據(jù)桶(Bucket)中的分配量,當(dāng)部分鍵值(key)數(shù)據(jù)量明顯大于其他鍵值時(shí),對(duì)該健值的數(shù)據(jù)進(jìn)行拆分并增加并行度;異構(gòu)節(jié)點(diǎn)任務(wù)分配算法分別根據(jù)節(jié)點(diǎn)計(jì)算能力和內(nèi)存空間大小倒序生成2個(gè)隊(duì)列,優(yōu)先分配更多的任務(wù)到計(jì)算能力更強(qiáng)的節(jié)點(diǎn)和內(nèi)存可用空間更大的節(jié)點(diǎn),從而使異構(gòu)Spark集群能夠通過(guò)并行調(diào)度策略,選擇具有更小內(nèi)存溢出量和更小任務(wù)調(diào)度開(kāi)銷(xiāo)的并行計(jì)算方案。
在內(nèi)存計(jì)算框架中,任務(wù)并行度設(shè)置的合理性將直接影響到作業(yè)執(zhí)行效率和集群資源利用率。針對(duì)內(nèi)存計(jì)算框架的任務(wù)并行調(diào)度效率問(wèn)題,研究者從不同角度提出了優(yōu)化方案。文獻(xiàn)[7]針對(duì)嵌套型列存儲(chǔ)Impala,利用多層查詢(xún)樹(shù)的結(jié)構(gòu)提高了查詢(xún)?nèi)蝿?wù)的并行度,有效地降低了系統(tǒng)廣播和查詢(xún)開(kāi)銷(xiāo)。文獻(xiàn)[8]分析發(fā)現(xiàn),在MapReduce環(huán)境中,并行度的設(shè)置會(huì)顯著影響緩存在作業(yè)執(zhí)行中的作用,合理的并行度能夠有效提高作業(yè)執(zhí)行效率,在此基礎(chǔ)上提出了基于并行度優(yōu)化的緩存設(shè)置算法。文獻(xiàn)[9]研究表明,在Spark環(huán)境中,各計(jì)算階段的任務(wù)并行度設(shè)置不合理會(huì)降低作業(yè)執(zhí)行性能,增加作業(yè)的計(jì)算開(kāi)銷(xiāo),在此基礎(chǔ)上提出通過(guò)調(diào)整并行度來(lái)提高內(nèi)存資源利用率的任務(wù)調(diào)度算法。文獻(xiàn)[10]在MapReduce環(huán)境中,為了合理地設(shè)置節(jié)點(diǎn)任務(wù)并行度,有效地均衡節(jié)點(diǎn)通信成本,提高作業(yè)執(zhí)行效率,設(shè)置了基于通信成本模型的任務(wù)調(diào)度算法。文獻(xiàn)[11]在并行度設(shè)置的基礎(chǔ)上,提出了任務(wù)本地化調(diào)度算法,從而優(yōu)化任務(wù)執(zhí)行機(jī)制,提高任務(wù)本地性,降低網(wǎng)絡(luò)開(kāi)銷(xiāo)。文獻(xiàn)[12]基于并行度分布的情況,提出了動(dòng)態(tài)任務(wù)調(diào)度策略,利用任務(wù)的數(shù)據(jù)分布情況,結(jié)合節(jié)點(diǎn)的計(jì)算能力為任務(wù)分配相應(yīng)的系統(tǒng)資源。文獻(xiàn)[13,14]對(duì)Spark環(huán)境中的具體參數(shù)進(jìn)行建模和預(yù)測(cè),實(shí)現(xiàn)對(duì)包含并行度等多個(gè)作業(yè)參數(shù)的優(yōu)化。
針對(duì)任務(wù)計(jì)算量分配問(wèn)題,也涌現(xiàn)出大量的研究成果。文獻(xiàn)[15]提出了SkewReduce,通過(guò)建立代價(jià)模型評(píng)估分區(qū)容量,在作業(yè)執(zhí)行過(guò)程中進(jìn)行元數(shù)據(jù)的逐步收集,達(dá)到契機(jī)時(shí)執(zhí)行分區(qū)優(yōu)化函數(shù)并實(shí)施新的分區(qū)方案。文獻(xiàn)[16]提出了SkewTune策略,該策略并不期望在執(zhí)行計(jì)劃階段就建立均衡的分區(qū),而是建立了Reduce端的任務(wù)剩余代價(jià)評(píng)估模型,任何Reduce端的任務(wù)完成后,都將觸發(fā)其他未完成任務(wù)的剩余代價(jià)評(píng)估,并將未處理數(shù)據(jù)向已完成任務(wù)的工作節(jié)點(diǎn)遷移,從而達(dá)到數(shù)據(jù)分配的整體均衡。文獻(xiàn)[17]提出了一種基于采樣的分區(qū)策略,該策略通過(guò)在Map端增加獨(dú)立的采樣進(jìn)程獲得近似數(shù)據(jù)分布,采樣達(dá)到閾值后對(duì)已生成的分區(qū)進(jìn)行拆分和重組,從而提高數(shù)據(jù)分配的均衡性。文獻(xiàn)[18,19]提出了精細(xì)分區(qū)和動(dòng)態(tài)拆分2種算法,首先通過(guò)精細(xì)分區(qū)算法生成固定數(shù)量的分區(qū),同時(shí)進(jìn)行采樣獲得近似數(shù)據(jù)分布,當(dāng)完成一定比例的Map任務(wù)后,觸發(fā)動(dòng)態(tài)拆分函數(shù),達(dá)到數(shù)據(jù)合理分配的目標(biāo)。文獻(xiàn)[20,21]提出了基于數(shù)據(jù)塊的采樣分區(qū)方法,該方法將原生的鍵值對(duì)轉(zhuǎn)換為〈blocking_key,entity〉形式,通過(guò)設(shè)計(jì)評(píng)估函數(shù)對(duì)塊內(nèi)數(shù)據(jù)進(jìn)行評(píng)估,對(duì)不符合條件的數(shù)據(jù)塊進(jìn)行調(diào)整。文獻(xiàn)[22]提出先對(duì)輸入數(shù)據(jù)進(jìn)行25%的隨機(jī)采樣,通過(guò)采樣結(jié)果獲得數(shù)據(jù)分布并制定分區(qū)函數(shù),然后啟動(dòng)任務(wù)填充數(shù)據(jù)。文獻(xiàn)[23]提出了LEEN策略,通過(guò)對(duì)輸入數(shù)據(jù)的預(yù)掃描獲取數(shù)據(jù)分布,在Map任務(wù)執(zhí)行過(guò)程中對(duì)數(shù)據(jù)的鍵值進(jìn)行頻率統(tǒng)計(jì),然后綜合數(shù)據(jù)分布和鍵值頻率統(tǒng)計(jì)設(shè)定合理的分區(qū)函數(shù)。文獻(xiàn)[24]提出了一種任務(wù)級(jí)別的啟發(fā)式調(diào)度策略,通過(guò)收集任務(wù)特性和資源需求,將任務(wù)發(fā)送到最適合的工作節(jié)點(diǎn),監(jiān)測(cè)慢任務(wù)并進(jìn)行重定向,但對(duì)任務(wù)劃分的合理性則沒(méi)有進(jìn)行評(píng)估和改進(jìn)。H-Scheduler[25]和Selecta[26]主要針對(duì)計(jì)算集群的存儲(chǔ)結(jié)構(gòu),評(píng)估HDD+SSD混合存儲(chǔ)結(jié)構(gòu)對(duì)作業(yè)執(zhí)行效率的影響。H-Scheduler利用存儲(chǔ)類(lèi)型和數(shù)據(jù)本地性對(duì)任務(wù)進(jìn)行分類(lèi),再根據(jù)分類(lèi)結(jié)果重新定義任務(wù)調(diào)度優(yōu)先級(jí)和工作節(jié)點(diǎn)映射。Selecta則分析影響作業(yè)效率的潛在因素,采用協(xié)同過(guò)濾算法對(duì)作業(yè)在不同配置環(huán)境中的性能表現(xiàn)進(jìn)行預(yù)測(cè),預(yù)測(cè)結(jié)果有94%的最佳性能配置命中率和80%的最優(yōu)成本命中率。
本文與現(xiàn)有研究成果的不同之處在于,發(fā)現(xiàn)并行度和計(jì)算量分配都是決定任務(wù)執(zhí)行時(shí)間的重要因素且兩者存在耦合,無(wú)法通過(guò)已知直接求得2個(gè)相互耦合未知數(shù)的最優(yōu)解,因此也并不期望在作業(yè)計(jì)劃階段就確定適合異構(gòu)Spark集群的最優(yōu)并行度和最佳數(shù)據(jù)分配,而是在多階段任務(wù)執(zhí)行過(guò)程中為每個(gè)階段制定相對(duì)合理的并行度,通過(guò)后期調(diào)整來(lái)優(yōu)化數(shù)據(jù)分配。首先通過(guò)建立計(jì)算節(jié)點(diǎn)資源模型、數(shù)據(jù)分配模型和任務(wù)執(zhí)行模型,確定并行度設(shè)置的約束,分析任務(wù)并行度和計(jì)算量分配的耦合關(guān)系,在此基礎(chǔ)上提出異構(gòu)Spark集群的數(shù)據(jù)傾斜修正調(diào)度策略DSCS,包括并行度預(yù)估、數(shù)據(jù)傾斜修正和異構(gòu)節(jié)點(diǎn)任務(wù)分配算法。并行度預(yù)估算法基于有限的數(shù)據(jù)集信息和Spark工作節(jié)點(diǎn)的資源狀況,預(yù)估并行度、分配輪數(shù)的相對(duì)合理值,實(shí)施作業(yè)的發(fā)送。在作業(yè)并行計(jì)算過(guò)程中,首個(gè)計(jì)算階段任務(wù)運(yùn)行完畢后進(jìn)行數(shù)據(jù)分布的采集和存儲(chǔ),根據(jù)傾斜情況對(duì)數(shù)據(jù)進(jìn)行切分,更新并行度參數(shù)。最后,在考慮節(jié)點(diǎn)異構(gòu)的情況下,根據(jù)節(jié)點(diǎn)計(jì)算能力的不同,將數(shù)據(jù)分配到不同節(jié)點(diǎn),從而有效均衡節(jié)點(diǎn)計(jì)算能力和計(jì)算數(shù)據(jù)量之間的關(guān)系。
本節(jié)主要分析Spark作業(yè)的并行執(zhí)行機(jī)制,進(jìn)行模型定義和相關(guān)定理證明,為第4節(jié)算法設(shè)計(jì)提供理論基礎(chǔ)。
定義1(節(jié)點(diǎn)計(jì)算能力) 定義由n個(gè)節(jié)點(diǎn)構(gòu)成的Spark集群w={w1,w2,…,wn},其中wx為第x個(gè)Spark計(jì)算節(jié)點(diǎn)(簡(jiǎn)稱(chēng)節(jié)點(diǎn)x)。內(nèi)存計(jì)算框架的節(jié)點(diǎn)計(jì)算資源主要包括CPU資源、內(nèi)存資源和網(wǎng)絡(luò)資源,通常同一個(gè)集群的網(wǎng)絡(luò)資源及傳輸能力相近,因此節(jié)點(diǎn)x的節(jié)點(diǎn)計(jì)算能力可定義為cx=(CPUx,Memx)。對(duì)節(jié)點(diǎn)計(jì)算能力進(jìn)行統(tǒng)計(jì)分析時(shí),計(jì)算能力主要反映單位時(shí)間處理的數(shù)據(jù)量,其中CPU貢獻(xiàn)用主頻和核數(shù)之積來(lái)評(píng)判,表示為CPUx=fx×corex。內(nèi)存則以吞吐量為指標(biāo),由于吞吐量牽涉的因素較多,而內(nèi)存計(jì)算的最大瓶頸是容量問(wèn)題,因此可定義內(nèi)存處理能力為Memx,用于表示內(nèi)存空間大小。
(1)
(2)
則節(jié)點(diǎn)x的綜合歸一化計(jì)算能力則可表示如式(3)所示:
(3)
定義2(節(jié)點(diǎn)資源占用率) 節(jié)點(diǎn)資源占用率是指系統(tǒng)中節(jié)點(diǎn)資源的已使用比率,占用率越高,節(jié)點(diǎn)計(jì)算能力越低;反之,則計(jì)算能力越高。節(jié)點(diǎn)x的資源利用率ux可定義如式(4)所示:
ux={ucx,umx|0≤ucx≤1,0≤umx≤1}
(4)
其中,ucx表示CPU占用率,umx表示內(nèi)存占用率。
ucx和umx均可通過(guò)系統(tǒng)性能監(jiān)測(cè)軟件獲得,本文利用nmon軟件進(jìn)行監(jiān)測(cè)。當(dāng)CPU利用率為ucx時(shí),則實(shí)際的CPU計(jì)算能力為fx×corex×(1-ucx),當(dāng)內(nèi)存資源占用率為umx時(shí),則實(shí)際的內(nèi)存可用空間為Memx×(1-umn)。
定義3(數(shù)據(jù)分布) 輸入數(shù)據(jù)分布對(duì)應(yīng)Spark作業(yè)的每個(gè)階段,第1個(gè)階段的輸入數(shù)據(jù)分布是原始數(shù)據(jù)集的鍵值分布情況,若輸入數(shù)據(jù)中包含m個(gè)鍵值,且km對(duì)應(yīng)的數(shù)據(jù)量為vm時(shí),則輸入數(shù)據(jù)分布如式(5)所示:
d=(〈k1,v1〉,〈k2,v2〉,…,〈km,vm〉)
(5)
后續(xù)計(jì)算階段的輸入數(shù)據(jù)分布對(duì)應(yīng)前一個(gè)階段的輸出結(jié)果,假設(shè)某個(gè)階段包含r個(gè)數(shù)據(jù)分區(qū)(partition),則需統(tǒng)計(jì)r個(gè)分區(qū)中鍵值的數(shù)量,則第b個(gè)分區(qū)的數(shù)據(jù)分布db=(〈k1,vb1〉,〈k2,vb2〉,…,〈km,vbm〉),而輸入總數(shù)據(jù)量則如式(6)所示:
(6)
定義4(數(shù)據(jù)傾斜度) 輸入數(shù)據(jù)傾斜是某個(gè)鍵值的數(shù)據(jù)量相較于其他鍵值數(shù)據(jù)量的差異程度,因此可定義為某個(gè)鍵值的數(shù)據(jù)量與總體數(shù)據(jù)量均值的方差。對(duì)于輸入數(shù)據(jù)的第j個(gè)鍵值,其數(shù)據(jù)傾斜度可定義為:
(7)
skewj越大,表示該鍵值數(shù)據(jù)傾斜度越高;反之,則表示該鍵值的數(shù)據(jù)量與平均數(shù)據(jù)量差距越小。
因此,輸入數(shù)據(jù)分布d的數(shù)據(jù)傾斜度可表示如式(8)所示:
(8)
S越大,表示數(shù)據(jù)分布越不均勻;S越趨近于0,則數(shù)據(jù)分布越均勻。
定義5(key-bucket分布) 對(duì)于作業(yè)中每個(gè)計(jì)算階段的輸出,需要將輸出數(shù)據(jù)按鍵值分配到多個(gè)數(shù)據(jù)桶中。Spark系統(tǒng)根據(jù)哈希函數(shù)映射,將每個(gè)分區(qū)數(shù)據(jù)的鍵值關(guān)聯(lián)到對(duì)應(yīng)的桶中,如式(9)所示:
bucketid=hashfunction(key)
(9)
每個(gè)計(jì)算階段輸出的數(shù)據(jù)桶個(gè)數(shù)由設(shè)置的并行度參數(shù)決定,若當(dāng)前并行度數(shù)為p,則數(shù)據(jù)桶的id取值為0到p-1。
定義6(數(shù)據(jù)桶傾斜度) 數(shù)據(jù)桶傾斜度是對(duì)于同一計(jì)算階段的多個(gè)數(shù)據(jù)桶,某個(gè)數(shù)據(jù)桶的數(shù)據(jù)量與總體數(shù)據(jù)量均值的方差。對(duì)于第p-1個(gè)數(shù)據(jù)桶,其數(shù)據(jù)傾斜度可定義為:
(10)
skewbucketp-1越大,表示該數(shù)據(jù)桶傾斜度越高。若所有數(shù)據(jù)桶的傾斜度越趨近于0,表示各數(shù)據(jù)桶的數(shù)據(jù)量分配越均勻。
定義7(任務(wù)內(nèi)存需求) 由于Spark利用內(nèi)存處理來(lái)提升計(jì)算效率,因此分配足夠內(nèi)存既能夠保障內(nèi)存計(jì)算的執(zhí)行速度,同時(shí)也能避免因不正確配置參數(shù)導(dǎo)致的不可預(yù)期程序異常。記Memtaskh為第h個(gè)任務(wù)的內(nèi)存需求,若共有q個(gè)任務(wù),則作業(yè)總體內(nèi)存需求如式(11)所示:
(11)
內(nèi)存需求量主要包括3部分:(1)基本內(nèi)存需求,即數(shù)據(jù)集中對(duì)象大小的2~3倍;(2)訪問(wèn)這些對(duì)象的內(nèi)存消耗;(3)垃圾回收消耗。因此,某個(gè)任務(wù)的執(zhí)行區(qū)需求量與所需計(jì)算的數(shù)據(jù)量有關(guān)。實(shí)際執(zhí)行區(qū)需求量可通過(guò)實(shí)時(shí)查看內(nèi)存所占用空間大小和溢出量來(lái)綜合判斷。
定理1并行度適度法則:并行度的選擇符合適度法則,既不能太少,也不能太多,需要和執(zhí)行作業(yè)的需求以及集群計(jì)算能力相匹配。若分區(qū)數(shù)較少,會(huì)產(chǎn)生額外磁盤(pán)開(kāi)銷(xiāo),影響執(zhí)行效率;若分區(qū)數(shù)較多,會(huì)增加任務(wù)切換的開(kāi)銷(xiāo),且生成較多的小文件。
因此,并行度的設(shè)置既不能太小也不能太大,想要獲得較好的性能,需根據(jù)作業(yè)DAG及具體數(shù)據(jù)狀況選擇合適的并行度。
□
定理2數(shù)據(jù)溢出規(guī)避法則:在某個(gè)計(jì)算階段中產(chǎn)生數(shù)據(jù)傾斜時(shí),分配更大的并行度,使得每個(gè)任務(wù)內(nèi)存需求量減小,其內(nèi)存溢出量也會(huì)隨之降低。
證明記節(jié)點(diǎn)x分配的內(nèi)存大小和核數(shù)分別為Memx和fx。設(shè)分配的任務(wù)數(shù)量為l。
任務(wù)內(nèi)存需求量分別為{Memtask1,Memtask2,…,Memtaskl}且Memtask1>Memtask2>Memtaskl,即將l個(gè)任務(wù)按照所需內(nèi)存大小降序排列,隨著l變大,則每個(gè)任務(wù)的內(nèi)存需求量隨之減小。
若l
□
本節(jié)基于模型的相關(guān)定義及定理證明,首先描述算法的程序模塊和代碼更新,然后進(jìn)行異構(gòu)Spark集群并行調(diào)度策略的整體描述,最后提出并行度預(yù)估算法、數(shù)據(jù)傾斜修正算法和異構(gòu)節(jié)點(diǎn)任務(wù)分配算法。
自適應(yīng)異構(gòu)并行調(diào)度策略根據(jù)數(shù)據(jù)的傾斜情況和節(jié)點(diǎn)計(jì)算能力,在作業(yè)執(zhí)行過(guò)程中對(duì)并行度和任務(wù)分配進(jìn)行調(diào)整。計(jì)算各階段并行度的前提是獲取前序階段的任務(wù)數(shù)及其對(duì)應(yīng)輸出數(shù)據(jù)量,然后根據(jù)計(jì)算節(jié)點(diǎn)的資源情況對(duì)任務(wù)進(jìn)行有效分配。對(duì)于迭代執(zhí)行的算法,首輪執(zhí)行Spark任務(wù)時(shí),根據(jù)并行度預(yù)估算法的默認(rèn)并行度執(zhí)行,在第2輪開(kāi)始,根據(jù)前一輪的數(shù)據(jù)統(tǒng)計(jì)和傾斜分析,利用預(yù)估算法和數(shù)據(jù)傾斜修正算法對(duì)當(dāng)前的并行度進(jìn)行調(diào)整,任務(wù)分配時(shí),則需要通過(guò)異構(gòu)節(jié)點(diǎn)任務(wù)分配算法根據(jù)當(dāng)前節(jié)點(diǎn)的資源狀況進(jìn)行任務(wù)量分配。自適應(yīng)異構(gòu)并行調(diào)度策略的主要過(guò)程如下所示:
(1)除第1個(gè)階段之外,其余每個(gè)計(jì)算階段收集前序階段輸出數(shù)據(jù)大小、數(shù)據(jù)分布和可用節(jié)點(diǎn)的空閑內(nèi)存及核數(shù)資源情況,構(gòu)建算法所需的元數(shù)據(jù)。
(2)根據(jù)采集的元數(shù)據(jù),執(zhí)行初始并行度預(yù)估算法(見(jiàn)4.2節(jié)),計(jì)算當(dāng)前計(jì)算階段的初始并行度。
(3)根據(jù)并行調(diào)度策略的機(jī)制,首先預(yù)判斷不均衡的鍵值,利用數(shù)據(jù)傾斜修正算法(見(jiàn)4.3節(jié))對(duì)鍵值進(jìn)行拆分評(píng)估, 對(duì)數(shù)據(jù)傾斜度較高的鍵值增加前綴,使其能夠映射到多個(gè)數(shù)據(jù)桶,然后將傾斜的數(shù)據(jù)桶進(jìn)一步劃分,增加相應(yīng)的并行度。
(4)當(dāng)確定并行度后,根據(jù)異構(gòu)節(jié)點(diǎn)任務(wù)分配算法(見(jiàn)4.4節(jié))對(duì)(3)中待分配數(shù)據(jù)桶進(jìn)行劃分和映射。
(5)迭代(1)~(4),在多寬依賴(lài)作業(yè)中實(shí)施多次分配,為后續(xù)計(jì)算階段確定合理的并行度和數(shù)據(jù)分配。
根據(jù)3.2節(jié)定理1可知,并行度設(shè)置應(yīng)當(dāng)在執(zhí)行任務(wù)內(nèi)存需求不超過(guò)所分配工作節(jié)點(diǎn)可用容量的情況下,選擇額外調(diào)度開(kāi)銷(xiāo)盡可能少的并行度設(shè)定值。因此,算法通過(guò)前序計(jì)算階段的數(shù)據(jù)量和可用節(jié)點(diǎn)的內(nèi)存空間計(jì)算并行度,并且保障任務(wù)平均分配數(shù)據(jù)大小不超過(guò)平均內(nèi)存空間大小。并行度預(yù)估算法詳細(xì)步驟如下所示:
(1)統(tǒng)計(jì)系統(tǒng)各節(jié)點(diǎn)資源情況,并將其存儲(chǔ)在node表中,其中包括節(jié)點(diǎn)內(nèi)存容量和核心數(shù)等。
(2)在計(jì)算開(kāi)始前,基于作業(yè)DAG生成計(jì)算階段結(jié)構(gòu)樹(shù)stageTree。
(3)遍歷計(jì)算階段結(jié)構(gòu)樹(shù),記錄計(jì)算階段集合stagei(wdi,inputRDDi和outputRDDi),其中inputRDDi和outputRDDi分別為stagei中輸入RDD和輸出RDD,可以是一個(gè)或多個(gè)RDD。
(4)初始化stage0,其并行度劃分取決于作業(yè)輸入數(shù)據(jù)在HDFS中分塊的個(gè)數(shù)blockNum,該值由輸入數(shù)據(jù)大小inputSize和默認(rèn)HDFS分塊大小blockSize共同決定。
(5)在stage1到stagen執(zhí)行階段中,獲取當(dāng)前可用節(jié)點(diǎn)的可用資源情況,存儲(chǔ)在availableResource表,包括內(nèi)存和核心數(shù),統(tǒng)計(jì)可用內(nèi)存總?cè)萘縯ms和可用核心總數(shù)tcn。
(6)同時(shí)統(tǒng)計(jì)每個(gè)任務(wù)的鍵值分布和數(shù)據(jù)大小。當(dāng)前序stagei執(zhí)行結(jié)束,統(tǒng)計(jì)各分區(qū)大小ptSize和數(shù)據(jù)分布情況〈key,Value〉,并計(jì)算所有parentRDD的總?cè)萘縫rSize。
(7)假定內(nèi)存需求量與數(shù)據(jù)量的比值關(guān)系為xp,默認(rèn)情況下將xp設(shè)置為2,即數(shù)據(jù)在內(nèi)存中所需的內(nèi)存空間為數(shù)據(jù)量大小的2倍,則計(jì)算parentRDD總?cè)萘颗c可用內(nèi)存總?cè)萘縯ms和核心總數(shù)tcn乘積的商值,即(xp×prSize/tms)×tcn。
(8)若(xp×prSize/tms)×tcn≤1,則初始并行度設(shè)置為tcn。
并行度預(yù)估算法如算法1所示。
算法1并行度預(yù)估算法
輸入:作業(yè)DAG,系統(tǒng)配置systemconfigruarion,輸入數(shù)據(jù)塊個(gè)數(shù)blocknum,可用資源availableResource,內(nèi)存需求比xp。
輸出:初始并行度stagei.Pa0
初始化:prSize←0;xp←2;
①stageTree←generate(DAG);//獲取執(zhí)行計(jì)劃
②stage0.Pa←blockNum;/*初始化第1個(gè)計(jì)算階段*/
③for(i=1 tostageTree.length-1)
④stagei(wdi,parentRDD)←get(DAG);
⑤ptnumi←stagei.partitionnum;
⑥tms←sum(availableResource.mem);
⑦tcn←sum(availableResource.core);
⑧for(j=0 toptnumi-1)
⑨ptSizeij←getsize(i,j);
⑩ (kid,ksize)←partitionij(keyid,keysize);
根據(jù)3.2節(jié)定理2可知,當(dāng)節(jié)點(diǎn)產(chǎn)生數(shù)據(jù)傾斜時(shí),為了盡可能地減少內(nèi)存溢出,需要對(duì)數(shù)據(jù)傾斜且內(nèi)存需求超過(guò)內(nèi)存的鍵值進(jìn)行拆分。傾斜修正算法的主要目的是利用分區(qū)函數(shù)將數(shù)據(jù)傾斜且明顯大于需求內(nèi)存的鍵值進(jìn)行重新劃分,進(jìn)而劃分到不同的分區(qū)中去,在分配之后去掉添加的前綴,恢復(fù)成原本的鍵值,再重新執(zhí)行一個(gè)Reduce操作。
數(shù)據(jù)傾斜修正算法的主要步驟如下所示:
(4)將所有拆分sk_id的拆分個(gè)數(shù)統(tǒng)計(jì)求和,記錄為tskn;
(6)此時(shí)將默認(rèn)數(shù)據(jù)桶數(shù)量修改為修正并行度Pa1=stagei.Pa0+tskn;
(7)通過(guò)計(jì)算hash(key) mod (Pa0+tskn)獲得寫(xiě)入數(shù)據(jù)的數(shù)據(jù)桶編號(hào),將數(shù)據(jù)存入相應(yīng)數(shù)據(jù)桶;
(8)數(shù)據(jù)桶則暫不建立映射關(guān)系,交由后續(xù)異構(gòu)節(jié)點(diǎn)任務(wù)分配算法進(jìn)行操作。
數(shù)據(jù)傾斜修正算法如算法2所示。
算法2數(shù)據(jù)傾斜修正算法
輸入:當(dāng)前執(zhí)行階段stagei,總元組數(shù)vinput,鍵值的個(gè)數(shù)m,可用內(nèi)存總?cè)萘縯ms,可用核心總數(shù)tcn,初始并行度Pa0。
初始化:vj+=Get(keyj_size);
//獲取按鍵值劃分的數(shù)據(jù)量
①forkeyjinstagei(j=0 tom-1)
③if(skewj>0 andvj>tms/tcn)then
④sk_id[l2]←kj(key_id);
⑤l2++;
⑥svl2←sk_id[l2].size;
⑧tskn+←svnl2;
⑨for(k=0 tosvnl2)
⑩key[n2]←addPrefix(sk_id[k]);
數(shù)據(jù)桶填充完畢之后,需要將桶映射到節(jié)點(diǎn)相應(yīng)的Reducer中,桶的數(shù)量與Reducer個(gè)數(shù)相同,系統(tǒng)默認(rèn)方式是根據(jù)編號(hào)直接建立映射關(guān)系,由于此時(shí)未考慮節(jié)點(diǎn)CPU計(jì)算能力和內(nèi)存大小的差異,因此不能有效地平衡節(jié)點(diǎn)計(jì)算能力和任務(wù)量之間的關(guān)系。異構(gòu)節(jié)點(diǎn)任務(wù)分配算法根據(jù)節(jié)點(diǎn)計(jì)算能力進(jìn)行排序,將各數(shù)據(jù)桶數(shù)據(jù)量大小進(jìn)行排序,盡可能將符合節(jié)點(diǎn)計(jì)算要求的數(shù)據(jù)桶分配到相應(yīng)的節(jié)點(diǎn)中。異構(gòu)節(jié)點(diǎn)任務(wù)分配算法主要過(guò)程如下所示:
(1)將并行度Pa1擴(kuò)展至Pa2=α×tcn,其中α為正整數(shù)且(α-1)×tcn≤Pa1≤α×tcn,α≥1;
(2)同時(shí),將Pa1個(gè)數(shù)據(jù)桶劃分為α輪進(jìn)行分配,其中前α-1輪,每輪分配tcn個(gè)數(shù)據(jù)桶,第α輪分配Pa1-(α-1)×tcn個(gè)數(shù)據(jù)桶;
(4)將bucket[Pa1]根據(jù)數(shù)據(jù)量大小進(jìn)行排序存入數(shù)組bs;
(5)從第1輪到第α-1輪中,每輪從bs[(round-1)×tcn]到bs[round×tcn-1]開(kāi)始選擇,將bs[(round-1) ×tcn]到bs[round×tcn-1]一一映射到reducer1到reducertcn;
(6)第α輪,將bs[(α-1)×tcn]至bs[Pa1-1]一一映射到節(jié)點(diǎn)計(jì)算能力較強(qiáng)的Reducer上,即Pa1-(α-1)×tcn到reducertcn。
異構(gòu)節(jié)點(diǎn)任務(wù)分配算法如算法3所示。
算法3異構(gòu)節(jié)點(diǎn)任務(wù)分配算法
輸入:當(dāng)前執(zhí)行階段stagei,可用核心總數(shù)tcn,修正并行度Pa1。
初始化:k←1;q1←1;
//將節(jié)點(diǎn)根據(jù)計(jì)算能力排序
③bs[e]←descendBy(vbucket[e]);
//將數(shù)據(jù)桶根據(jù)數(shù)據(jù)量大小排序
④for(round=1 toα-1)
⑤for(u=round-1)*tcntotcn)
⑥for(v=bs[(α-1)*tcntotcn)
⑦reducerk1←assign(bs[(round-1)*tcn+q1]);
⑧k1++;
⑨q1++;
⑩endfor/*將第1到α-1輪的數(shù)據(jù)桶映射到Reducer*/
本節(jié)將通過(guò)實(shí)驗(yàn)進(jìn)行比較和評(píng)價(jià),驗(yàn)證異構(gòu)并行調(diào)度策略的有效性。
本文的Spark集群共有10個(gè)節(jié)點(diǎn),包括1個(gè)主節(jié)點(diǎn)與9個(gè)工作節(jié)點(diǎn),各節(jié)點(diǎn)軟硬件配置如表1所示。Spark集群為5個(gè)工作節(jié)點(diǎn)各分配4 GB內(nèi)存和2個(gè)CPU核心,其余2個(gè)工作節(jié)點(diǎn)各分配2 GB內(nèi)存和1個(gè)CPU核心,2個(gè)工作節(jié)點(diǎn)各分配8 GB內(nèi)存和4個(gè)CPU核心,因此集群中共有40 GB內(nèi)存以及20個(gè)CPU邏輯核心用于執(zhí)行迭代應(yīng)用。
Table 1 Configuration parameters of worker nodes
實(shí)驗(yàn)采用nmon來(lái)監(jiān)控Spark集群的資源使用情況,由于主節(jié)點(diǎn)主要負(fù)責(zé)任務(wù)調(diào)度與資源分配,不需要實(shí)際執(zhí)行任務(wù),為方便起見(jiàn),本文將主節(jié)點(diǎn)與監(jiān)控服務(wù)器集成到一個(gè)節(jié)點(diǎn),避免對(duì)集群計(jì)算性能造成影響。實(shí)驗(yàn)使用基準(zhǔn)測(cè)試集BigDataBench[27]中多個(gè)作業(yè),包括WordCount算法、TeraSort算法、k-means聚類(lèi)算法和PageRank算法進(jìn)行評(píng)估。
實(shí)驗(yàn)采用數(shù)據(jù)密集型應(yīng)用PageRank作業(yè)對(duì)提出的3個(gè)算法分別進(jìn)行評(píng)估,數(shù)據(jù)選取SNAP(Stanford Network Analysis Project)提供的3個(gè)數(shù)據(jù)量差異較大的標(biāo)準(zhǔn)數(shù)據(jù)集,均為有向圖,如表2所示。任務(wù)選用數(shù)據(jù)密集型算法PageRank,因?yàn)閿?shù)據(jù)密集型算法對(duì)系統(tǒng)的并行度策略更加敏感,更有利于驗(yàn)證算法的有效性。為了明顯體現(xiàn)并行度對(duì)內(nèi)存溢出情況的顯著影響可用節(jié)點(diǎn)數(shù)量調(diào)整為4個(gè)工作節(jié)點(diǎn)和1個(gè)主節(jié)點(diǎn),將工作節(jié)點(diǎn)可用內(nèi)存空間調(diào)整為1 GB,其中2個(gè)節(jié)點(diǎn)的可用核心數(shù)為2個(gè),另外2個(gè)節(jié)點(diǎn)的可用核心數(shù)為4個(gè)。
Table 2 Information of test datasets
(1)初始并行度生成算法。
利用PageRank作業(yè)進(jìn)行10輪迭代,驗(yàn)證初始并行度生成算法的效率,與系統(tǒng)默認(rèn)并行度設(shè)置為2×40=80的情況進(jìn)行對(duì)比,實(shí)驗(yàn)結(jié)果如圖1所示。其中,圖1a為不同輸入數(shù)據(jù)類(lèi)型,利用動(dòng)態(tài)并行度進(jìn)行設(shè)置與默認(rèn)固定并行度的執(zhí)行時(shí)間對(duì)比;圖1b為不同輸入數(shù)據(jù)類(lèi)型,利用動(dòng)態(tài)并行度設(shè)置算法進(jìn)行配置時(shí),算法在不同計(jì)算階段的并行度變化。
Figure 1 Parallelism prediction algorithm
由圖1a可知,對(duì)3類(lèi)數(shù)據(jù)類(lèi)型而言,使用動(dòng)態(tài)并行度都能夠有效地縮短作業(yè)執(zhí)行時(shí)間,不同的并行度對(duì)作業(yè)執(zhí)行效率的影響很明顯,不恰當(dāng)?shù)牟⑿卸瓤赡軙?huì)使得部分節(jié)點(diǎn)資源利用率較低,其余節(jié)點(diǎn)內(nèi)存資源不足而溢出,從而增加任務(wù)執(zhí)行時(shí)間,降低作業(yè)執(zhí)行效率。其中Cit-Patents受到并行度影響更大,由于Cit-Patents具有較大的數(shù)據(jù)量,因此在執(zhí)行過(guò)程中,需要占用的內(nèi)存空間較大,更有可能發(fā)生溢寫(xiě),因此選擇合理的并行度能夠更好地縮短作業(yè)執(zhí)行時(shí)間。
結(jié)合圖1b可知,各個(gè)階段的并行度具有明顯的變化,DSCS能夠有效結(jié)合計(jì)算階段的執(zhí)行狀態(tài)進(jìn)行調(diào)整,隨著并行度的變化,優(yōu)化了階段內(nèi)任務(wù)的完成時(shí)間,最終縮短了作業(yè)總執(zhí)行時(shí)間。
(2)數(shù)據(jù)傾斜修正算法。
使用PageRank作業(yè)3個(gè)不同數(shù)據(jù)集,驗(yàn)證數(shù)據(jù)傾斜修正算法效率,并與系統(tǒng)默認(rèn)的數(shù)據(jù)桶劃分算法進(jìn)行對(duì)比。算法未修改的參數(shù)保持Spark系統(tǒng)默認(rèn),迭代次數(shù)為10,并行度固定設(shè)置為20,傾斜度分別設(shè)置為0.2,0.4,0.6和0.8。圖2a為不同傾斜度時(shí),數(shù)據(jù)傾斜修正算法與系統(tǒng)默認(rèn)數(shù)據(jù)桶劃分算法的執(zhí)行時(shí)間對(duì)比;圖2b為不同傾斜度時(shí),3個(gè)數(shù)據(jù)集平均內(nèi)存溢寫(xiě)情況的對(duì)比。
由圖2a可知,PageRank在數(shù)據(jù)量較大且產(chǎn)生數(shù)據(jù)傾斜時(shí),數(shù)據(jù)傾斜修正算法對(duì)執(zhí)行時(shí)間的影響較大;而在數(shù)據(jù)量較小時(shí),即使產(chǎn)生數(shù)據(jù)傾斜,該算法的效果也不明顯,因?yàn)椴煌蝿?wù)之間數(shù)據(jù)傾斜的差異較小。由圖2b可知,執(zhí)行作業(yè)數(shù)據(jù)量越大、數(shù)據(jù)傾斜越嚴(yán)重,在默認(rèn)數(shù)據(jù)桶劃分算法的情況下,產(chǎn)生的磁盤(pán)溢寫(xiě)量就越大。
(3)異構(gòu)節(jié)點(diǎn)任務(wù)分配算法。
利用PageRank作業(yè)驗(yàn)證異構(gòu)節(jié)點(diǎn)任務(wù)分配算法的效率,并與系統(tǒng)默認(rèn)任務(wù)分配一一映射的算法進(jìn)行對(duì)比。圖3a為異構(gòu)節(jié)點(diǎn)任務(wù)分配算法與系統(tǒng)默認(rèn)的節(jié)點(diǎn)分配算法的執(zhí)行時(shí)間對(duì)比;圖3b為利用異構(gòu)節(jié)點(diǎn)任務(wù)分配算法進(jìn)行設(shè)置時(shí),3個(gè)數(shù)據(jù)作業(yè)在不同迭代次數(shù)時(shí)的高性能節(jié)點(diǎn)3的資源利用率情況。由圖3a可知,PageRank在節(jié)點(diǎn)異構(gòu)情況下,使用系統(tǒng)默認(rèn)的數(shù)據(jù)桶分配算法與異構(gòu)節(jié)點(diǎn)任務(wù)分配算法相比,執(zhí)行時(shí)間相對(duì)略長(zhǎng)。在各節(jié)點(diǎn)之間計(jì)算能力差異明顯的情況下,隨著數(shù)據(jù)傾斜度的增加,效率差異會(huì)更加顯著。由圖3b可知,PageRank算法在使用異構(gòu)節(jié)點(diǎn)任務(wù)分配算法進(jìn)行配置時(shí),節(jié)點(diǎn)資源利用率得到了較好的均衡,表明充分利用了計(jì)算能力更強(qiáng)的節(jié)點(diǎn)。
使用多個(gè)作業(yè)包括WordCount算法、TeraSort算法、k-means聚類(lèi)算法和PageRank算法進(jìn)行綜合測(cè)試,將優(yōu)化算法嵌在插件中,通過(guò)建立的Spark平臺(tái)對(duì)DSCS策略進(jìn)行驗(yàn)證和評(píng)估,對(duì)比優(yōu)化前后的作業(yè)執(zhí)行時(shí)間和內(nèi)存利用率,實(shí)驗(yàn)結(jié)果如圖4所示。其中WordCount算法輸入集為Wiki,數(shù)據(jù)大小為7.8 GB;TeraSort算法輸入數(shù)據(jù)大小為3.5 GB;k-means算法輸入數(shù)據(jù)大小為16.4 GB;PageRank算法輸入數(shù)據(jù)大小為1.2 GB。
Figure 4 Overall evaluation of algorithm
通過(guò)圖4a和圖4b可知,經(jīng)過(guò)并行度優(yōu)化策略的調(diào)整,4個(gè)作業(yè)的作業(yè)執(zhí)行時(shí)間均得到降低,DSCS有效地提高了工作節(jié)點(diǎn)的平均內(nèi)存利用率。對(duì)并行度預(yù)估算法而言,作業(yè)處理的數(shù)據(jù)量和工作節(jié)點(diǎn)的內(nèi)存容量是評(píng)估并行度的重要依據(jù),能夠盡量減少工作節(jié)點(diǎn)內(nèi)存溢出和頻繁垃圾收集,縮短作業(yè)執(zhí)行時(shí)間;對(duì)于數(shù)據(jù)傾斜修正算法,數(shù)據(jù)分布越傾斜,寬依賴(lài)的同步代價(jià)越大,延時(shí)也越大。數(shù)據(jù)傾斜修正算法能夠?qū)A斜度較大的數(shù)據(jù)進(jìn)行劃分,為合理的分區(qū)映射提供依據(jù);異構(gòu)節(jié)點(diǎn)任務(wù)分配算法能夠根據(jù)節(jié)點(diǎn)的計(jì)算能力分配數(shù)據(jù)塊,解決節(jié)點(diǎn)計(jì)算能力不均衡的問(wèn)題,提高工作節(jié)點(diǎn)資源利用率和計(jì)算效率。因此,從總體上來(lái)看,數(shù)據(jù)傾斜修正調(diào)度策略具有良好的優(yōu)化效果。
現(xiàn)有的研究較少關(guān)注異構(gòu)Spark系統(tǒng)設(shè)置中的并行度參數(shù),往往根據(jù)經(jīng)驗(yàn)進(jìn)行設(shè)定,很難契合不同的作業(yè)類(lèi)型和數(shù)據(jù)量大小,不能在作業(yè)各計(jì)算階段的執(zhí)行過(guò)程中發(fā)生變化。因此,本文根據(jù)作業(yè)的數(shù)據(jù)量、數(shù)據(jù)分布傾斜情況和節(jié)點(diǎn)的計(jì)算能力進(jìn)行評(píng)估,提出了適應(yīng)計(jì)算階段狀態(tài)變化的數(shù)據(jù)傾斜修正調(diào)度策略,其中包括并行度預(yù)估算法、數(shù)據(jù)傾斜修正算法和異構(gòu)節(jié)點(diǎn)任務(wù)分配算法3個(gè)部分。計(jì)算細(xì)粒度任務(wù)的并行度并進(jìn)行計(jì)算數(shù)據(jù)合理分配。實(shí)驗(yàn)表明,該策略能夠較好地貼合作業(yè)類(lèi)型、數(shù)據(jù)分布和節(jié)點(diǎn)計(jì)算能力,有效地提高了作業(yè)執(zhí)行效率。下一步將針對(duì)于每階段任務(wù)的具體操作,計(jì)劃采用回歸的方法,爭(zhēng)取對(duì)數(shù)據(jù)的內(nèi)存需求進(jìn)行更精確的預(yù)估,從而實(shí)現(xiàn)作業(yè)執(zhí)行效率更高級(jí)別的提升,并將研究?jī)?nèi)容延伸至流式計(jì)算平臺(tái)的并行效率優(yōu)化。