林金羽,閆 格
(1.閩南師范大學(xué)計算機學(xué)院,福建漳州363000;2.福建省粒計算及其應(yīng)用重點實驗室,福建 漳州363000)
隨著大數(shù)據(jù)時代的到來,非結(jié)構(gòu)化的數(shù)據(jù)在以指數(shù)級的速率增長的同時,逐漸成為數(shù)據(jù)處理中的主流,占80%以上,從而針對海量的非結(jié)構(gòu)化數(shù)據(jù)處理將是一個巨大的挑戰(zhàn)[1].目前,雅虎在2008年發(fā)布的分布式應(yīng)用Hadoop被廣泛應(yīng)用于處理各種結(jié)構(gòu)化和非結(jié)構(gòu)化的數(shù)據(jù)[2],其框架主要核心由如下三部分組成:Hadoop 分布式文件系統(tǒng)(HDFS),MapReduce 并行計算框架以及資源管理YARN.MapReduce 在計算以及處理數(shù)據(jù)的過程中需要占用大量的I/O資源,如將被處理的數(shù)據(jù)分發(fā)到各個節(jié)點并存儲在本地磁盤中,根據(jù)節(jié)點資源使用情況將任務(wù)分配到各個節(jié)點上執(zhí)行是作業(yè)調(diào)度的主要作用.作業(yè)調(diào)度是實現(xiàn)高性能處理大數(shù)據(jù)的關(guān)鍵因素,影響作業(yè)調(diào)度性能的因素有很多,如數(shù)據(jù)量、數(shù)據(jù)格式、數(shù)據(jù)傳輸速率、數(shù)據(jù)安全性等.因此,使用改進(jìn)調(diào)度算法的策略來提升Hadoop應(yīng)對處理非結(jié)構(gòu)化數(shù)據(jù)的性能就成為了亟待解決的問題.
在Hadoop 中,文件在HDFS 上被分成一個或多個數(shù)據(jù)塊,這些數(shù)據(jù)塊存儲在多個節(jié)點中,當(dāng)進(jìn)行MapReduce 工作時,需要將其他節(jié)點上的數(shù)據(jù)傳輸?shù)綄嶋H計算的節(jié)點上.為了解決數(shù)據(jù)由一個節(jié)點傳輸?shù)搅硪粋€節(jié)點需要時間以及消耗大量的網(wǎng)絡(luò)帶寬的問題,數(shù)據(jù)局部性的概念應(yīng)運而生.調(diào)度算法需要通過增強數(shù)據(jù)局部性,將計算任務(wù)放在輸入數(shù)據(jù)的節(jié)點上來避免不必要的數(shù)據(jù)傳輸[3].
Hadoop MapReduce 計算框架中有3 種自帶的調(diào)度算法:先進(jìn)先出(FIFO Scheduler)調(diào)度算法、公平(Fair Scheduler)調(diào)度算法和計算能力(Capacity Scheduler)調(diào)度算法.FIFO Scheduler 是Hadoop1.0 默認(rèn)使用的調(diào)度策略,該算法采用隊列,將所有作業(yè)都提交在隊列中,按照作業(yè)的提交時間順序執(zhí)行作業(yè),該算法不關(guān)注作業(yè)的處理時間和處理的優(yōu)先級,存在數(shù)據(jù)局部性的問題.Fair Scheduler支持多個隊列,為每個隊列中的作業(yè)分配均等的資源,存在配置復(fù)雜、未考慮作業(yè)的重要程度等問題.Capacity Scheduler 是Hadoop2.7.2及以上默認(rèn)使用的調(diào)度策略,該算法支持多個隊列,每個隊列可以共享集群的部分資源,做到最大化吞吐量及集群資源利用率[4].
現(xiàn)有的研究集中于對調(diào)度算法進(jìn)行改進(jìn),從而更好地做到集群的負(fù)載均衡[5].文獻(xiàn)[6]提出一種異構(gòu)環(huán)境下基于節(jié)點計算能力的調(diào)度算法,即給每個服務(wù)器節(jié)點的性能分配不同的權(quán)值,Reduce 任務(wù)根據(jù)不同的權(quán)重進(jìn)行任務(wù)調(diào)度,該算法考慮到了節(jié)點的異構(gòu)性,能較好地做到負(fù)載均衡;文獻(xiàn)[7]考慮節(jié)點與任務(wù)兩種因素,提出一種能夠?qū)⒐?jié)點性能高低進(jìn)行實時排序且按將相似的任務(wù)進(jìn)行歸并的算法,該算法將復(fù)雜的任務(wù)分配給性能強的節(jié)點執(zhí)行,減少了任務(wù)執(zhí)行時間;文獻(xiàn)[8]提出一種在Reduce 階段,考慮網(wǎng)絡(luò)帶寬、Map輸出結(jié)果的數(shù)據(jù)偏移量和節(jié)點計算能力等因素,選取合適的節(jié)點進(jìn)行計算,該算法能夠動態(tài)地獲取節(jié)點信息,提高了資源的利用率.
文獻(xiàn)[9]提出一種根據(jù)節(jié)點的處理能力將輸入的數(shù)據(jù)塊分配給節(jié)點,根據(jù)節(jié)點的計算能力將map 與reduce 任務(wù)進(jìn)行分配;文獻(xiàn)[10]提出一種新的基于HDFS 分割的索引框架,該框架減少MapReduce程序的IO 開銷,提高了資源利用率,但框架較為復(fù)雜;文獻(xiàn)[11]深入Hadoop內(nèi)存管理,優(yōu)化內(nèi)存的分配,在任務(wù)執(zhí)行過程中優(yōu)化垃圾處理機制,但該方法學(xué)習(xí)成本高,需要一定的Java內(nèi)存優(yōu)化基礎(chǔ).
本算法在為作業(yè)分配資源時考慮了節(jié)點和作業(yè)的異構(gòu)性,在任務(wù)到達(dá)前設(shè)置監(jiān)聽器獲取作業(yè)到達(dá)時消息及資源空閑時消息,根據(jù)作業(yè)到達(dá)率和平均作業(yè)執(zhí)行時間將作業(yè)分類,考慮作業(yè)所需資源的最小資源數(shù)量,優(yōu)先將本地任務(wù)分配給節(jié)點,確保每個節(jié)點都能公平地獲取到任務(wù),當(dāng)收到作業(yè)到達(dá)消息或資源空閑消息時,將任務(wù)進(jìn)行分類并分配給空閑的資源.
算法在初始化階段設(shè)置兩個監(jiān)聽器分別用來監(jiān)聽系統(tǒng)資源空閑時消息及作業(yè)到達(dá)時消息,當(dāng)監(jiān)聽器收到監(jiān)聽消息后,調(diào)度算法根據(jù)消息的類型進(jìn)行相應(yīng)的處理.當(dāng)新作業(yè)到達(dá)后,調(diào)度算法根據(jù)作業(yè)估計完成時間及數(shù)據(jù)到達(dá)率將作業(yè)進(jìn)行分類、處理,當(dāng)系統(tǒng)中有空閑資源時,調(diào)度算法將合適的作業(yè)分配給該資源.
提出的算法根據(jù)作業(yè)的估計完成時間對作業(yè)進(jìn)行分類,因為在集群中大部分作業(yè)是重復(fù)的,優(yōu)先級相同且等待時間相似,根據(jù)作業(yè)的估計完成時間將作業(yè)分為兩類,能夠更好地進(jìn)行調(diào)度,做到負(fù)載均衡,提高資源利用率.
MapReduce 執(zhí)行作業(yè)可以分為三個階段:Map 階段,Shuffle 階段,Reduce 階段.Map 階段讀取數(shù)據(jù)并處理,Shuffle階段對Map輸出進(jìn)行整合,Reduce階段對任務(wù)進(jìn)行整合、輸出.本節(jié)給出計算三個階段的估計完成時間的過程.
2.2.1 Map階段的作業(yè)估計執(zhí)行時間
Map 通過RecordReader 讀取輸入的鍵值對,將輸入的數(shù)據(jù)進(jìn)行分片,為每個分片構(gòu)建map 任務(wù),map函數(shù)完成自定義的任務(wù)后輸出新的鍵值對,map將輸出的結(jié)果寫入本地磁盤,根據(jù)鍵值進(jìn)行排序.Map階段輸出的是中間結(jié)果,該中間結(jié)果要由Reduce處理后才會輸出到HDFS中.
當(dāng)提交任務(wù)后,定義該作業(yè)的輸入數(shù)據(jù)大小記為S,將輸入數(shù)據(jù)分為n塊等長的小數(shù)據(jù)塊,對于任務(wù)的每一個map任務(wù),將第i個map任務(wù)的輸入數(shù)據(jù)大小定義為εi,則輸入數(shù)據(jù)的大小S可以表示為:
將處理第i個map任務(wù)的時間定義為ti,那么Map階段估計完成時間為:
2.2.2 Shuffle階段的作業(yè)估計執(zhí)行時間
Shuffle階段把所有map任務(wù)輸出的中間結(jié)果中相同鍵值的鍵值對組合在一起,組合后的鍵值對作為輸入傳給reduce函數(shù).Shuffle 階段所傳輸?shù)臄?shù)據(jù)與Reduce階段的輸入數(shù)據(jù)相同,設(shè)該階段所傳輸?shù)臄?shù)據(jù)為δ,傳輸一個數(shù)據(jù)單元需要時間ts,則Shuffle階段所需要的時間為:
2.2.3 Reduce階段的作業(yè)估計執(zhí)行時間
Reduce 階段以Shuffle 的輸出結(jié)果作為輸入,根據(jù)鍵值對中的鍵值進(jìn)行合并、排序.當(dāng)所有的任務(wù)完成后,將輸出結(jié)果寫入到HDFS 中.在Reduce 階段需要處理的數(shù)據(jù)為δ,設(shè)處理第i個reduce 任務(wù)的數(shù)據(jù)單元所需要花費的時間為tri,則Reduce階段所需要的時間為:
因此,完成每個作業(yè)的估計完成時間為:
如式(6)所示,Ti表示每個作業(yè)的平均完成時間,l表示共有l(wèi)個作業(yè)提交到集群,已完成的所有任務(wù)運行時間與作業(yè)個數(shù)的比值可以計算出每個作業(yè)的平均完成時間,每當(dāng)有作業(yè)完成,平均執(zhí)行時間就會重新計算一次并更新數(shù)值,這樣可以使得結(jié)果更加準(zhǔn)確.
當(dāng)Map 階段執(zhí)行到第k個map 任務(wù)時,smap表示為已經(jīng)完成的map 任務(wù)數(shù)據(jù)量,則每個map 任務(wù)平均所需要的時間可以表示為:
得到單個map 任務(wù)的平均完成時間后就可以計算出正在運行的作業(yè)以及即將運行的作業(yè)所需要的時間.m表示目前集群中正在執(zhí)行map任務(wù)數(shù)量,tused表示作業(yè)已運行的時間.則作業(yè)到達(dá)率φ表示為:
算法利用作業(yè)平均完成時間以及作業(yè)到達(dá)率將作業(yè)分為短作業(yè)類以及公平類.每個作業(yè)的估計完成時間為Ti,集群中作業(yè)的平均完成時間為,將Ti與進(jìn)行比較,如果作業(yè)的估計完成時間小于集群作業(yè)的平均完成時間,將該作業(yè)的分類標(biāo)記為短作業(yè)類,即滿足式(9),反之將該作業(yè)的類標(biāo)記為公平類.
Hadoop在分配任務(wù)時,默認(rèn)所有的節(jié)點都是完全相同的,但是在實際的環(huán)境中無法實現(xiàn)這一點,因此就會發(fā)生負(fù)載不均衡的情況,導(dǎo)致資源的浪費.當(dāng)用戶請求資源來執(zhí)行作業(yè)時,系統(tǒng)會立即給予每個作業(yè)最小資源數(shù)量,最小資源數(shù)量不是一個確定的數(shù)值,但是調(diào)度器常用它對資源分配進(jìn)行優(yōu)先排序.在本算法中,調(diào)度器將資源優(yōu)先分配給短作業(yè)類以及其它作業(yè)的最小資源數(shù)量,最后將多余的資源分配給公平類.在公平類的資源分配中,資源總是優(yōu)先分配給本地任務(wù),如果在第一個作業(yè)中沒有找到本地任務(wù),調(diào)度器將繼續(xù)搜索下一個作業(yè),直到找到本地任務(wù)為止,在分配完本地任務(wù)后,將資源分配給剩余的作業(yè).當(dāng)有新作業(yè)到達(dá)時,監(jiān)聽器向調(diào)度算法發(fā)送作業(yè)到達(dá)消息對作業(yè)進(jìn)行處理.當(dāng)出現(xiàn)資源空閑時,監(jiān)聽器向調(diào)度算法發(fā)送資源空閑消息,將合適的作業(yè)分配給資源進(jìn)行處理,這種調(diào)度策略在一定程度上解決了數(shù)據(jù)局部性的問題.
為了驗證提出的改進(jìn)后的算法正確性,實驗搭建4個節(jié)點的異構(gòu)Hadoop集群,Hadoop版本為2.6.4,如表1所示4 個節(jié)點的性能不完全相同,集群設(shè)有1 個NameNode 節(jié)點和3 個DataNode 節(jié)點,1 個ResourceManager節(jié)點和3個NodeManager節(jié)點.集群的總資源量為:內(nèi)存12 GByte,虛擬CPU 12個,磁盤60 GB.本實驗所采取的數(shù)據(jù)集為BigDataBench生成的鍵值對數(shù)據(jù).
表1 節(jié)點參數(shù)表Tab.1 Description of node sets
選取各種類型的Hadoop 基準(zhǔn)程序進(jìn)行測試有利于評估調(diào)度算法的性能.實驗采用WordCount、TeraSort、Grep這3種不同類型的MapReduce任務(wù).WordCount程序?qū)儆谟嬎忝芗停瑢?nèi)存、CPU 以及IO的資源需求較低;TeraSort 程序適用于大量數(shù)據(jù)的處理,對于內(nèi)存、IO 以及CPU 有較高的需求;Grep 程序在進(jìn)行模糊搜索時需要占用大量的CPU資源,對于其他資源的要求較低[14].測試程序類型如表2所示.
表2 測試程序類型Tab.2 Type of test program
本實驗將從2 個方面來衡量改進(jìn)后的算法的性能:任務(wù)執(zhí)行時間以及Shuffle 階段傳輸?shù)臄?shù)據(jù).將Hadoop 自帶的Fair Scheduler 和Capacity Scheduler 與改進(jìn)后的調(diào)度算法作為對比,執(zhí)行不同的作業(yè)類型并多次實驗取實驗結(jié)果的平均值.實驗1結(jié)果如圖1所示.
圖1 任務(wù)執(zhí)行時間Fig.1 Task execution time
由圖1可知,本文算法在執(zhí)行計算密集型以及IO 密集型的程序時執(zhí)行時間有所減少,執(zhí)行效率相對于Fair Scheduler分別提高了12%、20%,相對于Capacity Scheduler分別提高了27%、11%,執(zhí)行效率的提高說明本文算法的作業(yè)以及資源分配更合理.CPU 密集型任務(wù)的執(zhí)行時間相比Capacity Scheduler 有所提高,這是因為在運行過程中動態(tài)資源使用的不確定性以及作業(yè)的初始優(yōu)先級不同.
MapReduce的Shuffle階段跨節(jié)點傳輸數(shù)據(jù)會占用大量的網(wǎng)絡(luò)帶寬,造成網(wǎng)絡(luò)堵塞,影響框架性能.圖2、圖3所示為Shuffle階段跨節(jié)點傳輸數(shù)據(jù)量,由此可知,本文算法在資源需求一般的作業(yè)以及資源需求較大的作中,跨節(jié)點傳輸?shù)臄?shù)據(jù)量均有所減少,證明了本文算法能夠減少數(shù)據(jù)的跨節(jié)點傳輸,提高了數(shù)據(jù)本地性.
圖2 WordCount基準(zhǔn)程序Shuffle階段跨節(jié)點傳輸數(shù)據(jù)量Fig.2 The amount of data transferred across nodes in shuffle phase of wordcount benchmark program
圖3 TeraSort基準(zhǔn)程序Shuffle階段跨節(jié)點傳輸數(shù)據(jù)量Fig.3 The amount of data transferred across nodes in shuffle phase of TeraSort benchmark program
主要研究了Hadoop原生的資源調(diào)度器,提出原有的調(diào)度算法存在數(shù)據(jù)本地性以及負(fù)載均衡的問題.針對以上問題,提出根據(jù)作業(yè)估計執(zhí)行時間將作業(yè)分類,給不同類別的作業(yè)提供資源的改進(jìn)算法,通過與Fair Scheduler和Capacity Scheduler實驗對比,實驗結(jié)果證明本文提出的算法在異構(gòu)環(huán)境下減少了任務(wù)執(zhí)行時間以及數(shù)據(jù)遷移量,提高了資源利用率.在現(xiàn)實的生產(chǎn)環(huán)境中,影響負(fù)載均衡的原因還有很多,如用戶的優(yōu)先級、作業(yè)的優(yōu)先級等,未來的研究可在本研究的基礎(chǔ)上加入更多影響負(fù)載均衡的因素使得集群性能有更大的提高.