郝志峰,黃澤林,蔡瑞初,傅正佳,溫 雯,唐凱麟
(1.廣東工業(yè)大學(xué)計(jì)算機(jī)學(xué)院,廣州 510006;2.佛山科學(xué)技術(shù)學(xué)院數(shù)學(xué)與大數(shù)據(jù)學(xué)院,廣東佛山 528000)
物聯(lián)網(wǎng)、社交網(wǎng)絡(luò)、云計(jì)算等技術(shù)的出現(xiàn),同時(shí)伴隨計(jì)算能力、空間存儲(chǔ)和網(wǎng)絡(luò)帶寬的快速發(fā)展,人類生活中的各類數(shù)據(jù)在互聯(lián)網(wǎng)、通信、金融、商業(yè)、醫(yī)療等諸多領(lǐng)域不斷增長(zhǎng)和累積。由于數(shù)據(jù)量日益增多,并且在各種應(yīng)用場(chǎng)景下業(yè)務(wù)需求和競(jìng)爭(zhēng)壓力對(duì)數(shù)據(jù)處理的實(shí)時(shí)性和準(zhǔn)確性提出了更高的要求,因此傳統(tǒng)的單機(jī)器數(shù)據(jù)處理技術(shù)已經(jīng)無法應(yīng)對(duì)時(shí)代的變化,而大數(shù)據(jù)的發(fā)展面臨著現(xiàn)實(shí)應(yīng)用的難題。因此,各種創(chuàng)新性的技術(shù)得到了充分發(fā)展,如云計(jì)算[1]、量子計(jì)算、分布式計(jì)算等技術(shù)。其中,分布式計(jì)算以其強(qiáng)大的計(jì)算能力和低廉的成本得到了廣泛應(yīng)用和深入研究。分布式系統(tǒng)[2]通常分為批數(shù)據(jù)處理系統(tǒng)和流數(shù)據(jù)處理[3]系統(tǒng)兩類,批處理應(yīng)用廣泛的有Hadoop[4]和Spark[5]等,流處理應(yīng)用廣泛的有Storm[6]、Flink[7]、Spark streaming[5]等。相對(duì)于批處理系統(tǒng),流處理更關(guān)注數(shù)據(jù)的實(shí)時(shí)性。實(shí)時(shí)數(shù)據(jù)流具有高速、不穩(wěn)定的特征,這也導(dǎo)致使用分布式流處理系統(tǒng)時(shí)對(duì)于計(jì)算資源必須有合理而準(zhǔn)確的調(diào)度。
分布式集群資源的動(dòng)態(tài)調(diào)度是當(dāng)前的一個(gè)研究熱點(diǎn),如Hadoop自帶的YARN[8]以及Apache開源的Mesos[9]框架,它們都可以在集群中對(duì)資源進(jìn)行管理。而在國內(nèi),也有類似于基于分簇的小基站用戶資源分配方案[10],但不足之處在于,這種管理更多地需要人工干預(yù),無法實(shí)時(shí)監(jiān)測(cè)集群資源的利用情況并進(jìn)行合理的資源分配。在流數(shù)據(jù)處理任務(wù)中,集群需要一直處于準(zhǔn)備狀態(tài),以應(yīng)對(duì)流數(shù)據(jù)處理負(fù)載的變化。因此,需要根據(jù)負(fù)載情況,通過負(fù)載均衡對(duì)任務(wù)進(jìn)行合理分配,如文獻(xiàn)[11]提出基于負(fù)載平衡和經(jīng)驗(yàn)值的工作流任務(wù)分配策略。而對(duì)于流數(shù)據(jù)處理任務(wù),資源負(fù)載是實(shí)時(shí)變化的,因此,如何在不停變化的實(shí)時(shí)負(fù)載過程中自動(dòng)地對(duì)集群資源進(jìn)行整理,降低當(dāng)前暫不需要的資源的消耗,是亟需解決的一個(gè)難題。
資源動(dòng)態(tài)調(diào)度及協(xié)同分配的實(shí)現(xiàn),難點(diǎn)在于如何根據(jù)當(dāng)前系統(tǒng)的資源使用情況對(duì)后續(xù)資源需求做出準(zhǔn)確的預(yù)測(cè),以及對(duì)預(yù)測(cè)的資源計(jì)算出當(dāng)前對(duì)系統(tǒng)加權(quán)延遲最低的資源分配比例,同時(shí)動(dòng)態(tài)關(guān)閉未使用的資源,減少資源消耗。本文通過對(duì)流數(shù)據(jù)處理任務(wù)負(fù)載進(jìn)行實(shí)時(shí)監(jiān)測(cè),構(gòu)建一個(gè)雙層資源調(diào)度模型,利用ZooKeeper[12]和YARN進(jìn)行動(dòng)態(tài)資源管理,從而實(shí)時(shí)改變集群資源分布情況,減少系統(tǒng)延遲。
在資源量化的方式上,本文系統(tǒng)是在YARN的基礎(chǔ)上構(gòu)建的,而在YARN中,資源被虛擬化為容器這個(gè)概念,每個(gè)容器資源中包含了相對(duì)應(yīng)的存儲(chǔ)空間和CPU大小等資源。
在資源動(dòng)態(tài)調(diào)度方面,本文將資源調(diào)度的處理分為任務(wù)層和系統(tǒng)層兩個(gè)層面。在任務(wù)層,文獻(xiàn)[13]提出基于排隊(duì)論的動(dòng)態(tài)資源調(diào)度模型,該模型需要在集群大小固定的情況下進(jìn)行調(diào)度。以此為基礎(chǔ),本文提出在資源調(diào)度時(shí)根據(jù)實(shí)時(shí)負(fù)載而改變集群大小減小延遲的模型。在系統(tǒng)層,已知的資源調(diào)度框架有YARN和Mesos等,主要解決了物理資源調(diào)度、將物理資源映射為虛擬化資源以及將虛擬化資源分配給分布式流處理系統(tǒng)進(jìn)行使用這3個(gè)問題。但關(guān)于分布式流數(shù)據(jù)系統(tǒng)如何將不同的資源分配給不同的任務(wù)則沒有涉及。
在Hadoop自帶的YARN資源管理框架中有3種資源調(diào)度算法,即先來先服務(wù)調(diào)度算法、公平調(diào)度算法和Capacity調(diào)度算法。這3種算法在資源調(diào)度上基本可以實(shí)現(xiàn)對(duì)資源的合理分配以應(yīng)對(duì)各種情況。但是YARN的調(diào)度算法是基于人為改變資源需求的前提,無法對(duì)資源的需求進(jìn)行預(yù)測(cè)和動(dòng)態(tài)調(diào)整。針對(duì)上述問題,本文提出一種雙層資源調(diào)度模型,通過中間件溝通系統(tǒng)層和YARN資源調(diào)度框架對(duì)集群資源進(jìn)行動(dòng)態(tài)調(diào)度和協(xié)同分配處理。
在集群管理方面,已知的如Storm on YARN[14]、Flink on YARN[15]、Storm on Mesos[16]、Flink on Mesos[17]等開源框架,都只是實(shí)現(xiàn)了對(duì)固定大小集群在多任務(wù)處理時(shí)的集群管理,但框架仍然需要更多人力進(jìn)行管理,在資源負(fù)載變化時(shí)人為改變集群的大小。
綜上所述,本文系統(tǒng)以DRS動(dòng)態(tài)資源調(diào)度模型[13]作為任務(wù)層,用以監(jiān)測(cè)實(shí)時(shí)延遲和負(fù)載變化等情況,并通過系統(tǒng)層溝通集群資源,通過增減資源實(shí)現(xiàn)對(duì)分布式流數(shù)據(jù)處理任務(wù)的動(dòng)態(tài)資源調(diào)度管理和協(xié)同分配。
Storm on YARN是Yahoo開源的一個(gè)基于Storm的分布式集群資源管理框架[18],為基于Storm的分布式流數(shù)據(jù)處理任務(wù)提供了較完善的資源管理機(jī)制,其主要由ResourceManager、ApplicationManager、NodeManager、Container、Nimbus、Supervisor等部分組成。Storm on YARN系統(tǒng)結(jié)構(gòu)如圖1所示。
圖1 Storm on YARN系統(tǒng)結(jié)構(gòu)Fig.1 Structure of Storm on YARN system
Storm on YARN系統(tǒng)主要有以下優(yōu)點(diǎn):
1)彈性計(jì)算資源。在將Storm運(yùn)行在分布式集群[19]中時(shí),Storm可以和其他的應(yīng)用程序(例如flink流數(shù)據(jù)處理任務(wù)或Spark批處理[20]任務(wù))共享集群資源,當(dāng)Storm流數(shù)據(jù)處理任務(wù)負(fù)載較大時(shí),可以動(dòng)態(tài)地為其增加資源;當(dāng)Strom流數(shù)據(jù)處理任務(wù)負(fù)載較小時(shí),可以為其減少資源,并將資源給予其他需要的應(yīng)用程序。
2)共享底層存儲(chǔ)。YARN可以為集群中運(yùn)行的分布式框架提供共享的HDFS[21]存儲(chǔ),減少管理多個(gè)集群帶來的不便以及傳輸數(shù)據(jù)帶來的時(shí)間延遲。
3)多版本共存。不同版本的Storm都可以運(yùn)行在一個(gè)YARN上,降低了版本維護(hù)的成本。
當(dāng)然,Storm on YARN本身還可以做出一些改進(jìn),目前的Storm on YARN依然需要人為地通過對(duì)分布式流數(shù)據(jù)處理任務(wù)負(fù)載的監(jiān)測(cè),再借由人為操作對(duì)集群資源進(jìn)行管理,這樣當(dāng)集群較大時(shí),無疑對(duì)管理集群來說是一個(gè)十分巨大的挑戰(zhàn),如何在多個(gè)流數(shù)據(jù)處理任務(wù)情況下根據(jù)當(dāng)前任務(wù)的重要程度協(xié)同分配資源,是亟待解決的問題。本文立足于此設(shè)計(jì)解決方案。
DRS是一個(gè)分布式動(dòng)態(tài)資源調(diào)度系統(tǒng),該系統(tǒng)基于Storm分布式流數(shù)據(jù)處理框架,通過排隊(duì)論對(duì)流數(shù)據(jù)處理的時(shí)間延遲進(jìn)行處理和預(yù)測(cè),從而對(duì)集群資源的分配做出合理分配的一個(gè)集群資源管理系統(tǒng)。DRS系統(tǒng)工作流程如圖2所示。
圖2 DRS系統(tǒng)工作流程Fig.2 Workflow of DRS system
DRS系統(tǒng)工作具體步驟如下:
步驟1系統(tǒng)收集預(yù)設(shè)時(shí)間窗口內(nèi)的負(fù)載信息,包括λ(source,m)、λ(i,in)、λ(i,out)、μi、si、ki、pji等。
步驟2數(shù)據(jù)擬合還原,將收集到的時(shí)間窗口內(nèi)的數(shù)據(jù)通過預(yù)設(shè)的數(shù)據(jù)擬合函數(shù)進(jìn)行擬合,使之后對(duì)資源調(diào)度的決策更接近真實(shí)數(shù)據(jù)結(jié)果。
步驟3將擬合后的數(shù)據(jù)提交到資源調(diào)度器以供其做出正確的決策。
在DRS系統(tǒng)中,資源調(diào)度器對(duì)分布式流數(shù)據(jù)處理任務(wù)的負(fù)載情況界定包括3種狀態(tài),即合適(feasible)、過載(overproviding)和資源不足(shortage)。DRS根據(jù)資源調(diào)度器對(duì)負(fù)載情況做出的負(fù)載情況界定,并通過時(shí)間窗口內(nèi)通過數(shù)據(jù)擬合后得到的數(shù)據(jù),對(duì)分布式流數(shù)據(jù)處理任務(wù)后續(xù)需要分配的資源情況做出正確的決策,包括刪減資源或增加資源等。但DRS的不足之處在于,它只對(duì)固定集群大小的資源進(jìn)行分配,無法控制集群本身做出關(guān)閉節(jié)點(diǎn)減少資源或打開節(jié)點(diǎn)提供資源等操作。
由于DRS資源調(diào)度器只能在固定給定物理資源的條件下對(duì)資源進(jìn)行合理分配和調(diào)度,而在分布式流處理任務(wù)中,有時(shí)根據(jù)任務(wù)的負(fù)載情況需要減少集群資源或者增加集群資源,而這需要通過人為幫助才能做到,即系統(tǒng)層資源調(diào)度器無法通過任務(wù)層資源調(diào)度器的分配策略動(dòng)態(tài)調(diào)整集群資源情況。
本文提出雙層資源調(diào)度模型,該模型結(jié)合了任務(wù)層資源調(diào)度器和系統(tǒng)層資源調(diào)度器,并在系統(tǒng)層通過Zookeeper與YARN建立連接,通過YARN對(duì)集群資源進(jìn)行調(diào)整。
如圖3所示,雙層調(diào)度模型分為任務(wù)層資源調(diào)度器和系統(tǒng)層資源調(diào)度器,任務(wù)層資源調(diào)度器的作用是管理集群的所有工作任務(wù),根據(jù)當(dāng)前各個(gè)任務(wù)的負(fù)載情況,以最有效降低整體任務(wù)延遲為目的,制定出符合當(dāng)前情境的資源分配策略。例如,在當(dāng)前集群中,假設(shè)各節(jié)點(diǎn)的資源占用為(5,5,5),此時(shí)節(jié)點(diǎn)1的延遲高于預(yù)設(shè)值,而節(jié)點(diǎn)3的延遲低于預(yù)設(shè)值,則任務(wù)層資源調(diào)度器可能會(huì)將資源占用比設(shè)定為(6,5,4)。任務(wù)層資源調(diào)度器使用文獻(xiàn)[9]基于排隊(duì)論的動(dòng)態(tài)資源調(diào)度模型,用于制定合理的資源分配策略。系統(tǒng)層資源調(diào)度器的作用為依賴Storm on YARN框架對(duì)集群資源的高級(jí)管理能力,根據(jù)中間件調(diào)度器給出的資源信息增加或減少資源。
圖3 雙層資源調(diào)度模型架構(gòu)Fig.3 Architecture of two-tier resource scheduling model
此過程相對(duì)目前的Storm on YARN框架而言,能夠起到動(dòng)態(tài)分配資源的作用,而不需要人為增減資源,并且由于根據(jù)中間件調(diào)度器的能力來控制資源總數(shù),對(duì)于當(dāng)然資源冗余的節(jié)點(diǎn),可以實(shí)時(shí)地關(guān)閉節(jié)點(diǎn),不會(huì)占用集群資源,方便其他集群使用。
本文提出的雙層資源調(diào)度模型主要服務(wù)于分布式流數(shù)據(jù)處理任務(wù)過程,其對(duì)集群資源進(jìn)行增減管理,并在多個(gè)任務(wù)中考慮到不同任務(wù)的重要性不同,根據(jù)任務(wù)權(quán)重協(xié)同地調(diào)配資源比例。該模型工作的具體步驟如下:
步驟1任務(wù)層資源調(diào)度器對(duì)分布式流數(shù)據(jù)處理任務(wù),根據(jù)當(dāng)前負(fù)載做出決策。
步驟2任務(wù)層資源調(diào)度器發(fā)出資源調(diào)度請(qǐng)求,由中間件調(diào)度器接收。
步驟3對(duì)于集群目前的資源情況,中間件調(diào)度器做出決策,當(dāng)資源冗余時(shí),跳至步驟4,當(dāng)資源不足時(shí),跳至步驟5,否則跳至步驟6。
步驟4當(dāng)出現(xiàn)資源冗余時(shí),中間件調(diào)度器與系統(tǒng)層資源調(diào)度器發(fā)起聯(lián)系,并按照資源使用情況,優(yōu)先關(guān)閉資源使用率較低的節(jié)點(diǎn),然后通知中間件調(diào)度器,完成流程。
步驟5當(dāng)出現(xiàn)資源不足時(shí),如果通過打開集群中關(guān)閉節(jié)點(diǎn)可滿足需求時(shí),則中間件資源調(diào)度器直接與系統(tǒng)層資源調(diào)度器溝通;否則,中間件調(diào)度器根據(jù)預(yù)設(shè)的任務(wù)權(quán)重,由計(jì)算出的多個(gè)任務(wù)的權(quán)重延遲比,對(duì)資源進(jìn)行協(xié)同分配,再通過與系統(tǒng)層資源調(diào)度器溝通,分配資源。
步驟6中間件資源調(diào)度器檢測(cè)到目前資源為最優(yōu)狀態(tài),觸發(fā)流數(shù)據(jù)任務(wù)的資源調(diào)整。
基于雙層資源調(diào)度模型的工作流程,本文主要關(guān)注點(diǎn)在于中間件調(diào)度器在不需要協(xié)同分配資源時(shí)對(duì)集群節(jié)點(diǎn)的減增策略。
在算法1中,CallRebalance指Storm改變本身的資源分配平衡,之后中間件調(diào)度器察覺到分布式流處理任務(wù)所需資源發(fā)生變化,則與當(dāng)前系統(tǒng)所有資源做對(duì)比,并通過與系統(tǒng)層資源調(diào)度器的會(huì)話聯(lián)系,做出相應(yīng)的資源調(diào)整,增加或者刪減資源,最后中間件調(diào)度器察覺到當(dāng)前資源處于可提供的最優(yōu)狀態(tài),觸發(fā)流處理任務(wù)的資源調(diào)整。具體步驟如下:
步驟1需要任務(wù)層資源調(diào)度器預(yù)先生成調(diào)度決策。生成資源調(diào)度決策包含信息監(jiān)測(cè)、診斷生成和決策生成3個(gè)部分。其中,系統(tǒng)運(yùn)行時(shí)狀態(tài)收集是第一步,也是很重要的一步,只有收集到足夠多、足夠準(zhǔn)確的系統(tǒng)狀態(tài),才能做出正確的資源調(diào)度決策。而其中對(duì)于數(shù)據(jù)堆積情況的監(jiān)測(cè)尤為重要,因?yàn)樵趯?shí)時(shí)系統(tǒng)中,穩(wěn)定運(yùn)行狀態(tài)下一旦發(fā)生數(shù)據(jù)堆積,系統(tǒng)實(shí)時(shí)性會(huì)降低,延遲會(huì)增大。此時(shí),必然是因?yàn)楣ぷ髫?fù)載變大或者系統(tǒng)本身出現(xiàn)問題,應(yīng)當(dāng)迅速對(duì)此做出反饋,解決數(shù)據(jù)堆積的情況。例如,在判斷數(shù)據(jù)堆積的情況時(shí),可以通過基于排隊(duì)論的方法[9]來實(shí)時(shí)地反映系統(tǒng)不斷變化的狀態(tài)。最后再利用決策算法[9],通過多次迭代來保證資源分配比例最優(yōu)。
步驟2在系統(tǒng)層資源調(diào)度器層面,將系統(tǒng)可利用的物理資源虛擬化為可分配、可量化的虛擬化資源,如容器節(jié)點(diǎn),然后根據(jù)中間件調(diào)度器的指示進(jìn)行動(dòng)態(tài)地增加或者刪減資源,目的是自動(dòng)對(duì)資源進(jìn)行管理,減少人工介入。
在上述步驟中,當(dāng)中間件調(diào)度器與系統(tǒng)層資源調(diào)度器會(huì)話時(shí),如果當(dāng)前集群可啟動(dòng)的節(jié)點(diǎn)無法滿足所需資源,則會(huì)給中間件調(diào)度器返回false,之后中間件調(diào)度器會(huì)一直嘗試獲得資源,直到對(duì)資源需求改變,或者需求得到滿足。
在任務(wù)層資源調(diào)度器中,本文通過預(yù)設(shè)的時(shí)間窗口將分布式流處理任務(wù)的實(shí)際延遲保存到一個(gè)預(yù)設(shè)長(zhǎng)度的隊(duì)列中,根據(jù)該隊(duì)列中的數(shù)據(jù)計(jì)算出預(yù)設(shè)的擬合函數(shù)的參數(shù),并將參數(shù)發(fā)送到中間件調(diào)度器。在中間件調(diào)度器中,利用DRS中的排隊(duì)論算法[9],根據(jù)流處理任務(wù)的目標(biāo)節(jié)點(diǎn)數(shù)獲得改變資源后的預(yù)測(cè)延遲,再通過擬合函數(shù)擬合出更接近真實(shí)數(shù)據(jù)的值。
當(dāng)多個(gè)任務(wù)在系統(tǒng)中并且集群無法滿足這些任務(wù)所需要的的資源時(shí),需要權(quán)衡各個(gè)任務(wù)的重要性,從而協(xié)同地分配資源。通過將獲得的預(yù)測(cè)延遲(已擬合過)與預(yù)設(shè)的每個(gè)任務(wù)的權(quán)重做加權(quán)延遲和,并比較不同資源分配比例時(shí)哪個(gè)加權(quán)延遲和最小,將該結(jié)果為最佳資源協(xié)同分配方案。
在需要協(xié)調(diào)分配資源,即有多個(gè)流數(shù)據(jù)處理任務(wù)要同時(shí)管理(目前只支持兩個(gè))時(shí),將為不同的任務(wù)設(shè)定不同的權(quán)重來代表該任務(wù)的重要性,每個(gè)任務(wù)在任務(wù)層資源調(diào)度器中,依然通過3.2節(jié)中的調(diào)度決策策略來根據(jù)自身負(fù)載改變資源分配比例。由于此時(shí)有多個(gè)任務(wù)需要改變自身節(jié)點(diǎn)數(shù),因此中間件調(diào)度器會(huì)先向系統(tǒng)層查詢當(dāng)前集群的資源數(shù)是否滿足任務(wù)層資源需求。例如,任務(wù)1的資源需求由(5,5,5)->(5,5,6),任務(wù)2的資源需求由(5,5,5)->(5,6,6),此時(shí)中間件調(diào)度器會(huì)向系統(tǒng)層查詢集群資源是否滿足,由于系統(tǒng)資源只能提供30個(gè)節(jié)點(diǎn)資源,因此任務(wù)1和任務(wù)2需要協(xié)同分配資源。中間件調(diào)度器根據(jù)任務(wù)層資源調(diào)度器的反饋,依據(jù)兩個(gè)任務(wù)的權(quán)重不同,通過窮舉的方式計(jì)算出各種分配方案下系統(tǒng)總延遲最佳的分配方案,從而將新的分配方案通知給系統(tǒng)層進(jìn)行資源分配。
通過實(shí)驗(yàn)驗(yàn)證本文系統(tǒng)的資源動(dòng)態(tài)調(diào)節(jié)功能和多任務(wù)協(xié)同分配功能是否運(yùn)行正常。
在由虛擬機(jī)搭建的Hadoop集群上進(jìn)行YARN資源調(diào)度相關(guān)實(shí)驗(yàn)。通過對(duì)虛擬機(jī)配置的統(tǒng)一性,排除在實(shí)驗(yàn)過程中由于機(jī)器配置如內(nèi)存、CPU、網(wǎng)絡(luò)等因素對(duì)實(shí)驗(yàn)結(jié)果造成的不必要的影響。相關(guān)配置如表1所示。
表1 實(shí)驗(yàn)配置Table 1 Configuration of experiment
本文基于YARN對(duì)Storm分布式流數(shù)據(jù)處理任務(wù)進(jìn)行動(dòng)態(tài)資源調(diào)度和協(xié)同分配。通過一個(gè)簡(jiǎn)單的單詞統(tǒng)計(jì)(以下簡(jiǎn)稱WordCount)任務(wù)來驗(yàn)證系統(tǒng)功能是否正常,之所以只通過單詞統(tǒng)計(jì)來進(jìn)行實(shí)驗(yàn),是為了方便在實(shí)驗(yàn)過程中對(duì)任務(wù)負(fù)載通過句子發(fā)送頻率進(jìn)行調(diào)控。
實(shí)驗(yàn)中WordCount任務(wù)以一個(gè)包含100 000條句子的文件作為數(shù)據(jù)源,通過采用Redis做消息隊(duì)列,向Storm的spout以預(yù)設(shè)的速率發(fā)送句子。在Storm的完全延遲趨于穩(wěn)定后,通過改變消息隊(duì)列的發(fā)送速率來調(diào)整分布式流處理任務(wù)的負(fù)載情況,從而使系統(tǒng)對(duì)資源的需求發(fā)生改變。WordCount任務(wù)拓?fù)淙鐖D4所示??梢钥闯?,在WordCount中,當(dāng)切分單元、統(tǒng)計(jì)單元和報(bào)告單元對(duì)當(dāng)前到達(dá)的數(shù)據(jù)的速率(工作負(fù)載)λi超過各個(gè)單元本身算子對(duì)數(shù)據(jù)的處理速率μi乘以單元個(gè)數(shù)ki時(shí),表明該單元的資源數(shù)需要調(diào)整。
圖4 WordCount任務(wù)拓?fù)銯ig.4 WordCount task topology
在介紹關(guān)于分布式流處理任務(wù)的負(fù)載改變?nèi)绾慰刂坪螅O(shè)置如表2所示的一組實(shí)驗(yàn)參數(shù)。其中,數(shù)據(jù)源的輸入速率λs=400 tuple/s,而此時(shí)設(shè)置的切分單元的處理能力為k切分單元×μ切分單元=2×150=300 tuple/s。若切分單元可以處理所有到達(dá)的句子,在本文實(shí)驗(yàn)數(shù)據(jù)集中,每個(gè)句子平均可以切分為6個(gè)單詞,則下一單元到達(dá)率為6×400=2 400 tuple/s,此時(shí)統(tǒng)計(jì)單元的處理能力為k統(tǒng)計(jì)單元×μ統(tǒng)計(jì)單元=5×400=2 000 tuple/s<2 400 tuple/s。此外,報(bào)告單元的處理能力為k報(bào)告單元×μ報(bào)告單元=5×400=2 000 tuple/s,也小于報(bào)告單元數(shù)據(jù)的到達(dá)率。系統(tǒng)將根據(jù)當(dāng)前的負(fù)載情況,判斷出此時(shí)資源處于shotage狀態(tài),從而重新做出新的資源分配決策,中間件調(diào)度器檢測(cè)到分布式任務(wù)的資源請(qǐng)求發(fā)生改變,則會(huì)在通過計(jì)算當(dāng)前資源是否能滿足任務(wù)需求后,再?zèng)Q定是否向系統(tǒng)層資源調(diào)度器申請(qǐng)資源。
表2 實(shí)驗(yàn)參數(shù)Table 2 Parameters of experiment
以上為檢測(cè)系統(tǒng)的資源動(dòng)態(tài)調(diào)度功能,而對(duì)于系統(tǒng)多任務(wù)系統(tǒng)分配資源的功能,本文則通過設(shè)置2個(gè)相同的分布式流處理任務(wù),在2個(gè)任務(wù)同時(shí)改變?nèi)蝿?wù)負(fù)載的情況下,判斷當(dāng)前的集群資源分配是否可協(xié)同分配,根據(jù)權(quán)重和預(yù)測(cè)延遲的改變做出更合理的分配決策。
驗(yàn)證系統(tǒng)的功能是否工作正常,主要關(guān)注2個(gè)指標(biāo)的變化:一個(gè)是在負(fù)載變化后,集群是否做出決策;另一個(gè)是在集群資源重新分配后,分布式流處理任務(wù)的延遲是否發(fā)生改變。實(shí)驗(yàn)以關(guān)閉動(dòng)態(tài)調(diào)度功能的系統(tǒng)作為對(duì)比,驗(yàn)證系統(tǒng)功能。
如圖5所示,基于Storm on YARN的動(dòng)態(tài)資源調(diào)度功能在最初消息隊(duì)列發(fā)送數(shù)據(jù)不穩(wěn)定,因此,本文過濾掉前1 min的latency數(shù)據(jù),之后在第3 min,任務(wù)層資源調(diào)度器做出了對(duì)資源分配的新的決策,即(2,5,5)->(3,6,6),此時(shí)集群資源足夠?qū)θ蝿?wù)分配做出響應(yīng),但集群只使用了6臺(tái)共12個(gè)節(jié)點(diǎn),所以,需要中間件調(diào)度器向系統(tǒng)層資源調(diào)度器申請(qǐng)資源,之后資源分配變?yōu)椋?,6,6)共15個(gè)節(jié)點(diǎn),需要使用8臺(tái)機(jī)器,latency降到預(yù)設(shè)的范圍,并趨于穩(wěn)定。
圖5 WordCount任務(wù)資源動(dòng)態(tài)調(diào)度過程Fig.5 Dynamic scheduling process of WordCount task resource
如圖6所示,在上一個(gè)實(shí)驗(yàn)的基礎(chǔ)上,通過Strom提供的Rebalance機(jī)制,強(qiáng)制調(diào)整分布式流數(shù)據(jù)處理任務(wù)WordCount的節(jié)點(diǎn)分配為(1,4,4),可以看出,任務(wù)節(jié)點(diǎn)分配改變后,WordCount任務(wù)的延遲突增,這是因?yàn)槿蝿?wù)資源無法滿足當(dāng)前的數(shù)據(jù)輸入速率,此時(shí)任務(wù)層資源調(diào)度器會(huì)重新做出資源分配決策,中間件調(diào)度器察覺到任務(wù)層資源調(diào)度器做出決策后,則提供動(dòng)態(tài)調(diào)度功能調(diào)整集群資源分配,之后任務(wù)延遲回到正常水平。
圖6 WorkCount任務(wù)資源調(diào)度的調(diào)整過程Fig.6 Adjustment process of WorkCount task resource scheduling
如圖7所示,設(shè)置WordCount_1的節(jié)點(diǎn)分配為(3,6,6),在有兩個(gè)相同分配的WordCount任務(wù)同時(shí)進(jìn)入集群時(shí),由于WordCount_1先進(jìn)入集群,因此其占據(jù)了15個(gè)節(jié)點(diǎn)數(shù),此時(shí)另一個(gè)任務(wù)WordCount_2只能獲得節(jié)點(diǎn)分配為(1,2,2),設(shè)置2個(gè)任務(wù)的權(quán)重都為0.5,則在中間件調(diào)度器為任務(wù)2修改資源時(shí),會(huì)根據(jù)2個(gè)任務(wù)的權(quán)重公平地分配資源,所以,此時(shí)2個(gè)節(jié)點(diǎn)的資源分配都是(2,4,4)。
圖7 多任務(wù)的資源協(xié)同分配Fig.7 Multi tasks resource collaborative allocation
以上3個(gè)實(shí)驗(yàn)都驗(yàn)證了本文系統(tǒng)具備動(dòng)態(tài)調(diào)度系統(tǒng)資源的功能,在通過動(dòng)態(tài)調(diào)度后,任務(wù)的延遲可以達(dá)到預(yù)設(shè)的延遲,提高流數(shù)據(jù)處理任務(wù)的處理效率。
相比于非動(dòng)態(tài)調(diào)度,雙層資源調(diào)度模型可以分層準(zhǔn)確地對(duì)系統(tǒng)進(jìn)行信息監(jiān)測(cè),診斷生成及生成決策,并通過中間件調(diào)度器的調(diào)度,將分配策略同步給系統(tǒng)層資源調(diào)度器,從而對(duì)系統(tǒng)可分配的虛擬化資源進(jìn)行合理分配,降低系統(tǒng)延遲,這也是雙層調(diào)度框架相對(duì)于非動(dòng)態(tài)調(diào)度可以提高系統(tǒng)資源利用率的原因。
本文提出一種基于YARN的分布式流數(shù)據(jù)處理任務(wù)資源的動(dòng)態(tài)調(diào)度系統(tǒng)。相對(duì)于開源Storm on YARN框架,該系統(tǒng)基于雙層調(diào)度模型,可檢測(cè)流數(shù)據(jù)處理任務(wù)延遲,實(shí)現(xiàn)對(duì)集群負(fù)載的動(dòng)態(tài)調(diào)度,從而實(shí)時(shí)調(diào)整集群大小,避免人工干預(yù)集群,降低集群維護(hù)成本。本文系統(tǒng)只能對(duì)2個(gè)任務(wù)進(jìn)行協(xié)同分配,下一步將在協(xié)同分配多個(gè)任務(wù)資源方面對(duì)其進(jìn)行改進(jìn),并設(shè)計(jì)只通過窮舉方式計(jì)算資源分配的方案。