• 
    

    
    

      99热精品在线国产_美女午夜性视频免费_国产精品国产高清国产av_av欧美777_自拍偷自拍亚洲精品老妇_亚洲熟女精品中文字幕_www日本黄色视频网_国产精品野战在线观看 ?

      分布式環(huán)境中的多作業(yè)執(zhí)行調(diào)度策略與優(yōu)化*

      2021-06-25 09:46:02季航旭趙宇海王國(guó)仁
      關(guān)鍵詞:算子集群分布式

      季航旭,姜 蘇,趙宇海,吳 剛,王國(guó)仁

      (1.東北大學(xué)計(jì)算機(jī)科學(xué)與工程學(xué)院,遼寧 沈陽 110819;2.北京理工大學(xué)計(jì)算機(jī)學(xué)院,北京 100081)

      1 引言

      隨著信息技術(shù)的快速發(fā)展,各個(gè)領(lǐng)域積累的數(shù)據(jù)量日漸增多。數(shù)據(jù)量的增加以及數(shù)據(jù)挖掘算法的研究與普及,使得人們?cè)絹碓街匾晹?shù)據(jù)中隱含的價(jià)值,因此如何快速地從數(shù)據(jù)中獲取有價(jià)值的信息成為各個(gè)研究領(lǐng)域的關(guān)注點(diǎn)。為了應(yīng)對(duì)快速增長(zhǎng)的數(shù)據(jù),人們開發(fā)出了一代又一代大數(shù)據(jù)處理系統(tǒng)并產(chǎn)生了大量相關(guān)的優(yōu)化技術(shù)。目前比較流行的大數(shù)據(jù)處理系統(tǒng)有Hadoop[1]、Storm[2]、Samza[3]、Spark[4,5]和Flink[6]等,它們都采用分布式集群的方式進(jìn)行平臺(tái)的搭建和系統(tǒng)的部署,并有著各自獨(dú)特的優(yōu)勢(shì)。

      目前,大數(shù)據(jù)計(jì)算系統(tǒng)已經(jīng)普及,基于它們的數(shù)據(jù)查詢和數(shù)據(jù)分析等任務(wù)也日益復(fù)雜化、多樣化,如實(shí)時(shí)智能推薦、復(fù)雜事件處理等。分布式計(jì)算系統(tǒng)經(jīng)常面臨的挑戰(zhàn)是資源分配與作業(yè)調(diào)度,這是分布式環(huán)境與生俱來的問題。由于分布式環(huán)境存在計(jì)算資源異構(gòu)、帶寬異構(gòu)和單個(gè)作業(yè)內(nèi)部計(jì)算方式復(fù)雜等情況,作業(yè)執(zhí)行過程中經(jīng)常出現(xiàn)由于資源分配不合理、調(diào)度優(yōu)化不足導(dǎo)致的效率低、吞吐量低等缺點(diǎn)。更加令人堪憂的是,分布式計(jì)算具有計(jì)算結(jié)點(diǎn)規(guī)模大、計(jì)算任務(wù)復(fù)雜等特點(diǎn),計(jì)算引擎往往要同時(shí)運(yùn)行復(fù)雜繁多的分布式多作業(yè),也就是所謂的分布式多作業(yè)。分布式多作業(yè)相比單作業(yè)在作業(yè)執(zhí)行過程中將更加不利于計(jì)算資源的充分利用,這對(duì)于分布式大數(shù)據(jù)任務(wù)的執(zhí)行將更加雪上加霜。目前,仍然沒有一個(gè)完美的資源分配與管理機(jī)制滿足分布式多作業(yè)的需求,因此資源的合理分配與回收仍然是提升大數(shù)據(jù)處理系統(tǒng)計(jì)算性能的關(guān)鍵。

      現(xiàn)在最常用的大數(shù)據(jù)計(jì)算系統(tǒng)(如Flink、Spark)都是以多層執(zhí)行圖(Graph)的方式表示作業(yè)的具體信息與執(zhí)行過程。多層執(zhí)行圖是計(jì)算系統(tǒng)在作業(yè)提交與作業(yè)執(zhí)行之間生成的一系列有向無環(huán)圖DAG(Directed Acyclic Graph),也是計(jì)算引擎中最核心的數(shù)據(jù)結(jié)構(gòu),它們決定了分布式作業(yè)在每個(gè)節(jié)點(diǎn)上的資源部署。也就是說,分布式任務(wù)的執(zhí)行都是根據(jù)執(zhí)行圖中的信息在每個(gè)節(jié)點(diǎn)上進(jìn)行任務(wù)部署。因此,如何在多作業(yè)執(zhí)行過程中使DAG的合并達(dá)到最優(yōu),以及如何優(yōu)化作業(yè)的提交順序與調(diào)度策略,將是高效執(zhí)行多作業(yè)的重要保障。

      本文通過對(duì)主流的大數(shù)據(jù)處理系統(tǒng)的研究和探索,結(jié)合目前流行的大數(shù)據(jù)處理系統(tǒng)優(yōu)化技術(shù),提出并實(shí)現(xiàn)了在作業(yè)層面上的多作業(yè)合并算法與調(diào)度策略。本文的主要貢獻(xiàn)點(diǎn)在于:

      (1)提出了一種啟發(fā)式作業(yè)合并算法。通過采集到的作業(yè)特征,以作業(yè)并行度為基礎(chǔ)分析DAG結(jié)構(gòu)上的差異性,合并浪費(fèi)資源的作業(yè),釋放占用資源較少的作業(yè)資源,提高集群資源的利用率。

      (2)提出了一種基于負(fù)載均衡的多作業(yè)調(diào)度算法。根據(jù)基于作業(yè)特征的多路K-means聚類算法的分析結(jié)果使用基于負(fù)載均衡的多作業(yè)自平衡輪詢調(diào)度算法提交作業(yè),進(jìn)一步達(dá)到系統(tǒng)負(fù)載均衡。

      (3)使用目前最新一代大數(shù)據(jù)計(jì)算系統(tǒng)Flink對(duì)本文提出的作業(yè)合并算法與多作業(yè)調(diào)度算法的有效性進(jìn)行了驗(yàn)證。結(jié)果表明,2種作業(yè)合并算法都可以減少作業(yè)的運(yùn)行時(shí)間,提高系統(tǒng)吞吐量;基于負(fù)載均衡的多作業(yè)調(diào)度算法在最好情況下可減少40%的線程啟動(dòng)數(shù)。

      2 相關(guān)工作

      2.1 DAG計(jì)算模型

      DAG是分布式計(jì)算領(lǐng)域中很常見的一種數(shù)據(jù)結(jié)構(gòu),通常由一系列用戶自定義的算子組成,在各種大數(shù)據(jù)處理系統(tǒng)中都能發(fā)現(xiàn)它的身影,比如Storm、Spark和Flink等。DAG計(jì)算將計(jì)算任務(wù)分解成為若干個(gè)子任務(wù)[7],并將這些子任務(wù)之間的邏輯關(guān)系或順序構(gòu)建成DAG結(jié)構(gòu)。大數(shù)據(jù)計(jì)算引擎中的DAG計(jì)算通??梢猿橄鬄?層結(jié)構(gòu):應(yīng)用表達(dá)層、執(zhí)行引擎層和物理執(zhí)行層。應(yīng)用表達(dá)層位于最上層,定義相關(guān)算子和轉(zhuǎn)換,將計(jì)算任務(wù)分解成由若干子任務(wù)形成的DAG結(jié)構(gòu),其優(yōu)點(diǎn)是表達(dá)的便捷性,便于開發(fā)者快速描述或構(gòu)建大數(shù)據(jù)應(yīng)用。執(zhí)行引擎層介于應(yīng)用表達(dá)層和物理執(zhí)行層之間,將應(yīng)用表達(dá)層構(gòu)建的DAG計(jì)劃任務(wù)通過轉(zhuǎn)換和映射,部署到下層的物理機(jī)集群中運(yùn)行,任務(wù)的調(diào)度[8]、底層的容錯(cuò)恢復(fù)機(jī)制、數(shù)據(jù)與集群信息的傳遞等都要依賴執(zhí)行引擎層。下層是物理執(zhí)行層,即物理集群。

      2.2 Flink中的DAG

      Flink是Apache 開發(fā)的一個(gè)同時(shí)用于處理批數(shù)據(jù)和流數(shù)據(jù)的有狀態(tài)的計(jì)算框架和分布式處理引擎。Flink使用4層DAG結(jié)構(gòu)來描述和表達(dá)作業(yè)的執(zhí)行流程,每一層都對(duì)作業(yè)執(zhí)行流程做了不同程度的封裝、優(yōu)化和相關(guān)屬性的配置。DAG結(jié)構(gòu)是Flink作業(yè)執(zhí)行和部署的核心,主要包含數(shù)據(jù)流圖(StreamGraph)、作業(yè)圖(JobGraph)、執(zhí)行圖(ExecutionGraph)和物理執(zhí)行圖,F(xiàn)link正是通過這4層圖結(jié)構(gòu)把整個(gè)作業(yè)的優(yōu)化、資源分配和算子部署進(jìn)行分離。Flink的4層DAG結(jié)構(gòu)如圖1所示。

      Figure 1 Four-layer DAG structure of Flink

      圖1中,數(shù)據(jù)流圖是用戶通過API接口編寫的、用來表達(dá)用戶所要執(zhí)行的計(jì)劃任務(wù)的邏輯結(jié)構(gòu)。作業(yè)圖是在數(shù)據(jù)流圖的基礎(chǔ)上進(jìn)行優(yōu)化以及調(diào)整各種參數(shù)配置后的數(shù)據(jù)結(jié)構(gòu),它裹挾著作業(yè)運(yùn)行期間所需的必要信息。這些信息被客戶端提交到集群中的協(xié)調(diào)中心,即作業(yè)管理器(JobManager)。執(zhí)行圖可以被視作并行化的作業(yè)圖,當(dāng)接收到一個(gè)新的作業(yè)圖時(shí),會(huì)把其中的每一個(gè)算子按照其并行度轉(zhuǎn)化成多個(gè)可實(shí)際部署的子任務(wù)(在執(zhí)行圖中以Execution表示)。當(dāng)執(zhí)行圖中的一系列子任務(wù)真正在從結(jié)點(diǎn)機(jī)器上運(yùn)行的時(shí)候,才會(huì)構(gòu)成物理執(zhí)行圖。

      2.3 多作業(yè)執(zhí)行與調(diào)度研究現(xiàn)狀

      目前最流行的大數(shù)據(jù)處理平臺(tái)默認(rèn)情況下都以FIFO的形式調(diào)度作業(yè)。Wang等[9]為了解決在虛擬化云環(huán)境中同時(shí)運(yùn)行的多個(gè)作業(yè)之間的干擾問題,開發(fā)了數(shù)據(jù)驅(qū)動(dòng)分析模型,估計(jì)多個(gè)作業(yè)之間的干擾對(duì)作業(yè)執(zhí)行時(shí)間的影響,并為此提出了一種干擾感知作業(yè)調(diào)度算法。黃廷輝等[10]通過對(duì)分布式系統(tǒng)關(guān)鍵技術(shù)的分析,得出I/O和CPU的不匹配是影響計(jì)算性能的一個(gè)重要因素,提出合并文件的運(yùn)行方式,可以減少緩存文件的數(shù)量,提高I/O效率,不過仍存在內(nèi)存成本高的弊端。

      Flink系統(tǒng)中資源是按處理槽(Slot)進(jìn)行劃分的,支持多種已有的成熟的資源管理器,例如Yarn和Mesos等。Storm作為曾經(jīng)最流行的流式大數(shù)據(jù)處理系統(tǒng),默認(rèn)是采用輪詢的調(diào)度方式管理作業(yè)的[11]。Qian等[12]為了解決Storm集群中擴(kuò)展更多新計(jì)算機(jī)時(shí)帶來的負(fù)載不均衡問題,設(shè)計(jì)了S-Storm,為負(fù)載均衡群集中均勻分配Slot??傊?,目前的分布式作業(yè)調(diào)度算法和資源分配算法都是基于作業(yè)對(duì)資源的需求或者集群中結(jié)點(diǎn)資源的使用情況,進(jìn)行作業(yè)的調(diào)度和資源的分配的,它們面向的是單個(gè)作業(yè),并沒有考慮作業(yè)間的關(guān)系對(duì)集群性能的影響。

      3 基本概念

      一個(gè)復(fù)雜的DAG通常由多種類型的算子組成,有些算子只涉及本地運(yùn)算,它們以內(nèi)存共享的方式傳輸數(shù)據(jù),運(yùn)行效率高,給系統(tǒng)增加的負(fù)載小。也有些算子會(huì)通過網(wǎng)絡(luò)協(xié)議棧傳輸數(shù)據(jù),除了網(wǎng)絡(luò)本身的不可靠性會(huì)增加延遲,還有網(wǎng)絡(luò)緩沖數(shù)據(jù)、序列化、反序列化和用戶態(tài)/內(nèi)核態(tài)之間的切換等耗時(shí)操作持續(xù)地占用系統(tǒng)資源。為了便于描述,本文定義了全局算子和本地算子這2個(gè)概念。

      定義1(全局算子) 全局算子指在分布式集群中,需要從其他結(jié)點(diǎn)獲取數(shù)據(jù)進(jìn)行處理的算子,如Join和Reduce等。

      定義2(本地算子) 本地算子指在分布式集群環(huán)境中,不需要從其他結(jié)點(diǎn)獲取數(shù)據(jù),只對(duì)本地?cái)?shù)據(jù)進(jìn)行處理的算子,如Filter、Map和FlatMap等。

      本文在研究作業(yè)合并和作業(yè)調(diào)度時(shí)需要提取DAG的相關(guān)特征量,計(jì)算作業(yè)之間的差異性并通過聚類算法對(duì)作業(yè)進(jìn)行區(qū)分。聚類算法是一種經(jīng)典的群分析方法[13],它以數(shù)據(jù)間距度量數(shù)據(jù)相似性[14],把相似的數(shù)據(jù)集中到一起,是一種發(fā)現(xiàn)數(shù)據(jù)集內(nèi)部結(jié)構(gòu)特征的無監(jiān)督學(xué)習(xí)算法[15]。聚類算法按聚類思想可以分為:劃分法聚類、密度法聚類[16]、圖論聚類法[17]和網(wǎng)格法聚類等。

      本文采用的K-means算法是一種典型的劃分聚類法,其思想是預(yù)先指定聚類數(shù)目和聚類中心,計(jì)算點(diǎn)與點(diǎn)之間的距離,把每一個(gè)點(diǎn)歸類到與其距離最近的聚類中心。距離的度量方式很多,本文使用歐氏距離(式(1))和曼哈頓距離(式(2))相結(jié)合的方式度量作業(yè)之間的距離,其中n為樣本點(diǎn)維度。

      (1)

      (2)

      歐氏距離從幾何空間的角度衡量元素間的距離,常用于測(cè)量度量標(biāo)準(zhǔn)一樣的數(shù)據(jù)間的距離;曼哈頓距離用來計(jì)算數(shù)據(jù)在多維屬性上的差之和,可以減弱離群數(shù)據(jù)帶來的影響。

      4 基于啟發(fā)的作業(yè)合并算法

      本節(jié)詳細(xì)介紹基于啟發(fā)的作業(yè)合并算法。首先對(duì)作業(yè)進(jìn)行分析,解析作業(yè)的DAG圖,以及作業(yè)任務(wù)量與作業(yè)分配到的內(nèi)存資源之間的關(guān)系;然后分別采用基于并行度的作業(yè)合并算法和基于DAG結(jié)構(gòu)差異性的作業(yè)合并算法,對(duì)占用系統(tǒng)內(nèi)存資源較多的作業(yè)進(jìn)行合并,從而提高系統(tǒng)的吞吐量。

      4.1 作業(yè)相關(guān)特征的提取

      本文采用廣度優(yōu)先遍歷的方式提取作業(yè)執(zhí)行圖中相關(guān)的信息,一個(gè)典型的作業(yè)執(zhí)行圖如圖2所示,主要包含以下信息:數(shù)據(jù)源文件路徑、作業(yè)并行度和算子總數(shù)等。

      Figure 2 Job execution graph

      處理的數(shù)據(jù)量和作業(yè)分配到的內(nèi)存資源需要通過計(jì)算獲得。算法根據(jù)文件路徑信息訪問文件大小,從系統(tǒng)配置文件中讀取為Slot分配的內(nèi)存大小。作業(yè)的分類貫穿于信息采集過程,算法根據(jù)數(shù)據(jù)來源、文件大小、作業(yè)分配到的內(nèi)存資源大小和作業(yè)的執(zhí)行邏輯將作業(yè)分為可合并型作業(yè)與不可合并型作業(yè)。在作業(yè)執(zhí)行流的遍歷過程中,算法以矩陣結(jié)構(gòu)存儲(chǔ)頂點(diǎn)間的連接信息,元素值的大小表示算子間的連接數(shù)。表1是對(duì)圖2的信息提取。

      Table 1 Statistics of the number of connections between operators

      4.2 基于并行度的作業(yè)合并算法

      并行度決定了作業(yè)在執(zhí)行時(shí)所占集群內(nèi)存資源的總量,且和集群中的Slot是對(duì)應(yīng)的,意味著并行度相同的作業(yè)將分配到相同大小的內(nèi)存資源。因此,對(duì)于沒有充分占用內(nèi)存資源的作業(yè),合并并行度相同的作業(yè),可使2個(gè)作業(yè)共用1個(gè)作業(yè)的內(nèi)存資源,同時(shí)不會(huì)對(duì)作業(yè)執(zhí)行邏輯造成影響。

      影響作業(yè)執(zhí)行的因素有很多,定義3~定義5的3個(gè)度量:任務(wù)量大小比值(F)、DAG最大深度比值(D)和DAG全局算子數(shù)比值(G),決定作業(yè)的特征。

      定義3(任務(wù)量大小比值(F)) 任務(wù)量大小比值是表示2個(gè)作業(yè)處理任務(wù)量大小差異性的重要指標(biāo)之一,其計(jì)算如式(3)所示:

      (3)

      其中,x和y分別表示2個(gè)作業(yè)所處理的數(shù)據(jù)集數(shù)量,wf_mi、wf_mj分別表示2個(gè)不同作業(yè)處理的文件集合中單個(gè)文件的大小。通過實(shí)驗(yàn)得知,F(xiàn)的閾值取值為[0.5,2]。

      定義4(DAG最大深度比值(D)) 表示2個(gè)作業(yè)的執(zhí)行圖中最長(zhǎng)算子鏈長(zhǎng)度的比值,它是反映2個(gè)作業(yè)DAG差異性最明顯的指標(biāo),其計(jì)算如式(4)所示:

      (4)

      其中,dept_m和dept_n分別表示2個(gè)作業(yè)執(zhí)行圖的最大深度。DAG深度越大的作業(yè)執(zhí)行時(shí)間越長(zhǎng),因此合并后的作業(yè)在數(shù)據(jù)量相當(dāng)?shù)那闆r下,其執(zhí)行時(shí)間取決于合并前DAG深度較大的作業(yè)。D的閾值取值為[0.5,2]。

      定義5(DAG全局算子數(shù)比值(G)) 表示2個(gè)作業(yè)圖在全局算子數(shù)量上的差異。全局算子和數(shù)據(jù)傳輸緊密相關(guān),是影響作業(yè)執(zhí)行速度的重要指標(biāo)之一,體現(xiàn)2個(gè)作業(yè)在傳輸上的差異。其計(jì)算如式(5)所示:

      (5)

      其中,G表示2個(gè)并行度相同的作業(yè)的全局算子數(shù)的比值,gol_m和gol_n分別表示2個(gè)作業(yè)中全局算子的個(gè)數(shù)。DAG中全局算子的個(gè)數(shù)越多,執(zhí)行時(shí)間越長(zhǎng)。通過實(shí)驗(yàn)得知,G的閾值取值為[0.5,2]。

      基于并行度的作業(yè)合并算法執(zhí)行過程如算法1所示。

      算法1基于并行度的作業(yè)合并算法

      輸入:待合并作業(yè)j;不包含j的待合并作業(yè)集合Jobs。

      輸出:合并后的作業(yè)mergeJob。

      1.forjobinJobsdo

      2.ifjob.parallelism==j.parallelism

      3.計(jì)算j與job任務(wù)量比值F;

      4.ifF∈[0.5,2]do

      5. 計(jì)算j與job的DAG圖最大深度比值D;

      6.ifD∈[0.5,2]do

      7. 計(jì)算j與job的全局算子的比值G;

      8.endif

      9.ifG∈[0.5,2]do

      10.mergeJob=merge(j,job);

      11.removejobfromJobs,returnmergeJob;

      12.endif

      13.endif

      14.endif

      15.endfor

      (1)首先從待合并作業(yè)緩沖池的作業(yè)集中取出一個(gè)作業(yè)j,然后遍歷Jobs,從中取出一個(gè)與j并行度相同的作業(yè)job。

      (2)使用3個(gè)度量值衡量作業(yè)job與j的匹配程度,如果job與j在上述3個(gè)比值上都能落到對(duì)應(yīng)的閾值空間,兩者匹配,調(diào)用merge函數(shù)合并job與j,返回合并后的結(jié)果,終止循環(huán);否則繼續(xù)循環(huán)。

      (3)循環(huán)結(jié)束后,檢查mergeJob的值是否為空,如果mergeJob的值為空,說明Jobs中沒有與j并行度相同并且符合3個(gè)條件的job,那么j會(huì)轉(zhuǎn)而參與基于DAG圖結(jié)構(gòu)差異性的作業(yè)合并計(jì)算。

      4.3 基于DAG結(jié)構(gòu)差異性的作業(yè)合并算法

      對(duì)于作業(yè)緩沖池中剩余的由于F、D、G取值落在閾值空間以外而無法合并的作業(yè),采用基于DAG結(jié)構(gòu)差異性的作業(yè)合并算法處理。

      算法以DAG結(jié)構(gòu)差異性為切入點(diǎn),Slot只隔離內(nèi)存資源,因此為了避免作業(yè)對(duì)CPU資源的爭(zhēng)搶,盡量選擇異構(gòu)程度高的作業(yè)進(jìn)行合并。算法增加2個(gè)度量為基于DAG結(jié)構(gòu)差異性的作業(yè)合并算法提供支持。

      定義6(作業(yè)并行度比值(P)) 作業(yè)并行度是作業(yè)最明顯的特征之一,并行度比值是衡量2個(gè)作業(yè)在并行度上的差異最明顯的指標(biāo)。其計(jì)算如式(6)所示:

      (6)

      其中,P表示2個(gè)作業(yè)并行度的比值,parallelism_m和parallelism_n表示2個(gè)作業(yè)的并行度。并行度是對(duì)應(yīng)于集群中的Slot數(shù)量,因此基于DAG的作業(yè)合并算法在合并作業(yè)時(shí)首先需要考慮的就是作業(yè)并行度。通過實(shí)驗(yàn)得知,P的閾值取值為[0.5,2]。

      定義7(DAG結(jié)構(gòu)相似性(S)) DAG結(jié)構(gòu)相似性反映2個(gè)作業(yè)在執(zhí)行邏輯上的差異,以歐氏距離為基礎(chǔ)定義了DAG結(jié)構(gòu)相似性,其計(jì)算如式(7)所示:

      (7)

      其中,o表示算子的數(shù)量。

      在特征提取過程中使用矩陣保存作業(yè)執(zhí)行流程圖的基本信息,M和N分別表示存儲(chǔ)作業(yè)執(zhí)行流程圖基本信息的矩陣,Mij和Nij分別表示矩陣中的元素。算法執(zhí)行過程如算法2所示。

      算法2基于DAG結(jié)構(gòu)差異性的作業(yè)合并算法

      輸入:待合并作業(yè)j;不包含j的待合并作業(yè)集合Jobs。

      輸出:合并后的作業(yè)mergeJob。

      1.按并行度大小給Jobs中的作業(yè)從小到大排序

      2.中間作業(yè)集合為midJobs;

      3.forjobinJobsdo

      4. 計(jì)算j與job任務(wù)量比值F;

      5.ifF∈[0.5,2]do

      6. 計(jì)算j與job的DAG圖最大深度比值D

      7.ifD∈[0.5,2]do

      8. 計(jì)算j與job的全局算子的比值G

      9.ifG∈[0.5,2]do

      10. 計(jì)算j與job并行度比值P

      11.ifP∈[0.5,2]do

      12. addjobtomidJobs

      13.endif

      14.endif

      15.endif

      16.endif

      17.endfor

      18.forjobinmidJobs

      19.計(jì)算j與jobDAG圖矩陣間的歐氏距離U;

      20.更新U獲取最小值,并記錄相應(yīng)的job;

      21.endfor

      22.mergeJob=merge(j,job)

      23.returnmergeJob

      (1)從待合并作業(yè)中取出一個(gè)作業(yè)j,然后遍歷Jobs,獲取一個(gè)與j并行度相同的作業(yè)job;

      (2)在循環(huán)中使用4個(gè)度量值衡量作業(yè)job與作業(yè)j的匹配程度,如果符合對(duì)應(yīng)的閾值空間,則把作業(yè)job加入到中間作業(yè)集midJobs中;

      (3)遍歷中間作業(yè)集合midJobs,使用歐氏距離從中間數(shù)據(jù)集合中選出與作業(yè)j在歐氏距離上相似度最小的作業(yè)job,合并作業(yè)j與job并返回結(jié)果。

      5 基于負(fù)載均衡的多作業(yè)調(diào)度算法

      除了作業(yè)合并之外,作業(yè)的執(zhí)行順序與調(diào)度策略也是影響多作業(yè)執(zhí)行效率的重要因素。因此,本文提出基于負(fù)載均衡的多作業(yè)調(diào)度算法,其由3部分組成:

      (1)預(yù)處理模塊:進(jìn)行相關(guān)特征的提取工作,包括作業(yè)并行度、算子個(gè)數(shù)和算子類型等;(2)分類模塊:采用K-means聚類算法根據(jù)提取的特征信息對(duì)作業(yè)進(jìn)行聚類分析,聚類算法在負(fù)載均衡方面應(yīng)用廣泛[18,19],經(jīng)過聚類把作業(yè)分成3個(gè)類別:大作業(yè)、中等作業(yè)和小作業(yè);(3)調(diào)度模塊:調(diào)度模塊根據(jù)聚類結(jié)果,使用自平衡輪詢調(diào)度算法計(jì)算作業(yè)的提交順序,同時(shí)充分利用集群的Slot資源,防止Slot閑置。

      5.1 作業(yè)相關(guān)特征的提取

      基于負(fù)載均衡的多作業(yè)調(diào)度算法主要使用作業(yè)并行度、算子總數(shù)、各類型算子個(gè)數(shù)和作業(yè)圖深度為參考,通過遍歷對(duì)信息進(jìn)行采集。該算法執(zhí)行過程如算法3所示。

      算法3DAG特征提取算法

      輸入:作業(yè)DAG結(jié)構(gòu)圖Plan。

      輸出:提取到的信息集合infoList。

      1.fornodeinPlando

      2.max=Math.max(max,BFS(node));

      3.ifnodeis not visited

      4. add node’s characters toinfoList,node.visited=true;

      5.node相連接的未被訪問的頂點(diǎn)入隊(duì)列Q;

      6.whileQis not emptydo

      7.v=Q頭元素出隊(duì)列;

      8.addv’s characters toinfoList,v.visited= true;

      9.v相連接的未被訪問的頂點(diǎn)入隊(duì)列Q;

      10.endwhile

      11.endif

      12.infoList.max=max

      13.endfor

      14.returninfoList

      (1)使用深度優(yōu)先搜索DFS(Depth First Search)計(jì)算從Sink算子到距離最遠(yuǎn)的Source算子的距離,并記錄在max中;如果node頂點(diǎn)未被訪問過,將頂點(diǎn)信息存入infoList中。

      (2)將與node頂點(diǎn)相連的頂點(diǎn)加入隊(duì)列Q,如果Q不為空,從Q中取出一個(gè)頂點(diǎn)v,將v的信息記錄到infoList中,與v相連的未訪問過的頂點(diǎn)加入隊(duì)列。

      (3)更新infoList中的DAG深度,for循環(huán)直到遍歷完P(guān)lan中的所有頂點(diǎn),返回infoList。

      5.2 基于作業(yè)特征的多路聚類分析

      聚類分析模塊將根據(jù)特征信息對(duì)作業(yè)進(jìn)行分類,使用4種數(shù)據(jù)度量作業(yè)之間的相似性,分別是作業(yè)并行度、各類算子個(gè)數(shù)、作業(yè)執(zhí)行流程圖深度和全局算子的個(gè)數(shù)。算法采用歐氏距離與曼哈頓距離相結(jié)合的方式測(cè)量作業(yè)間的距離。ope[i]是以數(shù)組的形式存儲(chǔ),dept、全局算子ops的大小是衡量作業(yè)流程復(fù)雜性的度量標(biāo)準(zhǔn)。

      定義8(作業(yè)在不同算子類型上的差異性) 算子及算子類型最能區(qū)分作業(yè)的不同,算子類型的差異性反映了作業(yè)的總體差異性,其計(jì)算如式(8)所示:

      (8)

      其中,mope[i]與nope[i]分別為作業(yè)m與作業(yè)n的不同類型的算子的個(gè)數(shù)。

      定義9(作業(yè)在DAG結(jié)構(gòu)深度上的差異性) DAG結(jié)構(gòu)深度是作業(yè)最明顯的特征之一,它描述了作業(yè)運(yùn)行時(shí)數(shù)據(jù)流通的最大路徑,其計(jì)算如式(9)所示:

      distancedept(m,n)=|mdept-ndept|

      (9)

      其中,mdept與ndept分別為作業(yè)m與作業(yè)n的DAG結(jié)構(gòu)深度。

      定義10(作業(yè)在Task線程數(shù)上的差異性) 作業(yè)在集群中開啟的線程數(shù)直接反映作業(yè)對(duì)系統(tǒng)CPU資源的占用量,作業(yè)在Task線程數(shù)上的差異性計(jì)算如式(10)所示:

      distancetasknum(m,n)=|mpara*mops-npara*nops|

      (10)

      其中,mpara與npara分別為作業(yè)m與作業(yè)n的并行度,mops與nops分別為作業(yè)m與作業(yè)n的全局算子數(shù)量。

      定義11(作業(yè)的差異性) 本文從3個(gè)角度衡量了作業(yè)之間的差異性,其計(jì)算如式(11)所示:

      distance(m,n)=distanceope(m,n)+distancedept(m,n)+distancetasknum(m,n)

      (11)

      本文提出的基于作業(yè)特征的多路K-means聚類分析算法如算法4所示。

      算法4基于作業(yè)特征的多路K-means聚類分析算法

      輸入:作業(yè)及其特征集合PlanList。

      輸出:聚類結(jié)果ClusterResult。

      1. 根據(jù)并行度乘以算子總數(shù)的大小對(duì)PlanList進(jìn)行排序;

      2. 獲取初始聚類中心點(diǎn)集合;

      3.fori=1 to 3do

      4.center_i=K_means(PlanList,center_i);

      5.endfor

      6.fori=1 to 3do

      7. 計(jì)算每個(gè)聚類中心點(diǎn)將PlanList劃分的程度;

      8.endfor

      9.center=K_means(PlanList,center);

      10.根據(jù)center將PlanList分組并放入ClusterResult中

      11.returnClusterResult

      (1)對(duì)作業(yè)及其特征集合PlanList按并行度乘以算子總數(shù)大小進(jìn)行排序。

      (2)從排好序的PlanList中選擇3個(gè)作業(yè)作為聚類中心;以排好序的PlanList的隊(duì)列頭作業(yè)、隊(duì)列尾作業(yè)和中間作業(yè)作為聚類中心;從排好序的PlanLsit中分別取隊(duì)列頭3個(gè)作業(yè)、隊(duì)列中間3個(gè)作業(yè)、隊(duì)列尾部3個(gè)作業(yè),取其平均值作為聚類中心。

      (3)調(diào)用K_means算法循環(huán)更新每個(gè)聚類中心的值;計(jì)算每個(gè)聚類中心將PlanList劃分的程度,劃分程度度量標(biāo)準(zhǔn)為,聚類結(jié)果每類作業(yè)的數(shù)量越平均越好。選取聚類結(jié)果好的2個(gè)聚類中心取其平均值,調(diào)用K_means算法進(jìn)行最后聚類;計(jì)算聚類結(jié)果,并輸出結(jié)果。

      5.3 基于負(fù)載均衡的多作業(yè)自平衡輪詢調(diào)度

      通過多路聚類的方式優(yōu)化了聚類中心點(diǎn)的選取,通過基于作業(yè)特征的多路K-means聚類分析可以把作業(yè)集合根據(jù)聚類中心點(diǎn)聚集成3個(gè)作業(yè)類別,為算法提供可靠的支持。

      本文以輪詢調(diào)度法[20 - 23]為基礎(chǔ)實(shí)現(xiàn)了多作業(yè)的提交優(yōu)化,目的是在不浪費(fèi)集群Slot資源的情況下,使集群開啟的Task線程數(shù)量保持平穩(wěn),以此達(dá)到在多作業(yè)情況下平衡集群性能的目的。集群中作業(yè)工作的線程數(shù)量是由作業(yè)并行度和算子個(gè)數(shù)決定的,因此控制作業(yè)的提交順序,可以達(dá)到控制集群開啟的Task線程數(shù)量的目的。作業(yè)能否提交成功取決于集群剩余并行度是否滿足作業(yè)的并行度需求,如果作業(yè)的并行度比集群中可用并行度大,作業(yè)就會(huì)被拒絕,因此輪詢的作業(yè)提交方式并不會(huì)嚴(yán)格執(zhí)行,而且集群空閑的Slot資源會(huì)隨著作業(yè)的提交和結(jié)束動(dòng)態(tài)地變化。針對(duì)這種情況本文設(shè)計(jì)了自平衡的輪詢調(diào)度算法,如算法5所示。

      算法5基于負(fù)載均衡的多作業(yè)自平衡輪詢調(diào)度算法

      輸入:聚類結(jié)果ClusterResult;最后的聚類中心center。

      輸出:下一個(gè)提交的作業(yè)Job。

      1. 對(duì)K-means聚類結(jié)果收集排序;

      2. 平分排好序的作業(yè)到3個(gè)隊(duì)列中,并設(shè)置指針p;

      3.翻轉(zhuǎn)隊(duì)列midQueue、minQueue,查詢集群剩余Slot;

      4.ifslotNum> 0do

      5.ifjobNum> 0do

      6.pre=p;queue=Queue[p];

      7.whilequeueis not emptydo

      8.max= 0;

      9.foreleminqueuedo

      10.ifelem.parallelism≤slotNumdo

      11.ifmax

      12.job=elem;max=elem.parallelism;

      13.endif

      14.endif

      15.endfor

      16.endwhile

      17.ifmax!= 0do

      18.p=(p++)%3;

      19.endif

      20.ifmax== 0do

      21.p=(p++)%3;

      22.ifp==predo返回 4;

      23.endif

      24. 返回 7;

      25.endif

      26.endif

      27.endif

      (1)對(duì)K-means聚類產(chǎn)生的3個(gè)集合中的元素按元素距離聚類中心點(diǎn)的距離大小進(jìn)行排序;比較3個(gè)聚類中心點(diǎn)的大小,按聚類中心點(diǎn)的大小,從大到小合并3個(gè)排好序的作業(yè)集合。

      (2)將合并后的集合平均分成3份,并放入3個(gè)隊(duì)列中,將midQueue和minQueue隊(duì)列進(jìn)行逆轉(zhuǎn)。

      (3)每隔5 s查詢一次集群剩余Slot資源,從指針指向的隊(duì)列開始,遍歷隊(duì)列中的元素找到集群中空閑Slot資源能滿足的最大并行度的作業(yè)提交。每次提交作業(yè)后,修改指針指向下一隊(duì)列。

      (4)對(duì)3個(gè)集合進(jìn)行判斷,如果出現(xiàn)隊(duì)列為空,并且總作業(yè)的數(shù)量大于2,按順序收集3個(gè)集合中的隊(duì)列,再平分所有的作業(yè)到3個(gè)集合中,并更改指針使其指向midQueue,否則不再進(jìn)行作業(yè)收集。

      6 實(shí)驗(yàn)

      本文使用2種類型的作業(yè)來進(jìn)行對(duì)比實(shí)驗(yàn),一種是單詞統(tǒng)計(jì)(WordCount),另一種是表連接(Join)。因?yàn)槿炙阕又凶顝?fù)雜的算子就是Join類型算子,其他簡(jiǎn)單類型的算子使用最多的是Filter、Map和FlatMap,因此WordCount作業(yè)和Join作業(yè)足以覆蓋實(shí)際應(yīng)用中的大部分場(chǎng)景。

      6.1 數(shù)據(jù)集與評(píng)估指標(biāo)

      本文實(shí)驗(yàn)采用大數(shù)據(jù)測(cè)試基準(zhǔn)TPC-H生成的數(shù)據(jù)集,是事務(wù)性能管理委員會(huì)TPC(Transaction Processing Performance Council)發(fā)布的權(quán)威數(shù)據(jù)庫(kù)評(píng)測(cè)基準(zhǔn),可以保證生成的模擬數(shù)據(jù)具有真實(shí)性、客觀性與健壯性。在WordCount實(shí)驗(yàn)中本文選用5個(gè)基本的大數(shù)據(jù)集來模擬批處理環(huán)境中的大規(guī)模數(shù)據(jù)處理。在表連接實(shí)驗(yàn)中,本文選取TPC-H生成的Lineitem表和Orders表作為數(shù)據(jù)源,其中Lineitem有16個(gè)字段,前3個(gè)字段Orderkey、Partkey和Suppkey是主鍵。Orders表有9個(gè)字段,前2個(gè)字段Orderkey和Custkey是主鍵。

      實(shí)驗(yàn)的評(píng)估指標(biāo)有3個(gè):

      (1)作業(yè)運(yùn)行時(shí)間:在相同硬件條件下,任務(wù)量相同、處理邏輯相同的作業(yè)處理速度越快,表明系統(tǒng)性能越好。

      (2)作業(yè)吞吐量:?jiǎn)挝粫r(shí)間內(nèi)集群處理的平均數(shù)據(jù)量大小,即被處理的總數(shù)量(totalDataVolume)與運(yùn)行總時(shí)間(totalProcessTime)的比值,其定義如式(12)所示:

      (12)

      (3)集群開啟的最高Task線程數(shù):本文提出的基于負(fù)載均衡的多作業(yè)調(diào)度算法以降低集群同一時(shí)刻開啟的最高Task線程數(shù)為首要目標(biāo)。

      6.2 實(shí)驗(yàn)環(huán)境設(shè)置

      本文所描述的相關(guān)技術(shù)細(xì)節(jié)均在Flink 1.8.0版本中進(jìn)行實(shí)現(xiàn),實(shí)驗(yàn)運(yùn)行的軟硬件環(huán)境如下所示:

      (1)硬件環(huán)境:采用的分布式環(huán)境由4臺(tái)服務(wù)器組成,1臺(tái)主結(jié)點(diǎn),3臺(tái)從結(jié)點(diǎn),結(jié)點(diǎn)之間通過千兆以太網(wǎng)連接。配置為:CPU:Intel Xeon E5-2603 V4 *2,核心數(shù)目:6核心;內(nèi)存:128 GB(從結(jié)點(diǎn)64 GB);硬盤:400 GB SSD。

      (2)軟件環(huán)境:操作系統(tǒng):CentOs 7;Flink版本:1.8.0,JDK版本:1.8.0;存儲(chǔ)環(huán)境:Hadoop 2.7.5。

      6.3 實(shí)驗(yàn)結(jié)果與分析

      (1)基于并行度的作業(yè)合并算法實(shí)驗(yàn)。

      作業(yè)合并算法實(shí)驗(yàn)對(duì)一對(duì)相同的WordCount作業(yè)和一對(duì)不同的Join作業(yè)分別進(jìn)行順序執(zhí)行和合并執(zhí)行。表2展示了作業(yè)的基本信息。

      Table 2 Job sets information for experiment 1

      圖3對(duì)比了2個(gè)WordCount作業(yè)在相同實(shí)驗(yàn)環(huán)境、相同數(shù)據(jù)集上順序執(zhí)行和合并執(zhí)行的執(zhí)行結(jié)果。其中圖3a對(duì)比了執(zhí)行時(shí)間,合并執(zhí)行的執(zhí)行時(shí)間減少了5%~23%。在內(nèi)存資源足夠使用的前提下,單個(gè)WordCount程序?qū)篊PU的利用沒有達(dá)到時(shí)刻滿負(fù)荷運(yùn)行的狀態(tài),所以作業(yè)合并不僅能提高集群的內(nèi)存資源利用,也能提升集群CPU資源的利用。圖3b對(duì)比了吞吐量,采用了作業(yè)合并算法后系統(tǒng)可以更快地到達(dá)吞吐量峰值。

      Figure 3 Results of WordCount job merging based on the number of parallelism

      圖4對(duì)比了Join1和Join2作業(yè)在相同實(shí)驗(yàn)環(huán)境、相同數(shù)據(jù)集上順序執(zhí)行和合并執(zhí)行的執(zhí)行結(jié)果。其中圖4a對(duì)比了運(yùn)行時(shí)間,圖4b對(duì)比了系統(tǒng)吞吐量。盡管效果不如WordCount作業(yè)明顯,但基于并行度的作業(yè)合并算法對(duì)運(yùn)行時(shí)間仍有一定縮減,吞吐量提升效果在4.5%~20%。

      Figure 4 Results of Join job merging based on the number of parallelism

      (2)基于DAG結(jié)構(gòu)差異的作業(yè)合并算法實(shí)驗(yàn)。

      實(shí)驗(yàn)先后提交了2個(gè)并行度不同的WordCount作業(yè)和Join作業(yè),來模擬基于DAG結(jié)構(gòu)差異性的作業(yè)合并。

      圖5和圖6從運(yùn)行時(shí)間和吞吐量2個(gè)方面展示了作業(yè)合并算法的提升效果。合并執(zhí)行的執(zhí)行時(shí)間明顯低于順序執(zhí)行的總時(shí)間,并且差距明顯,因?yàn)楸緦?shí)驗(yàn)不是在滿并行度的條件下進(jìn)行的,實(shí)際執(zhí)行時(shí)可能會(huì)出現(xiàn)不同情況,對(duì)于WordCount作業(yè),基于DAG結(jié)構(gòu)差異性的作業(yè)合并算法具有明顯的優(yōu)勢(shì)。

      Figure 5 Results of WordCount job merging based on DAG structure difference

      Figure 6 Results of Join job merging based on DAG structure difference

      (3)基于負(fù)載均衡的多作業(yè)調(diào)度算法實(shí)驗(yàn)

      對(duì)于多作業(yè)調(diào)度算法,實(shí)驗(yàn)以4個(gè)作業(yè)為基礎(chǔ),表3列出了作業(yè)算子的基本信息,這些作業(yè)特征信息是衡量作業(yè)之間差異性的關(guān)鍵。實(shí)驗(yàn)?zāi)M了7個(gè)任務(wù)量大小不同的作業(yè),采用隨機(jī)的方式模擬了作業(yè)的提交順序,將其執(zhí)行結(jié)果與多作業(yè)調(diào)度算法的結(jié)果進(jìn)行比較。表4展示了作業(yè)對(duì)應(yīng)的編號(hào)以及其處理任務(wù)量信息,表5展示了優(yōu)化前后作業(yè)的提交順序。

      Table 3 Job sets information for experiment 3

      Table 4 Job number and corresponding processing task volume

      Table 5 Order of job submission

      圖7展示了基于負(fù)載均衡的多作業(yè)調(diào)度算法的提升效果。從圖7a可以看出,通過調(diào)優(yōu)作業(yè)的提交順序可縮短作業(yè)處理的時(shí)間,但存在某些按FIFO提交模式的順序比優(yōu)化后的輪詢提交順序要好,該情況的出現(xiàn)是因?yàn)樗惴ㄔ趫?zhí)行過程中并未考慮到任務(wù)量的大小。從圖7b可以看出,基于負(fù)載均衡的多作業(yè)調(diào)度算法在吞吐量性能上提升了5%左右。圖7c 展示了集群開啟的Task線程數(shù)對(duì)比,基于負(fù)載均衡的多作業(yè)調(diào)度算法執(zhí)行作業(yè)集時(shí),集群開啟的最大線程數(shù)在多數(shù)情況下有所減少,最好情況下減少了40%的線程。

      Figure 7 Running results of multi-job scheduling algorithm based on load balancing

      7 結(jié)束語

      本文通過分析作業(yè)與系統(tǒng)資源之間的關(guān)系,以及作業(yè)與作業(yè)之間的關(guān)系,提出并實(shí)現(xiàn)了提高集群資源利用率和負(fù)載均衡能力的算法,本文提出的優(yōu)化主要包含2個(gè)方面:

      (1)提出了啟發(fā)式的作業(yè)合并算法,通過分析作業(yè)任務(wù)量和作業(yè)分配到的集群資源之間的關(guān)系,合并對(duì)集群資源利用率低的作業(yè),使它們共用同一個(gè)作業(yè)的資源。該算法解決了集群部分作業(yè)資源利用率低的問題,并通過實(shí)驗(yàn)驗(yàn)證了該算法在不同類型作業(yè)上對(duì)集群性能提升的有效性。

      (2)提出了基于負(fù)載均衡的多作業(yè)調(diào)度算法,首先對(duì)作業(yè)進(jìn)行特征提??;然后通過多路K-means聚類算法將作業(yè)分為3類:大作業(yè)、中等作業(yè)和小作業(yè);之后采用自平衡輪詢調(diào)度算法提交分類好的作業(yè),保證大作業(yè)不會(huì)在集群中集中執(zhí)行,降低了集群由于開啟過多線程造成集群性能下降的概率,并通過實(shí)驗(yàn)驗(yàn)證了該算法的有效性。

      分布式系統(tǒng)在多作業(yè)執(zhí)行層面還有許多需要優(yōu)化和提高的部分,未來可繼續(xù)研究的問題有:

      (1)動(dòng)態(tài)調(diào)度。目前的分布式大數(shù)據(jù)處理系統(tǒng)未能做到在作業(yè)執(zhí)行過程中動(dòng)態(tài)地調(diào)整作業(yè)的執(zhí)行流程,這種方式不利于資源的動(dòng)態(tài)回收與共享。針對(duì)這一問題,系統(tǒng)需要做出相應(yīng)的優(yōu)化和改進(jìn)。

      (2)優(yōu)化多作業(yè)并行度。并行度是作業(yè)執(zhí)行的關(guān)鍵,目前在分布式大數(shù)據(jù)處理平臺(tái)的應(yīng)用中,一般都是從業(yè)人員根據(jù)數(shù)據(jù)與業(yè)務(wù)特性手動(dòng)優(yōu)化并行度,這樣就給并行度的優(yōu)化帶來了很多困難。因此,研究和設(shè)計(jì)出一套并行度設(shè)置的優(yōu)化方案,也是分布式大數(shù)據(jù)系統(tǒng)應(yīng)用方面的一個(gè)研究課題。

      猜你喜歡
      算子集群分布式
      擬微分算子在Hp(ω)上的有界性
      各向異性次Laplace算子和擬p-次Laplace算子的Picone恒等式及其應(yīng)用
      海上小型無人機(jī)集群的反制裝備需求與應(yīng)對(duì)之策研究
      一類Markov模算子半群與相應(yīng)的算子值Dirichlet型刻畫
      一種無人機(jī)集群發(fā)射回收裝置的控制系統(tǒng)設(shè)計(jì)
      電子制作(2018年11期)2018-08-04 03:25:40
      分布式光伏熱錢洶涌
      能源(2017年10期)2017-12-20 05:54:07
      分布式光伏:爆發(fā)還是徘徊
      能源(2017年5期)2017-07-06 09:25:54
      Python與Spark集群在收費(fèi)數(shù)據(jù)分析中的應(yīng)用
      勤快又呆萌的集群機(jī)器人
      Roper-Suffridge延拓算子與Loewner鏈
      五大连池市| 舟曲县| 东城区| 安塞县| 安义县| 兰坪| 天长市| 广灵县| 田林县| 青河县| 乌拉特后旗| 安岳县| 昌平区| 绥棱县| 抚远县| 上犹县| 铜川市| 鹤壁市| 上思县| 措美县| 南汇区| 兴化市| 巫溪县| 武定县| 华安县| 长春市| 东乡族自治县| 东阿县| 墨脱县| 改则县| 汝南县| 武邑县| 博白县| 宁强县| 南郑县| 建湖县| 泗水县| 唐海县| 诸暨市| 内江市| 桃园县|