魯 亮 于 炯 卞 琛 劉月超 廖 彬 李慧娟
1(新疆大學(xué)信息科學(xué)與工程學(xué)院 烏魯木齊 830046)
2(新疆財(cái)經(jīng)大學(xué)統(tǒng)計(jì)與信息學(xué)院 烏魯木齊 830012)
3(國(guó)網(wǎng)烏魯木齊供電公司 烏魯木齊 830011)
(luliang19891108@gmail.com)
近些年來(lái),大數(shù)據(jù)相關(guān)研究及應(yīng)用已成為學(xué)術(shù)界和企業(yè)界關(guān)注的熱點(diǎn),其計(jì)算模式包括批量計(jì)算、流式計(jì)算、交互計(jì)算、圖計(jì)算等[1-5],并以前兩者應(yīng)用居多.批量計(jì)算為先存儲(chǔ)后計(jì)算(如Hadoop生態(tài)系統(tǒng)),適合實(shí)時(shí)性不高且需覆蓋全局?jǐn)?shù)據(jù)的應(yīng)用場(chǎng)景;流式計(jì)算打破了Hadoop中MapReduce[6]框架一統(tǒng)天下的局面,它無(wú)需存儲(chǔ),只要數(shù)據(jù)源處于活動(dòng)狀態(tài),數(shù)據(jù)就會(huì)持續(xù)生成,并以流(由時(shí)間上無(wú)窮的元組序列組成)的形式在各工作節(jié)點(diǎn)的內(nèi)存中進(jìn)行計(jì)算,適合實(shí)時(shí)性要求嚴(yán)格且僅需針對(duì)窗口內(nèi)的局部數(shù)據(jù)進(jìn)行處理的應(yīng)用場(chǎng)景.流式大數(shù)據(jù)處理平臺(tái)大大提高了在線(xiàn)數(shù)據(jù)密集型(on line data intensive, OLDI)應(yīng)用[7]的用戶(hù)體驗(yàn),可廣泛應(yīng)用于金融銀行業(yè)、互聯(lián)網(wǎng)和物聯(lián)網(wǎng)等諸多領(lǐng)域,涵蓋股市實(shí)時(shí)分析、搜索引擎與社交網(wǎng)站、交通流量實(shí)時(shí)預(yù)警等各類(lèi)典型應(yīng)用[8].現(xiàn)有的流式大數(shù)據(jù)處理框架以Twitter公司的Storm系統(tǒng)為代表[9].Storm是一個(gè)采用主從式架構(gòu)的開(kāi)源分布式實(shí)時(shí)計(jì)算平臺(tái),其編程模型簡(jiǎn)單,支持包括Java在內(nèi)的多種編程語(yǔ)言,橫向可擴(kuò)展性良好.相較于目前同樣主流的Flink[10]和Spark Streaming[11],Storm在大數(shù)據(jù)流式處理方面的實(shí)時(shí)性更佳;相較于不開(kāi)源的Puma[12]和社區(qū)冷淡的S4[13],Storm的商用前景更為廣闊.加之新版本特性的加入、更多庫(kù)的支持以及與其他開(kāi)源項(xiàng)目的無(wú)縫融合,Storm逐漸成為學(xué)術(shù)界和工業(yè)界的研究熱點(diǎn),被稱(chēng)為“實(shí)時(shí)處理領(lǐng)域的Hadoop”.
一個(gè)流式計(jì)算作業(yè)及其包含的一系列任務(wù)可用有向無(wú)環(huán)圖(directed acyclic graph, DAG)表示,Storm中稱(chēng)之為拓?fù)?topology).從拓?fù)鋵?shí)例模型的角度來(lái)看,拓?fù)涞?個(gè)頂點(diǎn)代表某一特定任務(wù),1條有向邊代表任務(wù)之間的依賴(lài)關(guān)系.Storm在進(jìn)行任務(wù)分配時(shí)采用輪詢(xún)調(diào)度策略(round-robin scheduling),即先將拓?fù)渲邪拿總€(gè)任務(wù)均勻地分布到各個(gè)工作進(jìn)程中,再將各工作進(jìn)程均勻地分布到各工作節(jié)點(diǎn)上,并未考慮到不同工作節(jié)點(diǎn)的性能和負(fù)載差異,以及工作節(jié)點(diǎn)之間的網(wǎng)絡(luò)傳輸開(kāi)銷(xiāo)和節(jié)點(diǎn)內(nèi)部的進(jìn)程與線(xiàn)程通信開(kāi)銷(xiāo),無(wú)法最大限度地發(fā)揮Storm集群的實(shí)時(shí)計(jì)算能力.本文針對(duì)Storm輪詢(xún)調(diào)度策略存在的不足,主要作了4個(gè)方面研究工作:
1) 分析已有流式計(jì)算框架調(diào)度策略的優(yōu)缺點(diǎn),闡述本文的優(yōu)化方向和實(shí)施思路.
2) 從拓?fù)涞倪壿嬆P汀?shí)例模型和任務(wù)分配模型出發(fā),比較Storm集群中3種不同的通信方式.由此建立資源約束模型和最優(yōu)通信開(kāi)銷(xiāo)模型,提出并證明了最優(yōu)通信開(kāi)銷(xiāo)原則.
3) 為解決異構(gòu)Storm系統(tǒng)中工作節(jié)點(diǎn)任務(wù)過(guò)載和節(jié)點(diǎn)間通信開(kāi)銷(xiāo)大的問(wèn)題,建立任務(wù)遷移模型,提出并證明了遷移優(yōu)化原則和節(jié)點(diǎn)間數(shù)據(jù)流最優(yōu)性原理,并由此推出最優(yōu)遷移原則,為任務(wù)遷移策略的設(shè)計(jì)提供理論依據(jù).
4) 從源節(jié)點(diǎn)選擇、阻尼線(xiàn)程選擇和目的節(jié)點(diǎn)選擇3個(gè)方面出發(fā),提出異構(gòu)Storm環(huán)境下的任務(wù)遷移策略(task migration strategy for heterogeneous Storm cluster, TMSH-Storm),包括源節(jié)點(diǎn)選擇算法和任務(wù)遷移算法,使系統(tǒng)在拓?fù)鋱?zhí)行過(guò)程中根據(jù)各工作節(jié)點(diǎn)和各任務(wù)的實(shí)時(shí)負(fù)載情況以及任務(wù)間的數(shù)據(jù)流大小,實(shí)現(xiàn)任務(wù)的優(yōu)化遷移.實(shí)驗(yàn)通過(guò)4個(gè)基準(zhǔn)測(cè)試從不同角度證明了算法的有效性.
針對(duì)實(shí)時(shí)大規(guī)模數(shù)據(jù)的處理,現(xiàn)有解決方案可大致歸納為3類(lèi):高性能批量計(jì)算模式、流式計(jì)算模式和兩者混合的模式.其中,高性能批量計(jì)算模式的核心思想是修改以Hadoop為代表的批處理框架,通過(guò)減少中間結(jié)果的磁盤(pán)讀寫(xiě)次數(shù)以及增加作業(yè)間的流水化程度來(lái)提高吞吐量[14-19].混合模式的主要思想是基于MapReduce模型增加或修改其中的某些處理步驟,以實(shí)現(xiàn)流式處理[20-25].這2種方案在一定程度上解決了大數(shù)據(jù)處理的實(shí)時(shí)性問(wèn)題,但其性能仍遜色于流式計(jì)算模式,且其研究成果無(wú)法直接移植于現(xiàn)有流式計(jì)算平臺(tái)中.針對(duì)這一問(wèn)題,已有國(guó)內(nèi)外學(xué)者提出了流式計(jì)算框架下的各種性能優(yōu)化方向和實(shí)施策略.文獻(xiàn)[8,26]在大數(shù)據(jù)流式計(jì)算綜述中總結(jié)了流式大數(shù)據(jù)在典型應(yīng)用領(lǐng)域中呈現(xiàn)出的實(shí)時(shí)性、易失性、突發(fā)性、無(wú)序性和無(wú)限性等特征,給出了理想的大數(shù)據(jù)流式計(jì)算系統(tǒng)在系統(tǒng)結(jié)構(gòu)、數(shù)據(jù)傳輸、應(yīng)用接口和高可用性等方面應(yīng)該具備的關(guān)鍵技術(shù)特征,闡述了已有各類(lèi)流式計(jì)算系統(tǒng)在可伸縮性、容錯(cuò)機(jī)制、狀態(tài)一致性、負(fù)載均衡和吞吐量等方面所面臨的技術(shù)挑戰(zhàn).此外,為獲得更好的服務(wù)質(zhì)量,已有學(xué)者提出各類(lèi)性能感知的數(shù)據(jù)流調(diào)度算法,例如混合啟發(fā)式遺傳調(diào)度算法[27]、競(jìng)爭(zhēng)感知的任務(wù)復(fù)制調(diào)度算法[28]、基于支持向量域描述和支持向量聚類(lèi)算法[29]以及基于雙分子結(jié)構(gòu)的化學(xué)反應(yīng)優(yōu)化算法[30]等.
以上研究工作的側(cè)重點(diǎn)各異,但大多只適用于任務(wù)和節(jié)點(diǎn)信息變化不夠迅速的場(chǎng)景.如果數(shù)據(jù)流大小和速率經(jīng)常變化,以上調(diào)度策略的部署可能引起較大的系統(tǒng)波動(dòng).針對(duì)大數(shù)據(jù)流式計(jì)算環(huán)境下數(shù)據(jù)量大、變化迅速且無(wú)法追蹤的特點(diǎn),有學(xué)者提出各類(lèi)流式計(jì)算環(huán)境下的虛擬機(jī)監(jiān)測(cè)和可擴(kuò)展平臺(tái),從監(jiān)測(cè)對(duì)象[31-33](CPU等硬件負(fù)載、工作節(jié)點(diǎn)和線(xiàn)程的數(shù)量、作業(yè)的執(zhí)行情況等)、虛擬機(jī)部署和調(diào)整方式[33-35](人工干預(yù)、半自動(dòng)和全自動(dòng))、優(yōu)化范圍[36](基于當(dāng)前狀態(tài)的局部?jī)?yōu)化、基于預(yù)測(cè)的全局優(yōu)化)等方面彈性調(diào)整虛擬機(jī)的數(shù)量和任務(wù)的部署,以達(dá)到性能優(yōu)化和負(fù)載均衡的效果.為更好地處理復(fù)雜作業(yè)中的大規(guī)模流式數(shù)據(jù),文獻(xiàn)[37]針對(duì)高可擴(kuò)展的分布式中間件System S[38]提出一種分布式應(yīng)用調(diào)度優(yōu)化算法.該算法分為4個(gè)階段:前2個(gè)階段計(jì)算作業(yè)運(yùn)行的備選節(jié)點(diǎn),后2個(gè)階段決定各處理單元在各節(jié)點(diǎn)上的分配.該算法在滿(mǎn)足系統(tǒng)高可擴(kuò)展性的同時(shí),提高了調(diào)度的實(shí)時(shí)性.文獻(xiàn)[39]從有向無(wú)環(huán)圖優(yōu)化的角度出發(fā),提出彈性自適應(yīng)數(shù)據(jù)流圖模型,并使用該策略進(jìn)行資源分配,以尋求最大化吞吐量和最小化響應(yīng)時(shí)間.文獻(xiàn)[40]從虛擬化網(wǎng)絡(luò)數(shù)據(jù)中心(virtualized networked data centers, VNetDCs)的角度,提出云計(jì)算SaaS計(jì)算模型下針對(duì)實(shí)時(shí)流式應(yīng)用的最小化能耗調(diào)度策略.該研究充分考慮到大數(shù)據(jù)實(shí)時(shí)流數(shù)據(jù)量大、數(shù)據(jù)到達(dá)速率不可控和不穩(wěn)定等特性,在響應(yīng)時(shí)間約束的前提下,最小化計(jì)算和網(wǎng)絡(luò)傳輸總能耗.文獻(xiàn)[41]提出大數(shù)據(jù)流式計(jì)算環(huán)境下實(shí)時(shí)且節(jié)能的資源調(diào)度模型Re-Stream.作者建立了能耗、響應(yīng)時(shí)間和CPU利用率之間的數(shù)學(xué)關(guān)系,定義了有向無(wú)環(huán)圖的關(guān)鍵路徑,并綜合運(yùn)用關(guān)鍵路徑上性能感知的任務(wù)調(diào)度策略和非關(guān)鍵路徑上能耗感知的任務(wù)整合策略,達(dá)到了響應(yīng)時(shí)間和能耗雙目標(biāo)優(yōu)化的效果.以上研究均在兼顧了流式大數(shù)據(jù)特征的前提下提出了各類(lèi)流式計(jì)算平臺(tái)下的優(yōu)化策略,但針對(duì)Storm這一具體框架,在資源感知和通信開(kāi)銷(xiāo)的優(yōu)化方面仍存在很大的探索空間.
針對(duì)Storm框架下的調(diào)度優(yōu)化策略,已有少量學(xué)者開(kāi)展了部分研究.針對(duì)地理位置分散的Storm集群而言,文獻(xiàn)[42]提出并實(shí)現(xiàn)了一種服務(wù)質(zhì)量感知的Storm分布式調(diào)度器,其在網(wǎng)絡(luò)時(shí)延和可靠性等方面均優(yōu)于Storm默認(rèn)調(diào)度策略的執(zhí)行結(jié)果,但這與本文集中式數(shù)據(jù)中心的研究背景不符.文獻(xiàn)[43]提出Storm環(huán)境下的自適應(yīng)調(diào)度策略,分為離線(xiàn)調(diào)度和在線(xiàn)調(diào)度2種.離線(xiàn)調(diào)度是在拓?fù)溥\(yùn)行前分析其結(jié)構(gòu),將相互關(guān)聯(lián)的1對(duì)任務(wù)盡量調(diào)度到同一個(gè)工作進(jìn)程中,然后將所有工作進(jìn)程輪詢(xún)調(diào)度到各個(gè)工作節(jié)點(diǎn)上;在線(xiàn)調(diào)度則是通過(guò)插件實(shí)時(shí)監(jiān)測(cè)拓?fù)溥\(yùn)行過(guò)程中系統(tǒng)的CPU負(fù)載和任務(wù)間數(shù)據(jù)流大小,當(dāng)某節(jié)點(diǎn)上的任務(wù)持續(xù)過(guò)載時(shí)觸發(fā)重分配機(jī)制,依次將通信開(kāi)銷(xiāo)較大的1對(duì)任務(wù)分配在CPU負(fù)載較輕的工作進(jìn)程中,同理將通信開(kāi)銷(xiāo)較大的1對(duì)工作進(jìn)程分配在CPU負(fù)載較輕的工作節(jié)點(diǎn)上.這種自適應(yīng)調(diào)度策略很好地解決了Storm環(huán)境中的通信開(kāi)銷(xiāo)問(wèn)題,但仍有4點(diǎn)有待優(yōu)化:1)算法復(fù)雜度很高,在線(xiàn)調(diào)度策略執(zhí)行時(shí)將導(dǎo)致延遲極大,對(duì)性能造成了一定的沖擊;2)算法有效降低了工作節(jié)點(diǎn)間通信開(kāi)銷(xiāo),但對(duì)于同一節(jié)點(diǎn)內(nèi)的進(jìn)程間開(kāi)銷(xiāo)仍比較大;3)Storm的拓?fù)浣Y(jié)構(gòu)錯(cuò)綜復(fù)雜,而實(shí)驗(yàn)中采用的線(xiàn)性拓?fù)淙狈Υ硇裕?)該策略?xún)H考慮CPU這一資源負(fù)載,度量角度較為單一.
文獻(xiàn)[44]提出Storm框架下流量感知的在線(xiàn)調(diào)度策略T-Storm.T-Storm通過(guò)監(jiān)控各任務(wù)的CPU負(fù)載、數(shù)據(jù)流傳輸(流量)負(fù)載以及各工作節(jié)點(diǎn)的CPU負(fù)載,實(shí)現(xiàn)任務(wù)的在線(xiàn)動(dòng)態(tài)重部署,在保證沒(méi)有工作節(jié)點(diǎn)過(guò)載的情況下最小化網(wǎng)絡(luò)傳輸開(kāi)銷(xiāo);同時(shí),針對(duì)運(yùn)行于同一個(gè)工作節(jié)點(diǎn)且屬于同一個(gè)拓?fù)涞乃腥蝿?wù),T-Storm只為其分配1個(gè)工作進(jìn)程(槽),從而消除了進(jìn)程間通信開(kāi)銷(xiāo).此外,T-Storm能通過(guò)參數(shù)調(diào)整增加或減少啟動(dòng)的工作節(jié)點(diǎn)數(shù)量.然而該文獻(xiàn)仍存在2方面不足:1)與文獻(xiàn)[43]類(lèi)似,T-Storm仍存在調(diào)度執(zhí)行開(kāi)銷(xiāo)大和資源負(fù)載度量角度單一的問(wèn)題;2)T-Storm將各任務(wù)發(fā)送和接收的數(shù)據(jù)流大小孤立考慮,忽略了直接通信的1對(duì)任務(wù)之間的數(shù)據(jù)流情況,對(duì)于通信復(fù)雜的拓?fù)涠钥赡軣o(wú)法達(dá)到近似最優(yōu).
文獻(xiàn)[45]提出Storm框架下資源感知的調(diào)度策略R-Storm.R-Storm將資源約束分為針對(duì)內(nèi)存的硬約束以及針對(duì)CPU和網(wǎng)絡(luò)帶寬的軟約束,其中硬約束條件必須滿(mǎn)足,而軟約束條件則盡量滿(mǎn)足.與文獻(xiàn)[44]提出的T-Storm相同,R-Storm不考慮任務(wù)與工作進(jìn)程的映射關(guān)系,而直接考慮拓?fù)渲懈魅蝿?wù)在工作節(jié)點(diǎn)上的分配,在最小化網(wǎng)絡(luò)時(shí)延的同時(shí)最大化資源利用率,最終達(dá)到提高整個(gè)集群吞吐量的效果.R-Storm充分考慮了資源池中各類(lèi)資源的有效利用,但仍存在2點(diǎn)不足:1)雖然R-Storm為編程人員提供了豐富的應(yīng)用程序接口(application programming interface, API),但對(duì)于拓?fù)渲懈魅蝿?wù)的CPU、內(nèi)存和網(wǎng)絡(luò)帶寬需求需要通過(guò)API人為設(shè)置而非實(shí)時(shí)監(jiān)測(cè)獲得,無(wú)法用于數(shù)據(jù)流快速變化場(chǎng)景下的在線(xiàn)調(diào)度.與此同時(shí),Storm編程人員往往專(zhuān)注于業(yè)務(wù)功能的開(kāi)發(fā),對(duì)任務(wù)運(yùn)行時(shí)的資源評(píng)估缺乏經(jīng)驗(yàn),可能對(duì)資源的有效利用造成一定影響.2)R-Storm只適合于同構(gòu)環(huán)境下,不能通用于異構(gòu)環(huán)境.
本文與上述研究成果的不同之處在于:
1) 本文提出的任務(wù)遷移策略針對(duì)以上不足進(jìn)行了改進(jìn),能夠?qū)崟r(shí)監(jiān)測(cè)并統(tǒng)籌兼顧拓?fù)渲懈骶€(xiàn)程的CPU、內(nèi)存和網(wǎng)絡(luò)帶寬負(fù)載,在各類(lèi)資源約束的前提下最小化通信開(kāi)銷(xiāo),保證集群的低延遲.
2) 現(xiàn)有研究[43-45]均是在拓?fù)涞倪\(yùn)行過(guò)程中,發(fā)現(xiàn)資源溢出后針對(duì)所有任務(wù)進(jìn)行重新部署.這種做法雖然能夠帶來(lái)較為明顯的優(yōu)化效果,但算法的執(zhí)行過(guò)程將引起極大的延遲,進(jìn)而影響Storm系統(tǒng)的運(yùn)行效率.本文提出的任務(wù)遷移策略開(kāi)銷(xiāo)較小,能夠保證任意時(shí)刻系統(tǒng)的實(shí)時(shí)性處理需求.
3) 實(shí)驗(yàn)基于Intel公司Zhang等人[46]發(fā)布在GitHub上的storm-benchmark-master基準(zhǔn)測(cè)試,而非已有文獻(xiàn)中作者定義的拓?fù)浣Y(jié)構(gòu),更具代表性.
4) 該策略可適用于異構(gòu)集群環(huán)境中,應(yīng)用范圍更為廣闊.
本節(jié)針對(duì)Storm架構(gòu)和部分概念做簡(jiǎn)要介紹.
如圖1所示,一個(gè)完整的Storm分布式系統(tǒng)由4類(lèi)節(jié)點(diǎn)組成:
1) 主控節(jié)點(diǎn)(master node).它是運(yùn)行Storm Nimbus后臺(tái)服務(wù)的節(jié)點(diǎn).Nimbus是Storm系統(tǒng)的中心,負(fù)責(zé)接收用戶(hù)提交的拓?fù)渥鳂I(yè),向工作節(jié)點(diǎn)分配任務(wù)(進(jìn)程級(jí)和線(xiàn)程級(jí))并傳輸作業(yè)副本,依賴(lài)協(xié)調(diào)節(jié)點(diǎn)的服務(wù)監(jiān)控集群運(yùn)行狀態(tài),提供狀態(tài)獲取接口,在Storm中的地位類(lèi)似于Hadoop中的JobTracker.
2) 工作節(jié)點(diǎn)(work node).它是運(yùn)行Storm Supervisor后臺(tái)服務(wù)的節(jié)點(diǎn).Supervisor負(fù)責(zé)監(jiān)聽(tīng)Nimbus分配的任務(wù)并下載作業(yè)副本,啟動(dòng)、暫?;虺蜂N(xiāo)任務(wù)的工作進(jìn)程(Worker)及工作線(xiàn)程(Executor).Supervisor是分布式部署的,在Storm中的地位類(lèi)似于Hadoop中的TaskTracker.
3) 控制臺(tái)節(jié)點(diǎn)(Web console node).它是運(yùn)行Storm 用戶(hù)界面(user interface, UI)后臺(tái)服務(wù)的節(jié)點(diǎn).它實(shí)際上是一個(gè)Web服務(wù)器,在指定端口提供網(wǎng)頁(yè)服務(wù).用戶(hù)可以使用瀏覽器訪(fǎng)問(wèn)控制臺(tái)節(jié)點(diǎn)的Web頁(yè)面,提交、暫停和撤銷(xiāo)作業(yè),也可以以只讀的形式獲取系統(tǒng)配置、作業(yè)及各個(gè)組件的運(yùn)行時(shí)狀態(tài).
4) 協(xié)調(diào)節(jié)點(diǎn)(coordinate node).它是運(yùn)行Zoo-Keeper進(jìn)程的節(jié)點(diǎn).Nimbus和Supervisor之間所有的協(xié)調(diào),包括分布式狀態(tài)維護(hù)和分布式配置管理,都是通過(guò)該協(xié)調(diào)節(jié)點(diǎn)實(shí)現(xiàn)的.在協(xié)調(diào)節(jié)點(diǎn)的幫助下,Nimbus和Supervisor的無(wú)狀態(tài)服務(wù)可以快速失敗,這也是Storm系統(tǒng)的穩(wěn)定性和可用性較高的原因之一.
為便于后文理解,現(xiàn)對(duì)工作節(jié)點(diǎn)內(nèi)部結(jié)構(gòu)以及拓?fù)湎嚓P(guān)概念做簡(jiǎn)要說(shuō)明:
1) 拓?fù)?topology).即流式作業(yè)本身,可用一個(gè)有向無(wú)環(huán)圖表示,由Spout、Bolt和數(shù)據(jù)流組成.
2) 元組(tuple).Storm數(shù)據(jù)處理的基本單元,是包含了1個(gè)或者多個(gè)鍵值對(duì)的列表.
3) 數(shù)據(jù)流(stream).由無(wú)限的元組組成的序列,是Storm中對(duì)傳遞的數(shù)據(jù)進(jìn)行的抽象.
4) Spout.Storm中的數(shù)據(jù)源編程單元,用于為拓?fù)渖a(chǎn)數(shù)據(jù).一般地,Spout從消息隊(duì)列、關(guān)系型數(shù)據(jù)庫(kù)、非關(guān)系型數(shù)據(jù)庫(kù)(NoSQL)、實(shí)時(shí)日志、Hadoop分布式文件系統(tǒng)(Hadoop distributed file system, HDFS)等外部數(shù)據(jù)源不間斷地讀取數(shù)據(jù),并以元組的形式傳遞給拓?fù)溥M(jìn)行處理.
5) Bolt.Storm中的數(shù)據(jù)處理編程單元,實(shí)現(xiàn)拓?fù)渲械奶幚磉壿?一般地,編程人員可在Bolt中實(shí)現(xiàn)數(shù)據(jù)過(guò)濾、聚合和查詢(xún)數(shù)據(jù)庫(kù)等操作,處理的結(jié)果以元組形式流式傳遞至其下游組件進(jìn)行處理.
6) 組件(component).Spout和Bolt的統(tǒng)稱(chēng).
7) 流組模式(stream grouping).Storm各組件之間的數(shù)據(jù)傳遞模式,目前共有8種,分別是隨機(jī)分組(shuffle grouping)、按域分組(field grouping)、副本分組(all grouping)、全局分組(global grouping)、直接分組(direct grouping)、本地分組(local or shuffle grouping)、不區(qū)分分組(non grouping)和自定義流組.
8) 工作進(jìn)程(Worker).實(shí)際上是一個(gè)Java虛擬機(jī)(Java virtual machine, JVM),由它執(zhí)行指定的子拓?fù)?同一個(gè)拓?fù)淇梢杂啥鄠€(gè)工作進(jìn)程共同完成,但1個(gè)工作進(jìn)程只能執(zhí)行1個(gè)子拓?fù)?
9) 槽(Slot).配置在工作節(jié)點(diǎn)上用于接收數(shù)據(jù)的端口,1個(gè)工作進(jìn)程占用1個(gè)槽,槽的數(shù)量表明該工作節(jié)點(diǎn)最多可容納的工作進(jìn)程的數(shù)量.一般為了拓?fù)涞倪M(jìn)程級(jí)并行,槽的個(gè)數(shù)配置為工作節(jié)點(diǎn)CPU的核心數(shù).
10) 任務(wù)(task).每個(gè)任務(wù)為其對(duì)應(yīng)組件的1個(gè)實(shí)例,是拓?fù)鋱?zhí)行的代碼單元.
11) 工作線(xiàn)程(Executor).每個(gè)工作進(jìn)程由多個(gè)工作線(xiàn)程組成,并且運(yùn)行1個(gè)組件中包含的1個(gè)或多個(gè)任務(wù),因此系統(tǒng)中工作線(xiàn)程的數(shù)量總是小于或等于任務(wù)的數(shù)量.一般地,為了實(shí)現(xiàn)任務(wù)的線(xiàn)程級(jí)并行,1個(gè)工作線(xiàn)程只包含1個(gè)任務(wù).在這種情況下,Storm任務(wù)的調(diào)度即相當(dāng)于該任務(wù)所對(duì)應(yīng)的工作線(xiàn)程的調(diào)度.本文即在此場(chǎng)景下研究Storm的任務(wù)遷移策略,下文簡(jiǎn)稱(chēng)為線(xiàn)程.
為簡(jiǎn)便起見(jiàn),除第5節(jié)之外,下文出現(xiàn)“節(jié)點(diǎn)”之處均指工作節(jié)點(diǎn),出現(xiàn)“進(jìn)程”之處均指工作進(jìn)程;第5節(jié)實(shí)驗(yàn)部分為避免混淆,均對(duì)具體意義上的“節(jié)點(diǎn)”和“進(jìn)程”做了說(shuō)明.
本節(jié)從Storm拓?fù)涞倪壿嬆P?、?shí)例模型和任務(wù)分配模型出發(fā),比較Storm集群中3種不同的通信方式.由此建立資源約束模型和最優(yōu)通信開(kāi)銷(xiāo)模型,為任務(wù)遷移策略的設(shè)計(jì)與實(shí)現(xiàn)提供了理論依據(jù).
定義1. 拓?fù)溥壿嬆P?任意拓?fù)淇墒褂枚M(C,S)表示,其中C={c1,c2,…,c|C|}為拓?fù)涞捻旤c(diǎn)集合,每個(gè)元素表示1個(gè)組件,即Spout或Bolt;S={s1,2,s1,3,…,s|C|-i,|C|}為拓?fù)涞挠邢蜻吋?,每個(gè)元素表示2個(gè)組件間傳遞的數(shù)據(jù)流.如果存在si,j∈S且i≠j,則ci,cj∈C,表示數(shù)據(jù)由組件ci發(fā)出并由cj接收.這樣的有向無(wú)環(huán)圖表示的拓?fù)浞Q(chēng)為拓?fù)溥壿嬆P?如圖2所示,其中{ca,cb,…,cg}為組件集,{sa,c,sa,d,…,se,g}為數(shù)據(jù)流集.ca和cb為數(shù)據(jù)源編程單元Spout,負(fù)責(zé)讀取外部數(shù)據(jù)源并發(fā)送至流式計(jì)算集群進(jìn)行處理;其余的組件為數(shù)據(jù)處理編程單元Bolt,負(fù)責(zé)處理上游組件發(fā)送的數(shù)據(jù)并以某種流組模式將結(jié)果發(fā)送至其下游組件;特別地,組件cf和cg為數(shù)據(jù)終點(diǎn),通常用于將最終結(jié)果展示至終端或持久化至數(shù)據(jù)庫(kù).
Fig. 3 Instance model of a topology圖3 拓?fù)鋵?shí)例模型
定義3. 子拓?fù)?由拓?fù)鋵?shí)例模型中原拓?fù)涞木€(xiàn)程的子集以及這些線(xiàn)程之間的數(shù)據(jù)流構(gòu)成.設(shè)原拓?fù)涞木€(xiàn)程集合為E,拓?fù)涞木€(xiàn)程的子集為E′,對(duì)于emn∈E和ei j,ek l∈E′,若在原拓?fù)渲写嬖趕i j,mn和smn,k l,則必然有emn∈E′.例如在圖3中,eb,ed2,ef2以及它們之間的數(shù)據(jù)流可構(gòu)成子拓?fù)?,而僅由eb和ef2則無(wú)法構(gòu)成子拓?fù)?
定義4. 任務(wù)分配模型.在Storm集群中,資源池由一系列節(jié)點(diǎn)構(gòu)成,定義該集合為N={n1,n2,…,n|N|}.每個(gè)節(jié)點(diǎn)內(nèi)配置有若干個(gè)槽,對(duì)于任意ni∈N,有Sloti={sloti1,sloti2,…,sloti|Sloti|},其中sloti j表示第i個(gè)節(jié)點(diǎn)的第j個(gè)槽.對(duì)于任意1個(gè)拓?fù)?,用?hù)將在代碼中設(shè)置其運(yùn)行所需的進(jìn)程數(shù)量和每個(gè)組件的線(xiàn)程數(shù)量.設(shè)某拓?fù)涞倪M(jìn)程集合為Wi={wi1,wi2,…,wi|Wi|},其中wi j表示第i個(gè)節(jié)點(diǎn)上運(yùn)行的第j個(gè)進(jìn)程.由槽和工作進(jìn)程的定義可知,有Wi?Sloti.若組件ci運(yùn)行的第j個(gè)線(xiàn)程ei j分配到了第k個(gè)節(jié)點(diǎn)上,則記f(ei j)=nk;若組件ci運(yùn)行的第j個(gè)線(xiàn)程ei j分配到了第k個(gè)節(jié)點(diǎn)的第l個(gè)槽上,則記g(ei j)=slotk l.其中f和g均為分配函數(shù).如圖4為圖3的拓?fù)溥\(yùn)行于Storm集群中的真實(shí)場(chǎng)景,其中橢圓形虛線(xiàn)表示進(jìn)程,矩形實(shí)線(xiàn)表示節(jié)點(diǎn),圖4的含義為拓?fù)涞?0個(gè)線(xiàn)程分布在2個(gè)節(jié)點(diǎn)的3個(gè)進(jìn)程中,以ed2和ee為例可表示為:f(ed2)=n1,g(ed2)=slot12,f(ee)=n2,g(ee)=slot21.
Fig. 4 Model for task assignment圖4 任務(wù)分配模型
由此可見(jiàn),在Storm系統(tǒng)中存在3種通信方式:
1) 節(jié)點(diǎn)間通信,即節(jié)點(diǎn)間的直接通信.這種通信開(kāi)銷(xiāo)最大,需要占用大量的網(wǎng)絡(luò)資源.如果網(wǎng)絡(luò)負(fù)載過(guò)高或帶寬過(guò)小,將對(duì)數(shù)據(jù)處理的實(shí)時(shí)性產(chǎn)生很大影響.如圖4所示,線(xiàn)程eb和ed3之間的通信即屬于節(jié)點(diǎn)間通信,數(shù)據(jù)流sb,d3為節(jié)點(diǎn)間數(shù)據(jù)流.
2) 進(jìn)程間通信,即同一個(gè)節(jié)點(diǎn)內(nèi)的進(jìn)程間直接通信.這種通信開(kāi)銷(xiāo)介于節(jié)點(diǎn)間通信開(kāi)銷(xiāo)和線(xiàn)程間通信開(kāi)銷(xiāo)之間.如圖4所示,線(xiàn)程ed1和ef2之間的通信即屬于進(jìn)程間通信,數(shù)據(jù)流sd1,f2為進(jìn)程間數(shù)據(jù)流.
3) 線(xiàn)程間通信,即同一個(gè)節(jié)點(diǎn)且同一進(jìn)程內(nèi)的線(xiàn)程間直接通信.這種通信開(kāi)銷(xiāo)最小且不可避免.如圖4所示,線(xiàn)程ed1和ef1之間的通信即屬于線(xiàn)程間通信,數(shù)據(jù)流sd1,f1為線(xiàn)程間數(shù)據(jù)流.
Storm默認(rèn)調(diào)度器EvenScheduler采用輪詢(xún)策略進(jìn)行任務(wù)分配.EvenScheduler首先遍歷拓?fù)渲械乃袑?shí)例并為每個(gè)實(shí)例分配1個(gè)線(xiàn)程;然后將其均勻分配到各個(gè)進(jìn)程中;接著根據(jù)各個(gè)節(jié)點(diǎn)上槽的空閑情況,將進(jìn)程均勻分配到各個(gè)節(jié)點(diǎn)上.這樣的調(diào)度策略并未考慮到節(jié)點(diǎn)間和進(jìn)程間較大的通信開(kāi)銷(xiāo)以及異構(gòu)節(jié)點(diǎn)間的性能差異.針對(duì)這一問(wèn)題,本文從以下3個(gè)方面提出優(yōu)化構(gòu)思:
1) 文獻(xiàn)[44]通過(guò)Storm吞吐量測(cè)試[47]表明,針對(duì)運(yùn)行在1個(gè)節(jié)點(diǎn)上的子拓?fù)涠?,若為其分配多個(gè)進(jìn)程(即占用多個(gè)槽),將會(huì)增加進(jìn)程間通信開(kāi)銷(xiāo)進(jìn)而影響性能.因此,在節(jié)點(diǎn)數(shù)量不變的情況下,僅為位于每個(gè)節(jié)點(diǎn)的子拓?fù)涓鞣峙?個(gè)進(jìn)程,能夠達(dá)到性能最優(yōu)的效果,這為解決最小化進(jìn)程間通信開(kāi)銷(xiāo)問(wèn)題提供了思路.
2) 對(duì)于節(jié)點(diǎn)間通信問(wèn)題,需在充分利用有限資源的基礎(chǔ)上,盡量將彼此間數(shù)據(jù)流較大的1對(duì)任務(wù)部署到同一節(jié)點(diǎn),即將節(jié)點(diǎn)間通信轉(zhuǎn)化為節(jié)點(diǎn)內(nèi)的線(xiàn)程間通信,從而達(dá)到最小化節(jié)點(diǎn)間通信開(kāi)銷(xiāo)的目的.
3) 針對(duì)異構(gòu)節(jié)點(diǎn)間的性能差異問(wèn)題,需實(shí)時(shí)監(jiān)測(cè)不同節(jié)點(diǎn)上各線(xiàn)程的負(fù)載,并預(yù)測(cè)某線(xiàn)程遷移到目的節(jié)點(diǎn)上各資源的變化情況,為線(xiàn)程的遷移提供決策.
(1)
(2)
在Storm應(yīng)用環(huán)境中,為保證集群的可靠性目標(biāo),各節(jié)點(diǎn)不能滿(mǎn)負(fù)荷運(yùn)行,一般需由運(yùn)維人員設(shè)定相應(yīng)閾值以預(yù)留少量的計(jì)算資源,當(dāng)超出閾值后則發(fā)出警告.設(shè)CPU占用率閾值為α,內(nèi)存占用率閾值為β,網(wǎng)絡(luò)帶寬占用率閾值為γ,則式(1)~(3)可進(jìn)一步完善為
(4)
(5)
在Storm集群中,各節(jié)點(diǎn)的資源占用由運(yùn)行在該節(jié)點(diǎn)上的子拓?fù)渌乃芯€(xiàn)程占用的各類(lèi)資源共同構(gòu)成,則剩余資源為各資源總量與資源占用的差值,有:
(7)
(8)
(10)
(11)
(12)
轉(zhuǎn)化后的資源約束模型將為后文線(xiàn)程遷移的決策過(guò)程提供理論依據(jù).
由3.1節(jié)可知,拓?fù)渲邪?種通信開(kāi)銷(xiāo),節(jié)點(diǎn)間通信開(kāi)銷(xiāo)最大,進(jìn)程間通信開(kāi)銷(xiāo)次之,線(xiàn)程間通信開(kāi)銷(xiāo)最小.因此需要在滿(mǎn)足資源約束模型的同時(shí)最小化這3類(lèi)開(kāi)銷(xiāo).然而拓?fù)湟坏┨峤?,線(xiàn)程數(shù)量和數(shù)據(jù)流數(shù)量即固定下來(lái),且為保證元組的流式傳遞,各線(xiàn)程之間的通信開(kāi)銷(xiāo)不可避免.若要達(dá)到最小化通信開(kāi)銷(xiāo)的效果,需最小化節(jié)點(diǎn)間通信開(kāi)銷(xiāo)和進(jìn)程間通信開(kāi)銷(xiāo);換言之,即需盡可能地將節(jié)點(diǎn)間通信開(kāi)銷(xiāo)和進(jìn)程間通信開(kāi)銷(xiāo)轉(zhuǎn)化為線(xiàn)程間通信開(kāi)銷(xiāo).同一類(lèi)型的通信開(kāi)銷(xiāo)可由數(shù)據(jù)流大小體現(xiàn),數(shù)據(jù)流越大,則傳輸時(shí)間越長(zhǎng),即通信開(kāi)銷(xiāo)越大,反之亦然.設(shè)拓?fù)渲芯€(xiàn)程ei j與ek l之間的數(shù)據(jù)流大小為vi j,k l或vk l,i j(v的下標(biāo)與數(shù)據(jù)流向無(wú)關(guān)),則可建立優(yōu)化模型:
(13)
約束條件為式(10)~(12).
定理1. 最優(yōu)通信開(kāi)銷(xiāo)原則.最小化節(jié)點(diǎn)間通信開(kāi)銷(xiāo)和進(jìn)程間通信開(kāi)銷(xiāo)等價(jià)于最大化線(xiàn)程間通信開(kāi)銷(xiāo),即上述目標(biāo)函數(shù)等價(jià)于
(15)
證明. 根據(jù)Storm流式計(jì)算模型,拓?fù)湟坏┨峤坏郊?,其包含的線(xiàn)程數(shù)量和數(shù)據(jù)流數(shù)量即不可改變.因此在不發(fā)生擁塞的情況下,總數(shù)據(jù)流大小為一定值C,即
(16)
證畢.
為了消除進(jìn)程間通信開(kāi)銷(xiāo),本文僅為調(diào)度到各節(jié)點(diǎn)上的子拓?fù)涓鞣峙?個(gè)進(jìn)程,可看作是將節(jié)點(diǎn)上原有的多個(gè)進(jìn)程合并,這樣就將原來(lái)存在的進(jìn)程間通信開(kāi)銷(xiāo)轉(zhuǎn)化為線(xiàn)程間通信開(kāi)銷(xiāo),在節(jié)點(diǎn)數(shù)量不變的前提下優(yōu)化了通信效率.因此下面將著重解決節(jié)點(diǎn)間通信開(kāi)銷(xiāo)問(wèn)題.
為滿(mǎn)足最優(yōu)通信開(kāi)銷(xiāo)模型的要求,需修改Storm默認(rèn)調(diào)度策略,將已提交拓?fù)涞母鱾€(gè)線(xiàn)程重新分配至各個(gè)節(jié)點(diǎn).若將各節(jié)點(diǎn)理解成不同容量的背包,拓?fù)渲械木€(xiàn)程理解為不同的物品,可使用0-1背包問(wèn)題的思想分析和解決此類(lèi)NP問(wèn)題.然而為同時(shí)滿(mǎn)足3.2節(jié)資源約束模型的要求,各背包應(yīng)同時(shí)存在CPU、內(nèi)存和網(wǎng)絡(luò)帶寬3類(lèi)不同的資源約束,且為保證最小化節(jié)點(diǎn)間開(kāi)銷(xiāo),需成對(duì)考慮線(xiàn)程的放置,這便構(gòu)成了一個(gè)二次型三維約束條件的多背包問(wèn)題.已有學(xué)者提出多種方法解決背包問(wèn)題的變形,例如動(dòng)態(tài)規(guī)劃[48]、搜索樹(shù)(如A*算法)[49]、近似算法[50-51]等.然而,在分布式流式計(jì)算框架中,此類(lèi)算法時(shí)間復(fù)雜度較高,拓?fù)涞牟渴疬^(guò)程開(kāi)銷(xiāo)很大,將對(duì)集群的運(yùn)行效率帶來(lái)較大的負(fù)面影響;此外,在拓?fù)涓骶€(xiàn)程的調(diào)度過(guò)程中,數(shù)據(jù)源仍在源源不斷地產(chǎn)生數(shù)據(jù),若無(wú)法及時(shí)得到處理,可能導(dǎo)致元組積壓甚至因超時(shí)而處理失敗,無(wú)法滿(mǎn)足Storm用戶(hù)的實(shí)時(shí)性業(yè)務(wù)需求.可見(jiàn)調(diào)度策略的優(yōu)化設(shè)計(jì)應(yīng)盡可能減少受影響的線(xiàn)程數(shù)量,由此提出Storm框架下的任務(wù)遷移模型.
定理2. 遷移優(yōu)化原則1.若存在這樣1個(gè)節(jié)點(diǎn),使得某線(xiàn)程與該節(jié)點(diǎn)的節(jié)點(diǎn)間數(shù)據(jù)流大于該線(xiàn)程與其所在節(jié)點(diǎn)內(nèi)的線(xiàn)程間數(shù)據(jù)流,則該線(xiàn)程遷移后將獲得更優(yōu)的通信開(kāi)銷(xiāo).
證明. 設(shè)待遷移線(xiàn)程為節(jié)點(diǎn)ns上的線(xiàn)程ei j,與ei j存在節(jié)點(diǎn)間數(shù)據(jù)流且剩余資源充裕的節(jié)點(diǎn)集合為Nd,其中nd為Nd中任意節(jié)點(diǎn).則線(xiàn)程ei j與其所在節(jié)點(diǎn)ns內(nèi)的線(xiàn)程形成的數(shù)據(jù)流大小為
(17)
線(xiàn)程ei j與其他節(jié)點(diǎn)之間形成的數(shù)據(jù)流大小總和為
(18)
線(xiàn)程ei j與nd之間的數(shù)據(jù)流大小為
(19)
因此,對(duì)于線(xiàn)程ei j,與其相關(guān)的數(shù)據(jù)流總量為
vi j=vinterExecutor+vinterNode=
vinterExecutor+vinter_nd+(vinterNode-vinter_nd).
(20)
若將線(xiàn)程ei j遷移至節(jié)點(diǎn)nd,則原來(lái)的vinterExecutor必然由節(jié)點(diǎn)ns內(nèi)的線(xiàn)程間數(shù)據(jù)流變?yōu)閚d與ns之間的節(jié)點(diǎn)間數(shù)據(jù)流,即:
(21)
而原來(lái)的vinter_nd變?yōu)榱斯?jié)點(diǎn)nd內(nèi)部的線(xiàn)程間數(shù)據(jù)流,即:
(22)
此時(shí)遷移至節(jié)點(diǎn)nd上的線(xiàn)程ei j形成的節(jié)點(diǎn)間數(shù)據(jù)流總和為
(23)
因此對(duì)于遷移后的線(xiàn)程ei j,與其相關(guān)的數(shù)據(jù)流總量為
(24)
線(xiàn)程遷移前后拓?fù)鋵?shí)例模型并未改變,因此有:
(25)
要使得遷移后的拓?fù)渚哂懈鼉?yōu)的通信開(kāi)銷(xiāo),則根據(jù)定理1需獲得更大的線(xiàn)程間數(shù)據(jù)流,即令:
(26)
(27)
故式(26)等價(jià)于
vinter_nd-vinterExecutor>0.
(28)
因此,若存在這樣的節(jié)點(diǎn)nd,使得線(xiàn)程ei j與nd的節(jié)點(diǎn)間數(shù)據(jù)流大于ei j與其所在節(jié)點(diǎn)ns內(nèi)的線(xiàn)程間數(shù)據(jù)流,則該線(xiàn)程遷移至節(jié)點(diǎn)nd后將獲得更優(yōu)的通信開(kāi)銷(xiāo).
證畢.
定理2闡述了為滿(mǎn)足最優(yōu)通信開(kāi)銷(xiāo)模型的要求,選擇遷移目的節(jié)點(diǎn)時(shí)的理論依據(jù).實(shí)際上,定理2還可以從選擇待遷移線(xiàn)程的角度出發(fā)進(jìn)行如下描述:
定理2′. 遷移優(yōu)化原則2.若存在這樣1個(gè)線(xiàn)程,使得該線(xiàn)程與某節(jié)點(diǎn)的節(jié)點(diǎn)間數(shù)據(jù)流大于該線(xiàn)程與其所在節(jié)點(diǎn)內(nèi)的線(xiàn)程間數(shù)據(jù)流,則該線(xiàn)程遷移后將獲得更優(yōu)的通信開(kāi)銷(xiāo).
定理2′的證明過(guò)程與定理2類(lèi)似,在此不再贅述.
引理1. 節(jié)點(diǎn)間數(shù)據(jù)流最優(yōu)性原理.若某線(xiàn)程遷移前與某一節(jié)點(diǎn)之間的數(shù)據(jù)流最大,則遷移到該節(jié)點(diǎn)后將轉(zhuǎn)化為最大的線(xiàn)程間數(shù)據(jù)流,且與其他節(jié)點(diǎn)之間的數(shù)據(jù)流達(dá)到最小.
證畢.
由定理2和引理1可以得出以下結(jié)論:
結(jié)論1. 最優(yōu)遷移原則.為了通過(guò)線(xiàn)程遷移達(dá)到最優(yōu)通信開(kāi)銷(xiāo)原則的要求,需在存在若干節(jié)點(diǎn)與待遷移線(xiàn)程之間的數(shù)據(jù)流大于該線(xiàn)程與其所在節(jié)點(diǎn)內(nèi)的線(xiàn)程形成的數(shù)據(jù)流的基礎(chǔ)上,將線(xiàn)程遷移至具有最大節(jié)點(diǎn)間數(shù)據(jù)流的節(jié)點(diǎn),即
(29)
s.t.
(30)
為保證Storm的性能需求,任務(wù)遷移策略需滿(mǎn)足3個(gè)原則:1)為減少線(xiàn)程的遷移對(duì)Storm運(yùn)行效率的沖擊,需最小化遷移開(kāi)銷(xiāo);2)遷移后各節(jié)點(diǎn)的各類(lèi)資源占用不超過(guò)閾值;3)遷移后集群的運(yùn)行效率得以提高,即令通信開(kāi)銷(xiāo)達(dá)到最優(yōu).為同時(shí)滿(mǎn)足以上3個(gè)原則的要求,需要實(shí)時(shí)監(jiān)測(cè)各節(jié)點(diǎn)的各類(lèi)資源剩余情況,當(dāng)可用資源不足時(shí),選擇對(duì)該節(jié)點(diǎn)運(yùn)行效率影響最大的線(xiàn)程執(zhí)行遷移,且同時(shí)兼顧降低節(jié)點(diǎn)之間的通信開(kāi)銷(xiāo).本節(jié)將圍繞任務(wù)遷移策略的具體設(shè)計(jì)過(guò)程展開(kāi)討論.
當(dāng)某節(jié)點(diǎn)的某類(lèi)資源占用率持續(xù)一段時(shí)間超出閾值后,則將該節(jié)點(diǎn)標(biāo)記為源節(jié)點(diǎn),意味著運(yùn)行在該節(jié)點(diǎn)上的某些線(xiàn)程將被遷出.這里需要考慮2種特殊情況:
1) 多個(gè)節(jié)點(diǎn)的資源占用率同時(shí)超出閾值.當(dāng)這種情況發(fā)生時(shí),不便于同時(shí)將各節(jié)點(diǎn)均標(biāo)記為源節(jié)點(diǎn).這是因?yàn)樵垂?jié)點(diǎn)上的線(xiàn)程在遷移之前需要預(yù)測(cè)遷移后目的節(jié)點(diǎn)的資源剩余情況,而為了尋求最優(yōu)遷移目標(biāo),符合本節(jié)提出的3個(gè)原則的節(jié)點(diǎn)均可作為各源節(jié)點(diǎn)上待遷出線(xiàn)程的目的節(jié)點(diǎn),不具有排他性.若多個(gè)源節(jié)點(diǎn)同時(shí)選擇了相同的目的節(jié)點(diǎn)且并發(fā)地將部分線(xiàn)程遷移至此,則遷移后目的節(jié)點(diǎn)的資源剩余情況將遠(yuǎn)小于線(xiàn)程遷移前預(yù)測(cè)的結(jié)果,并可能發(fā)生資源溢出.因此就本文研究背景來(lái)看,需在時(shí)間開(kāi)銷(xiāo)可接受范圍內(nèi)將資源占用率超出閾值的節(jié)點(diǎn)分別處理.
2) 同一節(jié)點(diǎn)上不同類(lèi)資源(CPU、內(nèi)存和網(wǎng)絡(luò)帶寬)的占用率均超出閾值.當(dāng)某2類(lèi)資源或者這3類(lèi)資源的占用率同時(shí)超過(guò)閾值時(shí),需根據(jù)流式計(jì)算的特點(diǎn)為各類(lèi)資源分配不同的優(yōu)先級(jí).在Storm系統(tǒng)中,內(nèi)存作為數(shù)據(jù)臨時(shí)性存儲(chǔ)的唯一介質(zhì),一旦溢出將會(huì)造成災(zāi)難性后果,所以解決內(nèi)存資源的緊缺性問(wèn)題最為迫切,優(yōu)先級(jí)應(yīng)設(shè)為最高.其次,CPU執(zhí)行具體任務(wù),并不可避免地產(chǎn)生序列化、反序列化和保證消息可靠傳輸?shù)阮~外開(kāi)銷(xiāo),網(wǎng)絡(luò)則負(fù)責(zé)將CPU計(jì)算后的結(jié)果在節(jié)點(diǎn)之間進(jìn)行傳輸,這2類(lèi)資源均會(huì)對(duì)拓?fù)涞膱?zhí)行效率產(chǎn)生直接影響,其優(yōu)先級(jí)需針對(duì)不同應(yīng)用中資源占用的傾向不同而人為設(shè)定.
綜合以上2種情況,我們?cè)谥骺毓?jié)點(diǎn)中采用了LinkedHashMap這一數(shù)據(jù)結(jié)構(gòu).與HashMap不同的是,LinkedHashMap保存了記錄的插入順序,讀取時(shí)可按序輸出,保證最先發(fā)生任務(wù)過(guò)載的節(jié)點(diǎn)被最先處理.當(dāng)某個(gè)節(jié)點(diǎn)的某類(lèi)資源占用率超出閾值后,將節(jié)點(diǎn)ID,資源類(lèi)型這一鍵值對(duì)發(fā)送并插入到位于主控節(jié)點(diǎn)的LinkedHashMap中;當(dāng)同一節(jié)點(diǎn)上不同類(lèi)型的資源占用率超出閾值后,執(zhí)行插入操作時(shí)則需根據(jù)資源優(yōu)先級(jí)類(lèi)型進(jìn)行判斷:若當(dāng)前資源的優(yōu)先級(jí)高于LinkedHashMap中已存有的資源類(lèi)型,則自動(dòng)執(zhí)行替換;否則保持原狀.由此設(shè)計(jì)算法1.
算法1. 源節(jié)點(diǎn)選擇算法.
輸入:節(jié)點(diǎn)n1,n2,…,ni,…;CPU占用率閾值α,內(nèi)存占用率閾值β,網(wǎng)絡(luò)帶寬占用率閾值γ;CPU優(yōu)先級(jí)pr_cpu,內(nèi)存優(yōu)先級(jí)pr_ram,網(wǎng)絡(luò)帶寬優(yōu)先級(jí)pr_bandwidth;
輸出:源節(jié)點(diǎn)集Ns;
初始化:Ns←newLinkedHashMapNode,Resource. /*源節(jié)點(diǎn)集*/
① if節(jié)點(diǎn)ni的內(nèi)存占用率在時(shí)間間隔T內(nèi)持續(xù)大于βthen
②Ns.put(ni, “RAM”);
③ end if
④ if節(jié)點(diǎn)ni的CPU占用率在時(shí)間間隔T內(nèi)持續(xù)大于αthen
⑤ ifNs.containsKey(ni)=false then
/*ni為新加入源節(jié)點(diǎn)集的節(jié)點(diǎn)*/
⑥Ns.put(ni, “CPU”);
⑦ else
⑧pr←getPriority(Ns.get(ni));
/*獲取ni已超出閾值資源的優(yōu)先級(jí)*/
⑨ ifpr_cpu>prthen
/*CPU優(yōu)先級(jí)更高*/
⑩Ns.put(ni, “CPU”);
出于篇幅原因,算法1僅以CPU和內(nèi)存資源為例說(shuō)明了不同優(yōu)先級(jí)的資源占用率超出閾值時(shí)源節(jié)點(diǎn)的選擇情況.若某一節(jié)點(diǎn)的網(wǎng)絡(luò)帶寬資源占用率超出閾值時(shí),其算法邏輯與行④~相同,只需把CPU資源替換為網(wǎng)絡(luò)帶寬資源即可.可見(jiàn),當(dāng)同一節(jié)點(diǎn)上不同類(lèi)資源占用率均超出閾值時(shí),算法1能夠保證優(yōu)先級(jí)高的資源被優(yōu)先考慮,而優(yōu)先級(jí)低的資源則暫被取代.其可行的原因是后文采用了線(xiàn)程遷移策略,隨著線(xiàn)程的遷出,源節(jié)點(diǎn)上優(yōu)先級(jí)高的資源問(wèn)題解決之后,其余類(lèi)型的資源問(wèn)題可能隨之解決;若沒(méi)有解決,該節(jié)點(diǎn)將由于某類(lèi)資源占用率超出閾值而繼續(xù)觸發(fā)行①和行④的判定條件,進(jìn)而重新加入源節(jié)點(diǎn)集等候處理.各源節(jié)點(diǎn)的處理時(shí)間很短,不會(huì)因某源節(jié)點(diǎn)未及時(shí)處理而導(dǎo)致資源溢出等嚴(yán)重后果,這將在4.3節(jié)的算法評(píng)估中詳細(xì)介紹.
任務(wù)遷移分為遷移決策和遷移執(zhí)行2個(gè)過(guò)程,其中執(zhí)行僅作為決策結(jié)果的實(shí)施,因此本節(jié)的重點(diǎn)在于論述任務(wù)遷移的決策過(guò)程,分為阻尼線(xiàn)程的選擇和遷移目的節(jié)點(diǎn)的選擇2個(gè)步驟.
1) 阻尼線(xiàn)程的選擇
源節(jié)點(diǎn)資源占用率超出閾值后,應(yīng)遷出運(yùn)行在該節(jié)點(diǎn)上的部分線(xiàn)程以降低負(fù)載.為選擇合適的待遷移線(xiàn)程,首先引入阻尼線(xiàn)程的概念.
定義5. 阻尼線(xiàn)程.即決定從源節(jié)點(diǎn)遷移出的線(xiàn)程.由于這樣的線(xiàn)程對(duì)源節(jié)點(diǎn)運(yùn)行效率起到了一定的阻礙作用,故稱(chēng)之為“阻尼”.為使得Storm集群的性能達(dá)到最優(yōu),需選擇阻尼線(xiàn)程進(jìn)行遷移.
從表面上看,資源占用越大的線(xiàn)程則阻尼越大,為使得某節(jié)點(diǎn)的資源占用率迅速降至閾值以下,應(yīng)采用貪心策略依次將該節(jié)點(diǎn)上資源占用最大的線(xiàn)程遷出,直到滿(mǎn)足資源約束模型為止.這種做法雖然可以減少遷移次數(shù)從而最小化遷移開(kāi)銷(xiāo),但未將線(xiàn)程之間的通信類(lèi)型考慮在內(nèi),可能導(dǎo)致遷移后節(jié)點(diǎn)間通信開(kāi)銷(xiāo)的增加.因此,為同時(shí)滿(mǎn)足最優(yōu)通信開(kāi)銷(xiāo)模型的要求,應(yīng)在盡可能減少遷移次數(shù)的前提下,選擇那些能夠使得節(jié)點(diǎn)間通信開(kāi)銷(xiāo)轉(zhuǎn)化為更多節(jié)點(diǎn)內(nèi)線(xiàn)程間通信開(kāi)銷(xiāo)的線(xiàn)程.這樣的線(xiàn)程才滿(mǎn)足阻尼線(xiàn)程的標(biāo)準(zhǔn).
2) 目的節(jié)點(diǎn)的選擇
阻尼線(xiàn)程選定之后,另一個(gè)關(guān)鍵問(wèn)題是目的節(jié)點(diǎn)的選擇,由結(jié)論1可知,選擇的目的節(jié)點(diǎn)需與阻尼線(xiàn)程之間存在最大節(jié)點(diǎn)間數(shù)據(jù)流,且需滿(mǎn)足這樣的節(jié)點(diǎn)間數(shù)據(jù)流大于與阻尼線(xiàn)程相關(guān)的源節(jié)點(diǎn)內(nèi)線(xiàn)程間數(shù)據(jù)流.這種做法雖然優(yōu)化了通信開(kāi)銷(xiāo),但需在滿(mǎn)足資源約束模型的前提下進(jìn)行遷移,否則可能導(dǎo)致遷移后目的節(jié)點(diǎn)負(fù)載過(guò)重形成新的瓶頸.為了保證容納阻尼線(xiàn)程遷入的節(jié)點(diǎn)資源占用不超出閾值,在阻尼線(xiàn)程遷出源節(jié)點(diǎn)之前務(wù)必完成剩余資源的估算,下面將以CPU資源為例展開(kāi)討論.
(31)
(32)
例如,若源節(jié)點(diǎn)采用2.5 GHz的雙核CPU,備選目的節(jié)點(diǎn)采用2.0 GHz的4核CPU,某線(xiàn)程在源節(jié)點(diǎn)上的CPU占用率為10%,則根據(jù)式(32),得該線(xiàn)程在備選目的節(jié)點(diǎn)上的CPU占用率為6.25%.
在遷移的決策過(guò)程中,可通過(guò)式(33)(34)預(yù)測(cè)線(xiàn)程遷移后源節(jié)點(diǎn)和備選目的節(jié)點(diǎn)的資源剩余情況:
(33)
(34)
算法2. 任務(wù)遷移算法.
輸入:節(jié)點(diǎn)n1,n2,…,ni,…;源節(jié)點(diǎn)集Ns;
初始化:ns←newNode; /*源節(jié)點(diǎn)*/
/*分配在源節(jié)點(diǎn)上的有序線(xiàn)程集合*/
vinterExecutor←0; /*當(dāng)前線(xiàn)程與源節(jié)點(diǎn)內(nèi)的前驅(qū)和后繼線(xiàn)程構(gòu)成的線(xiàn)程間數(shù)據(jù)流大小*/
Nd←newArrayNode; /*備選目的節(jié)點(diǎn)集*/
vinter_nd←0; /*當(dāng)前線(xiàn)程與某備選目的節(jié)點(diǎn)上的前驅(qū)和后繼線(xiàn)程構(gòu)成的節(jié)點(diǎn)間數(shù)據(jù)流大小*/
vmax←0. /*最大節(jié)點(diǎn)間數(shù)據(jù)流*/
①ns←獲取Ns中的第1個(gè)元素;
④ 若源節(jié)點(diǎn)剩余資源充裕則跳出該循環(huán),不再遷移下一個(gè)線(xiàn)程,否則執(zhí)行下述語(yǔ)句;
⑤ 根據(jù)式(17)計(jì)算vinterExecutor;
⑥Nd←與線(xiàn)程i存在直接節(jié)點(diǎn)間通信且當(dāng)前剩余資源充裕的節(jié)點(diǎn);
⑦ fornd=Nd[0] toNd.Length-1 do
⑧ if預(yù)測(cè)節(jié)點(diǎn)nd容納線(xiàn)程i后依然滿(mǎn)足
資源約束模型then
⑨ 根據(jù)式(19)計(jì)算vinter_nd;
⑩ ifvinter_nd>vmaxthen
/*當(dāng)前線(xiàn)程與目的節(jié)點(diǎn)的節(jié)點(diǎn)間數(shù)據(jù)流大于該線(xiàn)程與所在源節(jié)點(diǎn)內(nèi)的線(xiàn)程間數(shù)據(jù)流*/
/*該源節(jié)點(diǎn)上所有阻尼線(xiàn)程及其遷出的目的節(jié)點(diǎn)決策完成,從源節(jié)點(diǎn)集中刪去該節(jié)點(diǎn)*/
4.3.1 算法復(fù)雜度分析
算法1實(shí)質(zhì)上為L(zhǎng)inkedHashMap的插入和替換算法,時(shí)間復(fù)雜度為O(1).下面將針對(duì)算法2中遷移決策的計(jì)算開(kāi)銷(xiāo)和遷移執(zhí)行開(kāi)銷(xiāo)進(jìn)行分析.
4.3.2 算法執(zhí)行效果分析
如3.3節(jié)所述,Storm拓?fù)湓诟鞴?jié)點(diǎn)上的分配是一個(gè)二次型三維約束條件的多背包問(wèn)題,無(wú)法在多項(xiàng)式時(shí)間內(nèi)找到全局最優(yōu)解,但可以采用已有研究的方法將拓?fù)渲夭渴鹨詫で蠼迫肿顑?yōu)[43-45].而本文提出的線(xiàn)程遷移僅針對(duì)阻尼線(xiàn)程重新調(diào)度,可看作是任務(wù)分配模型的局部?jī)?yōu)化.本節(jié)將通過(guò)對(duì)比分析,針對(duì)局部?jī)?yōu)化后的效果展開(kāi)討論.
流式計(jì)算的性能可使用各任務(wù)計(jì)算的時(shí)間開(kāi)銷(xiāo)和任務(wù)間傳輸?shù)臅r(shí)間開(kāi)銷(xiāo)進(jìn)行衡量.為了實(shí)現(xiàn)任務(wù)的線(xiàn)程級(jí)并行,通常1個(gè)線(xiàn)程只包含1個(gè)任務(wù),因此文中線(xiàn)程與任務(wù)的含義一致.根據(jù)文獻(xiàn)[41]中流式計(jì)算模式下計(jì)算和通信開(kāi)銷(xiāo)的評(píng)估方法可知,對(duì)于運(yùn)行在源節(jié)點(diǎn)ns上的任意1個(gè)線(xiàn)程ei j,其計(jì)算開(kāi)銷(xiāo)可表示為
(35)
(36)
(37)
(38)
隨著遷移次數(shù)k的增加,每次遷移后源節(jié)點(diǎn)ns可獲得的計(jì)算能力提升將愈加有限,即:
且
(39)
同理可分析各對(duì)線(xiàn)程之間的通信開(kāi)銷(xiāo),其函數(shù)關(guān)系為
(40)
(41)
(42)
式(42)為遷移前線(xiàn)程ei j與ek l之間的通信開(kāi)銷(xiāo)與遷移后線(xiàn)程ei j與ek l之間的通信開(kāi)銷(xiāo)的差值,表明了線(xiàn)程因遷移而降低的通信開(kāi)銷(xiāo),可見(jiàn)Δti j,k l僅與線(xiàn)程ei j與ek l之間剩余的網(wǎng)絡(luò)帶寬資源有關(guān).接下來(lái)即可采用與上述評(píng)估計(jì)算開(kāi)銷(xiāo)相同的方法進(jìn)行各次遷移過(guò)程中通信開(kāi)銷(xiāo)的評(píng)估.結(jié)果表明,若在第m次遷移完成后,各對(duì)線(xiàn)程之間的通信開(kāi)銷(xiāo)已達(dá)最優(yōu),則在共計(jì)m次的線(xiàn)程遷移過(guò)程中,通信開(kāi)銷(xiāo)逐漸降低且下降的速率逐漸變緩,能夠由節(jié)點(diǎn)間通信轉(zhuǎn)化為節(jié)點(diǎn)內(nèi)線(xiàn)程間通信的數(shù)據(jù)流將越來(lái)越少.
設(shè)采用Storm默認(rèn)調(diào)度策略的線(xiàn)程分配結(jié)果為G,采用本文策略進(jìn)行局部遷移后的線(xiàn)程分配結(jié)果為G′,拓?fù)渲夭渴鸷筮_(dá)到的近似全局最優(yōu)的線(xiàn)程分配結(jié)果為G″.如4.3.1節(jié)所述,在G向G′演變的過(guò)程中,對(duì)于?ns∈N,線(xiàn)程的遷移次數(shù)最多為Δhns;若要使得G′與G″相等,則需在G′的基礎(chǔ)上繼續(xù)遷移位于ns上的m-Δhns個(gè)線(xiàn)程.設(shè)第i次線(xiàn)程遷移可降低的計(jì)算和通信開(kāi)銷(xiāo)共為Δti,則由G演變到G′的Δhns次遷移過(guò)程中可降低的開(kāi)銷(xiāo)總和為
(43)
平均每次遷移可降低的開(kāi)銷(xiāo)為
(44)
在此基礎(chǔ)上若再進(jìn)行m-Δhns次遷移,即由G′演變到G″的過(guò)程中可降低的開(kāi)銷(xiāo)總和為
(45)
平均每次遷移可降低的開(kāi)銷(xiāo)為
(46)
由G到G″的m次遷移過(guò)程中可降低的開(kāi)銷(xiāo)總和為
(47)
平均每次遷移可降低的開(kāi)銷(xiāo)為
(48)
Storm為編程人員提供了可插拔的自定義調(diào)度器.為部署本文提出的任務(wù)遷移策略,需實(shí)現(xiàn)backtype.storm.scheduler.IScheduler接口中的schedule方法,其原型為public voidschedule(Topologiestopologies, Clustercluster).其中對(duì)象topologies包含當(dāng)前集群運(yùn)行的所有拓?fù)湫畔ⅲǜ黝?lèi)參數(shù)的配置信息以及線(xiàn)程到組件ID的映射關(guān)系等;對(duì)象cluster包含當(dāng)前集群的所有狀態(tài)信息,包括拓?fù)渲懈骶€(xiàn)程在節(jié)點(diǎn)和進(jìn)程上的映射信息、節(jié)點(diǎn)和槽的使用與空閑信息等.以上信息均可通過(guò)各對(duì)象的API獲得.對(duì)于拓?fù)渲懈骶€(xiàn)程的CPU資源占用信息,可通過(guò)Java API中ThreadMXBean類(lèi)的getThreadCpuTime(longid)方法獲取,其中id為線(xiàn)程ID;對(duì)于各線(xiàn)程的網(wǎng)絡(luò)帶寬資源占用信息,可通過(guò)Storm UI提供的REST API獲取節(jié)點(diǎn)間各線(xiàn)程的元組傳輸速率,并結(jié)合實(shí)驗(yàn)中設(shè)置的元組大小,通過(guò)累加簡(jiǎn)單估算求得;而由于各線(xiàn)程存在共享內(nèi)存,則對(duì)于各線(xiàn)程的內(nèi)存資源占用情況,僅能結(jié)合storm.yaml文件中配置的worker.childopts參數(shù)和jstack等JVM性能監(jiān)控工具進(jìn)行粗略估計(jì);此外,操作系統(tǒng)中硬件相關(guān)參數(shù)和負(fù)載信息可通過(guò)/proc目錄下相關(guān)文件獲取.代碼編寫(xiě)完畢后,將其打jar包至${STORM_HOME}/lib目錄下,并在主控節(jié)點(diǎn)的storm.yaml文件中配置參數(shù)storm.scheduler即可運(yùn)行.
改進(jìn)的Storm架構(gòu)如圖5所示.需要說(shuō)明的是,運(yùn)行進(jìn)程UI的控制臺(tái)節(jié)點(diǎn)和運(yùn)行進(jìn)程ZooKeeper的協(xié)調(diào)節(jié)點(diǎn)仍保持原狀,故圖5中將其相關(guān)部分省去.改進(jìn)的Storm系統(tǒng)架構(gòu)中新增4個(gè)模塊:
1) 負(fù)載監(jiān)視器(load monitor).在一定時(shí)間窗口內(nèi),收集各線(xiàn)程占用的CPU、內(nèi)存和網(wǎng)絡(luò)帶寬負(fù)載信息及各線(xiàn)程之間的數(shù)據(jù)流大小.部署時(shí)需在各Spout的open()和nextTuple()方法以及各Bolt的prepare()和execute()方法中調(diào)用該模塊.
2) MySQL數(shù)據(jù)庫(kù)(MySQL Database).存儲(chǔ)任務(wù)分配信息和負(fù)載監(jiān)視器傳來(lái)的負(fù)載信息,并實(shí)時(shí)更新.
3) 遷移發(fā)生器(migration generator).部署算法1和算法2.負(fù)責(zé)讀取數(shù)據(jù)庫(kù)中的負(fù)載信息,并作出任務(wù)遷移決策.
4) 自定義調(diào)度器(custom scheduler).覆蓋主控節(jié)點(diǎn)的默認(rèn)調(diào)度策略,讀取遷移發(fā)生器的調(diào)度決策并執(zhí)行遷移.
Fig. 5 Improved architecture of Storm圖5 改進(jìn)的Storm系統(tǒng)架構(gòu)
為驗(yàn)證任務(wù)遷移策略的有效性,本節(jié)將通過(guò)下述實(shí)驗(yàn)進(jìn)行比較和評(píng)價(jià).
為更好地觀測(cè)資源有限且節(jié)點(diǎn)異構(gòu)的情況下任務(wù)遷移策略的執(zhí)行過(guò)程,實(shí)驗(yàn)環(huán)境采用不同硬件配置的PC機(jī)搭建1個(gè)包含有20個(gè)節(jié)點(diǎn)的Storm集群,其中運(yùn)行進(jìn)程N(yùn)imbus、進(jìn)程UI和數(shù)據(jù)庫(kù)服務(wù)MySQL的主控節(jié)點(diǎn)1個(gè),運(yùn)行進(jìn)程ZooKeeper的協(xié)調(diào)節(jié)點(diǎn)3個(gè),其余16個(gè)為運(yùn)行進(jìn)程Supervisor的工作節(jié)點(diǎn).表1列出了各節(jié)點(diǎn)具體的硬件配置,其中各工作節(jié)點(diǎn)的CPU僅使用其單核的處理能力,硬盤(pán)容量為250 GB,轉(zhuǎn)速為7 200 r/min,接口為SATA3.0.在表1中,根據(jù)運(yùn)行進(jìn)程Supervisor的16個(gè)工作節(jié)點(diǎn)的硬件配置的高低,可大體將工作節(jié)點(diǎn)分為低端、中端和高端3類(lèi),為簡(jiǎn)便起見(jiàn),下文將運(yùn)行進(jìn)程Supervisor 1~3的工作節(jié)點(diǎn)簡(jiǎn)稱(chēng)為低配節(jié)點(diǎn),將運(yùn)行進(jìn)程Supervisor 4~13的工作節(jié)點(diǎn)簡(jiǎn)稱(chēng)為中配節(jié)點(diǎn),將運(yùn)行進(jìn)程Supervisor 14~16的工作節(jié)點(diǎn)簡(jiǎn)稱(chēng)為高配節(jié)點(diǎn).除硬件配置之外,各節(jié)點(diǎn)軟件方面配置相同,如表2所示.
Table 1 Hardware Configuration of Storm Cluster表1 Storm集群硬件配置
Table 2 Software Configuration of Storm Cluster表2 Storm集群軟件配置
為全面測(cè)試任務(wù)遷移策略在各類(lèi)不同資源開(kāi)銷(xiāo)下的有效性,實(shí)驗(yàn)數(shù)據(jù)選取GitHub上storm-benchmark-master提供的4組基準(zhǔn)測(cè)試用例,分別是CPU敏感型(CPU-sensitive)的WordCount、內(nèi)存敏感型(memory-sensitive)的RollingSort、網(wǎng)絡(luò)帶寬敏感型(network-sensitive)的SOL以及Storm真實(shí)場(chǎng)景下的應(yīng)用RollingCount[46].各基準(zhǔn)測(cè)試均采用其自帶的文本文檔作為輸入數(shù)據(jù).表3列出了各項(xiàng)參數(shù)配置,需要進(jìn)一步解釋的參數(shù)如下:1)component.xxx_num為該基準(zhǔn)測(cè)試中設(shè)置的組件并行度,即1個(gè)Spout或Bolt運(yùn)行的實(shí)例(線(xiàn)程)數(shù)量.2)topology.pr_xxx為運(yùn)行該項(xiàng)基準(zhǔn)測(cè)試時(shí)設(shè)置的資源優(yōu)先級(jí),其值越大表示該類(lèi)資源的優(yōu)先級(jí)越高,在源節(jié)點(diǎn)選擇算法運(yùn)行時(shí)將被優(yōu)先考慮.3)SOL中的topology.level表示拓?fù)涞膶哟?,即其包含的組件數(shù)量,需設(shè)置為大于或等于2的整數(shù);本文設(shè)置該值為3,結(jié)合component.xxx_num參數(shù)配置來(lái)看,該拓?fù)鋺?yīng)包含有1個(gè)運(yùn)行著64個(gè)實(shí)例的
Table 3 Configuration of Benchmarks表3 基準(zhǔn)測(cè)試參數(shù)配置
Spout和2個(gè)運(yùn)行著128個(gè)實(shí)例的Bolt,其包含的線(xiàn)程總數(shù)與WordCount和RollingCount一致,但與RollingSort不同.除表3所示配置之外還需進(jìn)行一些通用配置:1)為消除進(jìn)程間通信開(kāi)銷(xiāo),各基準(zhǔn)測(cè)試運(yùn)行時(shí)僅在1個(gè)工作節(jié)點(diǎn)內(nèi)分配1個(gè)工作進(jìn)程,即設(shè)置topology.workers為16;2)為保證數(shù)據(jù)流的可靠傳輸,各工作進(jìn)程除了運(yùn)行分配給它的線(xiàn)程之外,還額外運(yùn)行1個(gè)Acker Bolt實(shí)例,即設(shè)置topology.acker.executors為16;3)為方便實(shí)驗(yàn)觀測(cè)而有意提高工作節(jié)點(diǎn)負(fù)載,但需保證在表3的配置下防止元組傳輸因超時(shí)而重傳,通過(guò)多次實(shí)驗(yàn)結(jié)果設(shè)置topology.max.spout.pending的合適值為100;4)為結(jié)合以上配置而適時(shí)觸發(fā)任務(wù)遷移,設(shè)定CPU占用率閾值α、內(nèi)存占用率閾值β和網(wǎng)絡(luò)帶寬占用率閾值γ均為70%,設(shè)定任務(wù)遷移策略的觸發(fā)周期T為30 s,表示系統(tǒng)在趨于穩(wěn)定后,若某類(lèi)資源占用率在30 s內(nèi)持續(xù)超過(guò)70%,則觸發(fā)任務(wù)遷移策略.
為驗(yàn)證本文遷移策略的有效性,文中除了與Storm默認(rèn)輪詢(xún)策略進(jìn)行對(duì)比之外,還部署了文獻(xiàn)[43]的Storm自適應(yīng)在線(xiàn)調(diào)度策略.其核心思想是實(shí)時(shí)監(jiān)測(cè)CPU負(fù)載情況和各對(duì)線(xiàn)程之間的數(shù)據(jù)流大小,當(dāng)CPU負(fù)載持續(xù)超出閾值時(shí)觸發(fā)任務(wù)重部署機(jī)制,即首先按照大小遞減的順序排列拓?fù)渲懈鲗?duì)線(xiàn)程之間的數(shù)據(jù)流,然后將線(xiàn)程逐對(duì)調(diào)度至那些能夠令其部署后產(chǎn)生最低CPU負(fù)載的工作進(jìn)程和工作節(jié)點(diǎn)中.該策略可看作是任務(wù)重部署策略的代表,下文簡(jiǎn)稱(chēng)為在線(xiàn)策略.表4列出了采用在線(xiàn)策略時(shí)的各項(xiàng)參數(shù)配置.需要說(shuō)明的是,表4中的reschedule.timeout為在線(xiàn)策略的觸發(fā)周期,capacity為在線(xiàn)策略中CPU的使用率上限,這2個(gè)值分別與本文算法中的T和α值設(shè)置一致,目的是在同等CPU負(fù)載條件下觸發(fā)任務(wù)調(diào)度;此外,最后4項(xiàng)為在線(xiàn)策略中為優(yōu)化拓?fù)鋱?zhí)行效率而人為設(shè)定的值,與本文提到的資源占用率閾值α,β,γ無(wú)關(guān).
Table 4 Configuration of Online Scheduler表4 在線(xiàn)策略參數(shù)設(shè)置
本節(jié)首先使用WordCount,RollingSort,SOL這3組資源敏感型的基準(zhǔn)測(cè)試在延遲、資源占用和工作節(jié)點(diǎn)間通信開(kāi)銷(xiāo)這3個(gè)方面進(jìn)行任務(wù)遷移策略的評(píng)估,最后使用RollingCount這一Storm環(huán)境下的真實(shí)場(chǎng)景在延遲方面進(jìn)行測(cè)試.為便于數(shù)據(jù)統(tǒng)計(jì),以下測(cè)試均設(shè)置metrics.poll=5 s,metrics.time=300 s,即每組實(shí)驗(yàn)每5 s進(jìn)行1次采樣,時(shí)長(zhǎng)為5 min.
5.2.1 延遲測(cè)試
Fig. 6 Comparison of latency among different task scheduling strategies圖6 不同任務(wù)調(diào)度策略下的系統(tǒng)延遲對(duì)比
延遲表明了1個(gè)元組從Spout發(fā)射到最終被成功處理的時(shí)間消耗,反應(yīng)了拓?fù)鋱?zhí)行1次的響應(yīng)時(shí)間,刻畫(huà)了系統(tǒng)的運(yùn)行效率.圖6統(tǒng)計(jì)了基準(zhǔn)測(cè)試WordCount,RollingSort,SOL在Storm默認(rèn)策略(Default)、在線(xiàn)策略(Online)和任務(wù)遷移策略(TMSH-Storm)下的系統(tǒng)延遲.
如圖6所示,從0開(kāi)始到第1個(gè)峰值結(jié)束時(shí)間段表明各拓?fù)涮峤粫r(shí)的部署過(guò)程,此時(shí)的調(diào)度均遵循Storm默認(rèn)策略,故在同一個(gè)基準(zhǔn)測(cè)試中,各策略均需在這一階段耗費(fèi)幾乎相同的時(shí)間,元組的處理延遲也大體相同.第1個(gè)峰值過(guò)后,系統(tǒng)延遲逐漸趨于收斂,在線(xiàn)策略與任務(wù)遷移策略開(kāi)始收集集群中各工作節(jié)點(diǎn)以及工作節(jié)點(diǎn)上各線(xiàn)程占用的CPU、內(nèi)存和網(wǎng)絡(luò)帶寬負(fù)載信息及各線(xiàn)程之間的數(shù)據(jù)流大小,為各線(xiàn)程的優(yōu)化配置提供決策依據(jù).
圖6(a)展示了各策略執(zhí)行1次WordCount過(guò)程中的系統(tǒng)延遲.可見(jiàn)在第160 s時(shí)在線(xiàn)策略觸發(fā),延遲出現(xiàn)極高峰值.這是由于在線(xiàn)策略需根據(jù)各線(xiàn)程的CPU負(fù)載和各對(duì)線(xiàn)程之間數(shù)據(jù)流的大小情況,將拓?fù)渲邪乃芯€(xiàn)程在各工作節(jié)點(diǎn)上重新分配,整個(gè)過(guò)程相當(dāng)于拓?fù)涮峤粫r(shí)的初始化任務(wù)分配,執(zhí)行開(kāi)銷(xiāo)較大,此時(shí)數(shù)據(jù)流因無(wú)法及時(shí)處理而導(dǎo)致延遲出現(xiàn)極高峰值.由圖6(a)可知,該策略執(zhí)行對(duì)系統(tǒng)延遲的影響時(shí)長(zhǎng)約在第160~205 s范圍內(nèi),共耗時(shí)約45 s,平均延遲約2.369 s,這將影響Storm系統(tǒng)處理數(shù)據(jù)的實(shí)時(shí)性并易導(dǎo)致數(shù)據(jù)流處理失敗.而任務(wù)遷移策略因只影響超出閾值的一小部分線(xiàn)程,執(zhí)行開(kāi)銷(xiāo)較小.遷移發(fā)生在第165~185 s之間,共耗時(shí)約20 s,平均延遲約920.1 ms,有效降低了算法執(zhí)行過(guò)程對(duì)集群性能的沖擊.
接下來(lái)對(duì)內(nèi)存敏感型的RollingSort和網(wǎng)絡(luò)帶寬敏感型的SOL進(jìn)行測(cè)試.需要說(shuō)明的是,僅當(dāng)CPU負(fù)載在30 s內(nèi)持續(xù)超過(guò)70%時(shí)在線(xiàn)策略才會(huì)被觸發(fā)生效,對(duì)于其他資源占用率超出閾值的情況并不作任何處理.本文為了更全面地開(kāi)展對(duì)比實(shí)驗(yàn),特經(jīng)過(guò)反復(fù)調(diào)參設(shè)定各值(如表3所示),以保證在RollingSort和SOL的運(yùn)行過(guò)程中,存在1個(gè)以上的工作節(jié)點(diǎn)在超出內(nèi)存和網(wǎng)絡(luò)帶寬閾值但不發(fā)生資源溢出的同時(shí),令其CPU占用率也超出閾值,這樣即滿(mǎn)足了在線(xiàn)策略觸發(fā)的條件.由圖6(b)和圖6(c)可以看出,RollingSort和SOL的執(zhí)行過(guò)程與Word-Count類(lèi)似,分別約在第140~170 s和第155~195 s期間執(zhí)行在線(xiàn)策略,平均延遲分別約為471.4 ms和983.6 ms;在第140~160 s和160~185 s期間執(zhí)行任務(wù)遷移策略,平均延遲分別約為217.4 ms和408.1 ms,分別僅為在線(xiàn)策略的46.1%和41.5%.其中Rolling-Sort的初次部署和重部署消耗的時(shí)間都略小于其他基準(zhǔn)測(cè)試,這是因?yàn)樗慕M件中僅存在1個(gè)包含有64個(gè)線(xiàn)程的Spout和1個(gè)包含有128個(gè)線(xiàn)程的Bolt,相對(duì)于WordCount和SOL,其包含的線(xiàn)程數(shù)更少,因此具有更低的部署開(kāi)銷(xiāo);而SOL的遷移時(shí)間略大于其他的基準(zhǔn)測(cè)試,這是因?yàn)樗枰w移的線(xiàn)程數(shù)量更多,具體原因?qū)⒃?.2.2節(jié)中進(jìn)行闡述.
在線(xiàn)策略和任務(wù)遷移策略執(zhí)行完畢后,系統(tǒng)延遲再次趨于收斂,且表現(xiàn)明顯低于默認(rèn)策略.在WordCount中,2種策略的系統(tǒng)延遲分別穩(wěn)定在332.4 ms和340.8 ms,相對(duì)于默認(rèn)策略分別降低約28.7%和26.9%;在RollingSort中,延遲分別穩(wěn)定在93.3 ms和88.4 ms,相比默認(rèn)策略降低約21.1%和25.3%;在SOL中,延遲分別穩(wěn)定在123.1 ms和118.6 ms,相比默認(rèn)策略降低約24.6%和27.4%.可以看到,在WordCount基準(zhǔn)測(cè)試中,任務(wù)遷移策略穩(wěn)定后的表現(xiàn)稍遜色于在線(xiàn)策略;而在RollingSort和SOL基準(zhǔn)測(cè)試中,任務(wù)遷移策略穩(wěn)定后的表現(xiàn)甚至略?xún)?yōu)于在線(xiàn)策略.這是因?yàn)樵诰€(xiàn)策略在進(jìn)行拓?fù)渲懈骶€(xiàn)程重部署時(shí)僅考慮到CPU資源剩余情況,對(duì)于集群中各工作節(jié)點(diǎn)而言,這樣的分配策略?xún)H可在一定程度上滿(mǎn)足CPU這一資源層面上的負(fù)載均衡,適合于WordCount這類(lèi)CPU敏感型的應(yīng)用;而就本實(shí)驗(yàn)中各工作節(jié)點(diǎn)的硬件配置來(lái)看,10個(gè)中配節(jié)點(diǎn)和3個(gè)高配節(jié)點(diǎn)均具有相同的內(nèi)存大小和網(wǎng)絡(luò)帶寬,若僅從CPU資源方面考慮,3個(gè)高配節(jié)點(diǎn)必將承載更多的線(xiàn)程放置,進(jìn)而導(dǎo)致內(nèi)存和網(wǎng)絡(luò)帶寬剩余資源緊缺形成性能瓶頸.而任務(wù)遷移策略充分考慮到各工作節(jié)點(diǎn)中各類(lèi)資源的剩余情況,優(yōu)化遷移負(fù)載超出閾值的線(xiàn)程,保證各類(lèi)資源的剩余情況均在閾值設(shè)定范圍之內(nèi),因此在RollingSort和SOL測(cè)試中表現(xiàn)更佳.
綜上所述,相對(duì)于Storm默認(rèn)調(diào)度機(jī)制,在線(xiàn)策略和任務(wù)遷移策略均能有效降低系統(tǒng)延遲.而由于任務(wù)遷移策略能夠統(tǒng)籌兼顧工作節(jié)點(diǎn)中各類(lèi)資源的剩余情況,且只針對(duì)負(fù)載超出閾值的少量線(xiàn)程進(jìn)行遷移,因此更加適用于不同種類(lèi)的應(yīng)用場(chǎng)景,且執(zhí)行過(guò)程不會(huì)對(duì)集群運(yùn)行效率造成較大影響,保證了大數(shù)據(jù)流式處理的實(shí)時(shí)性.
5.2.2 資源占用測(cè)試
本節(jié)討論在Storm默認(rèn)調(diào)度策略、在線(xiàn)策略和任務(wù)遷移策略下,CPU敏感型的WordCount、內(nèi)存敏感型的RollingSort和網(wǎng)絡(luò)帶寬敏感型的SOL分別運(yùn)行時(shí)的資源占用情況.由于這3組基準(zhǔn)測(cè)試具有明顯不同的資源占用傾向,因此只需分別測(cè)試其傾向于占用的資源類(lèi)型即可.圖7展示了各基準(zhǔn)測(cè)試在3類(lèi)調(diào)度策略下運(yùn)行穩(wěn)定后各工作節(jié)點(diǎn)的資源占用情況.
Fig. 7 Comparison of resource occupancy rate among different task scheduling strategies圖7 不同任務(wù)調(diào)度策略下的資源占用率對(duì)比
由圖7可知,由于Storm輪詢(xún)的調(diào)度機(jī)制為各工作節(jié)點(diǎn)分配相同的線(xiàn)程個(gè)數(shù),忽略了彼此之間的性能差異,因此各工作節(jié)點(diǎn)上的負(fù)載分配并不均勻,3個(gè)低配節(jié)點(diǎn)上的各類(lèi)資源占用均已超出閾值.圖7(a)表示W(wǎng)ordCount運(yùn)行時(shí)各工作節(jié)點(diǎn)的CPU占用率.由于低配、中配和高配3類(lèi)節(jié)點(diǎn)具有明顯的CPU性能差異,因此在默認(rèn)策略中顯示出了明顯的階梯特征,性能越低的工作節(jié)點(diǎn)CPU占用率越高,而同等配置的若干工作節(jié)點(diǎn)之間CPU占用率差距不大.圖7(b)(c)分別表示RollingSort運(yùn)行時(shí)各工作節(jié)點(diǎn)的內(nèi)存占用率和SOL運(yùn)行時(shí)各工作節(jié)點(diǎn)的網(wǎng)絡(luò)帶寬占用率.由于在表1的硬件配置中,中高配節(jié)點(diǎn)的內(nèi)存和網(wǎng)絡(luò)帶寬性能相同,僅與3個(gè)低配節(jié)點(diǎn)存在性能差異,因此中高配節(jié)點(diǎn)上這2類(lèi)資源的占用率相差不大,而低配節(jié)點(diǎn)的占用率明顯更高,且資源占用與其擁有的資源總量基本呈現(xiàn)反比關(guān)系.以圖7(b)為例,對(duì)于配置了1 GB內(nèi)存的3個(gè)低配節(jié)點(diǎn),其內(nèi)存占用率平均值為78.3%;而對(duì)于配置了2 GB內(nèi)存的其他13個(gè)工作節(jié)點(diǎn),其內(nèi)存占用平均值僅為39.2%,約為低配節(jié)點(diǎn)的一半左右.可見(jiàn)默認(rèn)策略?xún)H適用于Storm同構(gòu)環(huán)境,異構(gòu)環(huán)境中則極易造成嚴(yán)重的資源占用傾斜甚至溢出.
在線(xiàn)策略根據(jù)CPU負(fù)載和各對(duì)線(xiàn)程之間的數(shù)據(jù)流大小情況實(shí)現(xiàn)在線(xiàn)任務(wù)重部署.這種做法執(zhí)行開(kāi)銷(xiāo)較大,但針對(duì)CPU敏感型的拓?fù)涠裕軌蜻_(dá)到異構(gòu)環(huán)境下CPU層面上負(fù)載均衡的效果.圖7(a)充分說(shuō)明了這一點(diǎn),可以看出在線(xiàn)策略運(yùn)行穩(wěn)定后各工作節(jié)點(diǎn)的CPU占用率基本均衡.但在圖7(b)和(c)中,高配節(jié)點(diǎn)的內(nèi)存和網(wǎng)絡(luò)帶寬占用率明顯更高,甚至在圖7(c)的Supervisor 15中,網(wǎng)絡(luò)帶寬占用率已超出設(shè)定的閾值.這是由于在線(xiàn)策略?xún)H孤立地考慮CPU負(fù)載,而忽略了其他資源的剩余情況.在本實(shí)驗(yàn)的硬件配置環(huán)境下,10個(gè)中配節(jié)點(diǎn)和3個(gè)高配節(jié)點(diǎn)的CPU配置不同,而內(nèi)存和網(wǎng)絡(luò)帶寬配置一致,當(dāng)在線(xiàn)策略執(zhí)行后,勢(shì)必導(dǎo)致更多的線(xiàn)程分配至高配節(jié)點(diǎn),內(nèi)存和網(wǎng)絡(luò)帶寬占用率必將大幅上升.特別地,當(dāng)高配節(jié)點(diǎn)CPU性能更高,而其他類(lèi)型的硬件配置與中配節(jié)點(diǎn)持平甚至更低的情況下,內(nèi)存與網(wǎng)絡(luò)帶寬資源可能發(fā)生溢出并導(dǎo)致拓?fù)錈o(wú)法執(zhí)行.這是在線(xiàn)策略的另一個(gè)缺陷.
探討本文提出的任務(wù)遷移策略.當(dāng)某工作節(jié)點(diǎn)上某類(lèi)資源的占用率持續(xù)30 s超出閾值時(shí),遷移發(fā)生器根據(jù)本文提出的源節(jié)點(diǎn)選擇算法和任務(wù)遷移算法,選擇少量線(xiàn)程執(zhí)行遷移,直到集群中不存在任何工作節(jié)點(diǎn)的任意類(lèi)型資源負(fù)載超出閾值為止.由于任務(wù)遷移策略觸發(fā)前采用的依舊是Storm默認(rèn)輪詢(xún)的調(diào)度機(jī)制,因此可將圖中的Default策略看成是遷移策略執(zhí)行前各工作節(jié)點(diǎn)的資源占用情況.由圖7可知,WordCount,RollingSort,SOL在默認(rèn)策略下運(yùn)行時(shí),3個(gè)低配節(jié)點(diǎn)的CPU、內(nèi)存和網(wǎng)絡(luò)帶寬占用已分別超出閾值.當(dāng)任務(wù)遷移策略執(zhí)行完畢并趨于穩(wěn)定后,低配節(jié)點(diǎn)上的若干線(xiàn)程遷移到了其他節(jié)點(diǎn),任務(wù)過(guò)載問(wèn)題均得以解決.此時(shí)通過(guò)Storm UI觀測(cè)發(fā)現(xiàn),在圖7(a)所示的WordCount運(yùn)行過(guò)程中,原分布在3個(gè)低配節(jié)點(diǎn)上的11個(gè)線(xiàn)程分別遷移到了節(jié)點(diǎn)Supervisor 7,8,9,11,14上,其中節(jié)點(diǎn)Supervisor 9容納3個(gè)線(xiàn)程,其余節(jié)點(diǎn)各容納2個(gè)線(xiàn)程;同理,在圖7(b)所示的RollingSort運(yùn)行過(guò)程中,原分布在3個(gè)低配節(jié)點(diǎn)上的8個(gè)線(xiàn)程分別遷移到了節(jié)點(diǎn)Supervisor 4,5,6,11,16上,其中節(jié)點(diǎn)Supervisor 4和11分別容納1個(gè)線(xiàn)程,其余節(jié)點(diǎn)各容納2個(gè)線(xiàn)程.在圖7(c)所示的SOL運(yùn)行過(guò)程中,3個(gè)低配節(jié)點(diǎn)的網(wǎng)絡(luò)帶寬占用率分別為87.9%,92.1%,90%,已較大程度超出閾值,因此遷移的線(xiàn)程數(shù)量較多.據(jù)統(tǒng)計(jì),低配節(jié)點(diǎn)中共計(jì)27個(gè)線(xiàn)程分別遷移到了節(jié)點(diǎn)Supervisor 5,9,12,14上,各節(jié)點(diǎn)分別容納8個(gè)、6個(gè)、8個(gè)和5個(gè)線(xiàn)程.由此可見(jiàn),任務(wù)遷移策略能夠統(tǒng)籌兼顧Storm異構(gòu)環(huán)境下各類(lèi)資源的剩余情況,有效解決任務(wù)過(guò)載問(wèn)題,但出于最小遷移開(kāi)銷(xiāo)考慮,尚無(wú)法實(shí)現(xiàn)集群中各工作節(jié)點(diǎn)的負(fù)載均衡.為更好地解決這一問(wèn)題,需在拓?fù)溥\(yùn)行前充分分析其內(nèi)部結(jié)構(gòu),使用改進(jìn)的任務(wù)分配方式取代輪詢(xún)方式的初次部署,未來(lái)將繼續(xù)開(kāi)展研究.
5.2.3 節(jié)點(diǎn)間通信開(kāi)銷(xiāo)測(cè)試
本節(jié)討論在Storm默認(rèn)策略、在線(xiàn)策略和任務(wù)遷移策略下,WordCount,RollingSort,SOL運(yùn)行時(shí)的工作節(jié)點(diǎn)間通信開(kāi)銷(xiāo).圖8展示了10次實(shí)驗(yàn)中各基準(zhǔn)測(cè)試運(yùn)行穩(wěn)定后工作節(jié)點(diǎn)間單位時(shí)間通信總量的均值.
Fig. 8 Comparison of inter-node communication overhead among different task scheduling strategies圖8 不同任務(wù)調(diào)度策略下的節(jié)點(diǎn)間通信開(kāi)銷(xiāo)對(duì)比
由圖8可知,使用在線(xiàn)策略和任務(wù)遷移策略執(zhí)行3組基準(zhǔn)測(cè)試之后,工作節(jié)點(diǎn)間傳輸?shù)臄?shù)據(jù)流大小均有所下降.在線(xiàn)策略執(zhí)行后,工作節(jié)點(diǎn)間數(shù)據(jù)流大小的平均值分別為61 743 tuple/s,27 504 tuple/s,33 046 tuple/s,相對(duì)于默認(rèn)策略分別降低了13.8%,19.6%,23.8%;任務(wù)遷移策略執(zhí)行后,工作節(jié)點(diǎn)間數(shù)據(jù)流大小的平均值分別為64 130 tuple/s,29 665 tuple/s,35 213 tuple/s,相比默認(rèn)策略的運(yùn)行結(jié)果分別降低了10.4%,13.3%,18.8%,效果稍落后于在線(xiàn)策略.這是因?yàn)樵诰€(xiàn)策略是以降低工作節(jié)點(diǎn)間通信開(kāi)銷(xiāo)為目的進(jìn)行拓?fù)渲懈骶€(xiàn)程的重新部署,雖然執(zhí)行開(kāi)銷(xiāo)大且易導(dǎo)致資源占用不均,但優(yōu)化的范圍更廣.然而從優(yōu)化效率的角度來(lái)看,當(dāng)任務(wù)遷移策略執(zhí)行結(jié)束之后,各基準(zhǔn)測(cè)試中遷移的線(xiàn)程數(shù)量分別為11個(gè)、8個(gè)和27個(gè),平均遷移1個(gè)線(xiàn)程可降低的工作節(jié)點(diǎn)間通信開(kāi)銷(xiāo)約為0.9%,1.6%,0.7%;而對(duì)于在線(xiàn)策略而言,所需重部署的線(xiàn)程數(shù)量即為該拓?fù)渲邪木€(xiàn)程總數(shù),分別為336個(gè)、208個(gè)和336個(gè),平均遷移1個(gè)線(xiàn)程可降低的工作節(jié)點(diǎn)間通信開(kāi)銷(xiāo)微乎其微,遠(yuǎn)小于任務(wù)遷移策略的優(yōu)化效率,這與4.3.2節(jié)中算法執(zhí)行效果分析的結(jié)果是一致的.
5.2.4 真實(shí)應(yīng)用場(chǎng)景下的測(cè)試
Fig. 9 Comparison of latency on RollingCount among different task scheduling strategies圖9 RollingCount在不同任務(wù)調(diào)度策略下的系統(tǒng)延遲對(duì)比
RollingCount是Storm環(huán)境下的一個(gè)典型大數(shù)據(jù)應(yīng)用程序,它用于在內(nèi)存中持續(xù)按照某個(gè)統(tǒng)計(jì)指標(biāo)(如出現(xiàn)次數(shù))計(jì)算窗口內(nèi)的TopN,然后每隔一段時(shí)間輸出實(shí)時(shí)計(jì)算后的TopN結(jié)果,能夠廣泛應(yīng)用于各類(lèi)大數(shù)據(jù)實(shí)時(shí)排序需求的場(chǎng)景,例如實(shí)時(shí)熱門(mén)微博、廣告和商品等的統(tǒng)計(jì).表3中的參數(shù)window.length和emit.frequency即為設(shè)定的窗口長(zhǎng)度和統(tǒng)計(jì)頻率,單位為s.本組實(shí)驗(yàn)采用與5.2.1節(jié)中描述相同的方法統(tǒng)計(jì)RollingCount分別在Storm默認(rèn)策略、在線(xiàn)策略和任務(wù)遷移策略下運(yùn)行的系統(tǒng)延遲,結(jié)果如圖9所示:
由圖9可知,與之前3個(gè)基準(zhǔn)測(cè)試結(jié)果類(lèi)似,RollingCount的部署需要一個(gè)過(guò)程.第1個(gè)峰值過(guò)后,系統(tǒng)延遲趨于平穩(wěn),在線(xiàn)策略和任務(wù)遷移策略開(kāi)始統(tǒng)計(jì)集群中各工作節(jié)點(diǎn)以及工作節(jié)點(diǎn)上各線(xiàn)程占用的CPU、內(nèi)存和網(wǎng)絡(luò)帶寬負(fù)載信息及各線(xiàn)程之間的數(shù)據(jù)流大小.第155 s時(shí)在線(xiàn)策略觸發(fā),拓?fù)渲懈魅蝿?wù)在各工作節(jié)點(diǎn)上重新部署,約耗時(shí)40 s,平均延遲約2.145 s;任務(wù)遷移策略觸發(fā)于第155 s,約耗時(shí)20 s,平均延遲約877.8 ms,僅為在線(xiàn)策略的40.9%左右,執(zhí)行過(guò)程中共有17個(gè)線(xiàn)程發(fā)生遷移.由此可見(jiàn),任務(wù)遷移策略有效降低了調(diào)度的執(zhí)行過(guò)程對(duì)系統(tǒng)實(shí)時(shí)性造成的負(fù)面影響.2種策略執(zhí)行完畢后,系統(tǒng)延遲再次趨于收斂并分別穩(wěn)定在約331.1 ms和339.9 ms,相對(duì)于默認(rèn)策略分別降低約23.7%和21.7%,兩者差距很小,實(shí)驗(yàn)結(jié)果較為理想.可見(jiàn),在數(shù)據(jù)流大小變化迅速且任務(wù)過(guò)載時(shí)有發(fā)生的Storm商業(yè)應(yīng)用領(lǐng)域中,使用任務(wù)遷移策略平滑調(diào)整將更有利于保證Storm處理的實(shí)時(shí)性.
Storm作為大數(shù)據(jù)流式計(jì)算的主流框架,已逐漸引起學(xué)術(shù)界和工業(yè)界的廣泛關(guān)注.然而其默認(rèn)的輪詢(xún)調(diào)度機(jī)制并未考慮到不同工作節(jié)點(diǎn)的自身性能和負(fù)載差異,以及工作節(jié)點(diǎn)之間的網(wǎng)絡(luò)傳輸開(kāi)銷(xiāo)和節(jié)點(diǎn)內(nèi)部的進(jìn)程與線(xiàn)程通信開(kāi)銷(xiāo),無(wú)法最大化發(fā)揮Storm集群的性能.近年來(lái)已有研究改進(jìn)了Storm默認(rèn)調(diào)度機(jī)制存在的不足,但仍存在應(yīng)用場(chǎng)景單一和算法開(kāi)銷(xiāo)過(guò)大等問(wèn)題.本文通過(guò)分析Storm基本模型和3種不同的通信方式,建立了Storm異構(gòu)環(huán)境下的資源約束模型、最優(yōu)通信開(kāi)銷(xiāo)模型和任務(wù)遷移模型,并在此基礎(chǔ)上提出了包含源節(jié)點(diǎn)選擇算法和任務(wù)遷移算法的任務(wù)遷移策略,使系統(tǒng)能夠根據(jù)各工作節(jié)點(diǎn)和各任務(wù)的實(shí)時(shí)負(fù)載情況和任務(wù)間的數(shù)據(jù)流大小,決策并實(shí)施任務(wù)的優(yōu)化遷移.最后通過(guò)4個(gè)基準(zhǔn)測(cè)試從延遲、資源占用、通信開(kāi)銷(xiāo)角度證明了算法的有效性.
下一步研究工作主要集中在3個(gè)方面:
1) 將本文提出的任務(wù)遷移策略進(jìn)一步推廣至更為復(fù)雜的Storm商業(yè)應(yīng)用領(lǐng)域,使其適用于多租戶(hù)且種類(lèi)更多的業(yè)務(wù)場(chǎng)景.
2) 目前拓?fù)鋱?zhí)行需要的進(jìn)程和線(xiàn)程數(shù)量完全由用戶(hù)(程序員)設(shè)置,研究拓?fù)渲懈鹘M件的自適應(yīng)并行度調(diào)節(jié)機(jī)制將能在提高節(jié)點(diǎn)資源利用率的同時(shí),有效提高拓?fù)涞膱?zhí)行效率.
3) 從拓?fù)渥陨淼慕Y(jié)構(gòu)特征出發(fā)優(yōu)化算法,在保證異構(gòu)Storm集群高效運(yùn)行的同時(shí)達(dá)到負(fù)載均衡的效果.
[1] Meng Xiaofeng, Ci Xiang. Big data management: Concepts, techniques and challenges[J]. Journal of Computer Research and Development, 2013, 50(1): 146-169 (in Chinese)
(孟小峰, 慈祥. 大數(shù)據(jù)管理: 概念、技術(shù)與挑戰(zhàn)[J]. 計(jì)算機(jī)研究與發(fā)展, 2013, 50(1): 146-169)
[2] Chen C L P, Zhang Chunyang. Data-intensive applications, challenges, techniques and technologies: A survey on big data[J]. Information Sciences, 2014, 275(11): 314-347
[3] Kambatla K, Kollias G, Kumar V, et al. Trends in big data analytics[J]. Journal of Parallel and Distributed Computing, 2014, 74(7): 2561-2573
[4] Sun Dawei. Big data stream comuting: Features and challenges[J]. Big Data Research, 2015,1(3): 99-105 (in Chinese)
(孫大為. 大數(shù)據(jù)流式計(jì)算: 應(yīng)用特征和技術(shù)挑戰(zhàn)[J]. 大數(shù)據(jù), 2015,1(3): 99-105)
[5] Ranjan R. Streaming big data processing in datacenter clouds[J]. IEEE Cloud Computing, 2014, 1(1): 78-83
[6] Apache. Apache Hadoop[EB/OL]. [2016-08-05]. http://hadoop.apache.org
[7] Vamanan B, Sohail H B, Hasan J, et al. Timetrader: Exploiting latency tail to save datacenter energy for online search[C] //Proc of the 48th Int Symp on Microarchitecture. New York: ACM, 2015: 585-597
[8] Sun Dawei, Zhang Guangyan, Zheng Weimin. Big data stream computing: Technologies and instances[J]. Journal of Software, 2014, 25(4): 839-862 (in Chinese)
(孫大為, 張廣艷, 鄭緯民. 大數(shù)據(jù)流式計(jì)算: 關(guān)鍵技術(shù)及系統(tǒng)實(shí)例[J]. 軟件學(xué)報(bào), 2014, 25(4): 839-862)
[9] Toshniwal A, Taneja S, Shukla A, et al. Storm@Twitter[C] //Proc of the 2014 ACM SIGMOD Int Conf on Management of Data. New York: ACM, 2014: 147-156
[10] Alexandrov A, Bergmann R, Ewen S, et al. The stratosphere platform for big data analytics[J]. The VLDB Journal, 2014, 23(6): 939-964
[11] Zaharia M, Das T, Li Haoyuan, et al. Discretized streams: An efficient and fault-tolerant model for stream processing on large clusters[C] //Proc of the 4th USENIX Conf on Hot Topics in Cloud Computing. Berkeley, CA: USENIX Association, 2012: 1-6
[12] Borthakur D, Gray J, Sarma J S, et al. Apache Hadoop goes realtime at Facebook[C] //Proc of the 2011 ACM SIGMOD Int Conf on Management of Data. New York: ACM, 2011: 1071-1080
[13] Neumeyer L, Robbins B, Nair A, et al. S4: Distributed stream computing platform[C] //Proc of the 10th IEEE Int Conf on Data Mining Workshops (ICDMW 2010). Piscataway, NJ: IEEE, 2010: 170-177
[14] Fischer M J, Su Xueyuan, Yin Yitong. Assigning tasks for efficiency in Hadoop[C] //Proc of the 22nd Annual ACM Symp on Parallelism in Algorithms and Architectures. New York: ACM, 2010: 30-39
[15] Bhatotia P, Wieder A, Rodrigues R, et al. Incoop: MapReduce for incremental computations[C] //Proc of the 2nd ACM Symp on Cloud Computing. New York: ACM, 2011: 1-14
[16] Borkar V, Carey M, Grover R, et al. Hyracks: A flexible and extensible foundation for data-intensive computing[C] //Proc of the 27th Int Conf on Data Engineering. Piscataway, NJ: IEEE, 2011: 1151-1162
[17] Chen Fangfei, Kodialam M, Lakshman T V. Joint scheduling of processing and shuffle phases in MapReduce systems[C] //Proc of the 2012 IEEE INFOCOM. Piscataway, NJ: IEEE, 2012: 1143-1151
[18] Chen Gang, Chen Ke, Jiang Dawei, et al. E3: An elastic execution engine for scalable data processing[J]. Journal of Information Processing, 2012, 20(1): 65-76
[19] Jin Hui, Yang Xi, Sun Xianhe, et al. ADAPT: Availability-aware MapReduce data placement for non-dedicated distributed computing[C] //Proc of the 32nd Int Conf on Distributed Computing Systems (ICDCS). Piscataway, NJ: IEEE, 2012: 516-525
[20] Kumar V, Andrade H, Gedik B, et al. DEDUCE: At the intersection of MapReduce and stream processing[C] //Proc of the 13th Int Conf on Extending Database Technology. New York: ACM, 2010: 657-662
[21] Condie T, Conway N, Alvaro P, et al. Online aggregation and continuous query support in MapReduce[C] //Proc of the 2010 ACM SIGMOD Int Conf on Management of Data. New York: ACM, 2010: 1115-1118
[22] Karve R, Dahiphale D, Chhajer A. Optimizing cloud MapReduce for processing stream data using pipelining[C] //Proc of the 5th UKSim European Symp on Computer Modeling and Simulation (EMS). Piscataway, NJ: IEEE, 2011: 344-349
[23] Backman N, Pattabiraman K, Fonseca R, et al. C-MR: Continuously executing MapReduce workflows on multi-core processors[C] //Proc of the 3rd Int Workshop on MapReduce and Its Applications Date. New York: ACM, 2012: 1-8
[24] Lam W, Liu Lu, Prasad S T S, et al. Muppet: MapReduce-style processing of fast data[J]. Proceedings of the VLDB Endowment, 2012, 5(12): 1814-1825
[25] Aly A M, Sallam A, Gnanasekaran B M, et al. M3: Stream processing on main-memory MapReduce[C] //Proc of the 28th Int Conf on Data Engineering. Piscataway, NJ: IEEE, 2012: 1253-1256
[26] Li K C, Jiang Hai, Yang L T, et al. Big Data: Algorithms, Analytics, and Applications[M]. Boca Raton, FL: CRC Press, 2015: 193-214
[27] Daoud M I, Kharma N. A hybrid heuristic-genetic algorithm for task scheduling in heterogeneous processor networks[J]. Journal of Parallel & Distributed Computing, 2011, 71(11): 1518-1531
[28] Sinnen O, To A, Kaur M. Contention-aware scheduling with task duplication[J]. Journal of Parallel and Distributed Computing, 2011, 71(1): 77-86
[29] Wang Changdong, Lai Jianhuang, Huang Dong, et al. SVStream: A support vector-based algorithm for clustering data streams[J]. IEEE Trans on Knowledge & Data Engineering, 2013, 25(6): 1410-1424
[30] Xu Yuming, Li Kenli, He Ligang, et al. A DAG scheduling scheme on heterogeneous computing systems using double molecular structure-based chemical reaction optimization[J]. Journal of Parallel & Distributed Computing, 2013, 73(9): 1306-1322
[31] Saikrishna P S, Pasumarthy R. Automated control of webserver performance in a cloud environment[C] //Proc of the 2013 IEEE Recent Advances in Intelligent Computational Systems (RAICS). Piscataway, NJ: IEEE, 2013: 239-244
[32] Al-Haidari F, Sqalli M, Salah K. Impact of CPU utilization thresholds and scaling size on autoscaling cloud resources[C] //Proc of the 5th IEEE Int Conf on Cloud Computing Technology and Science (CloudCom). Piscataway, NJ: IEEE, 2013: 256-261
[33] Van d V J S, Van D W B, Lazovik E, et al. Dynamically scaling Apache Storm for the analysis of streaming data[C] //Proc of the 1st IEEE Int Conf on Big Data Computing Service and Applications. Piscataway, NJ: IEEE, 2015: 154-161
[34] Lorido-Botran T, Miguel-Alonso J, Lozano J A. A review of auto-scaling techniques for elastic applications in cloud environments[J]. Journal of Grid Computing, 2014, 12(4): 559-592
[35] Trihinas D, Pallis G, Dikaiakos M D. JCatascopia: Monitoring elastically adaptive applications in the cloud[C] //Proc of the 14th IEEE/ACM Int Symp on Cluster, Cloud and Grid Computing (CCGrid). Piscataway, NJ: IEEE, 2014: 226-235
[36] Nikravesh A Y, Ajila S A, Lung C H. Cloud resource auto-scaling system based on hidden Markov model (HMM)[C] //Proc of the 2014 IEEE Int Conf on Semantic Computing (ICSC). Piscataway, NJ: IEEE, 2014: 124-127
[37] Wolf J, Bansal N, Hildrum K, et al. SODA: An optimizing scheduler for large-scale stream-based distributed computer systems[C] //Proc of the 9th ACM/IFIP/USENIX Int Conf on Middleware. Berlin: Springer, 2008: 306-325
[38] Amini L, Andrade H, Bhagwan R, et al. SPC: A distributed, scalable platform for data mining[C] //Proc of the 4th Int Workshop on Data Mining Standards, Services and Platforms. New York: ACM, 2006: 27-37
[39] Sun Dawei, Fu Ge, Liu Xinran, et al. Optimizing data stream graph for big data stream computing in cloud datacenter environments[J]. International Journal of Advancements in Computing Technology, 2014, 6(5): 53-65
[40] Cordeschi N, Shojafar M, Amendola D, et al. Energy-efficient adaptive networked datacenters for the QoS support of real-time applications[J]. The Journal of Supercomputing, 2014, 71(2): 448-478
[41] Sun Dawei, Zhang Guangyan, Yang Songlin, et al. Re-Stream: Real-time and energy-efficient resource scheduling in big data stream computing environments[J]. Information Sciences, 2015, 319: 92-112
[42] Cardellini V, Grassi V, Lo Presti F, et al. Distributed QoS-aware scheduling in Storm[C] //Proc of the 9th ACM Int Conf on Distributed Event-Based Systems. New York: ACM, 2015: 344-347
[43] Aniello L, Baldoni R, Querzoni L. Adaptive online scheduling in Storm[C] //Proc of the 7th ACM Int Conf on Distributed Event-Based Systems. New York: ACM, 2013: 207-218
[44] Xu Jielong, Chen Zhenhua, Tang Jian, et al. T-Storm: Traffic-aware online scheduling in Storm[C] //Proc of the 34th IEEE Int Conf on Distributed Computing Systems. Piscataway, NJ: IEEE, 2014: 535-544
[45] Peng Boyang, Hosseini M, Hong Zhihao, et al. R-Storm: Resource-aware scheduling in Storm[C] //Proc of the 16th Annual Middleware Conf. New York: ACM, 2015: 149-161
[46] Zhang Manu. Intel-hadoop/storm-benchmark forked from manuzhang/storm-benchmark[EB/OL]. (2015-11-02) [2016-08-05]. https://github.com/intel-hadoop/storm-benchmark
[47] Marz N. Public stormprocessor/storm-benchmark[EB/OL]. (2012-08-20) [2016-08-05]. https://github.com/stormprocessor/storm-benchmark
[48] Martello S, Toth P. Dynamic programming and strong bounds for the 0-1 knapsack problem[J]. Management Science, 1999, 45(3): 414-424
[49] Sarkar U K, Chakrabarti P P, Ghose S, et al. Reducing reexpansions in iterative-deepening search by controlling cutoff bounds[J]. Artificial Intelligence, 1991, 50(2): 207-221
[50] Chekuri C, Khanna S. A polynomial time approximation scheme for the multiple knapsack problem[J]. SIAM Journal on Computing, 2005, 35(3): 713-728
[51] Fayard D, Zissimopoulos V. An approximation algorithm for solving unconstrained two-dimensional knapsack problems[J]. European Journal of Operational Research, 1995, 84(3): 618-632