王曉桐,房俊華,張蓉
(華東師范大學數(shù)據(jù)科學與工程研究院上海高可信計算重點實驗室,上海200062)
分布“可擴展數(shù)據(jù)流連接算法
王曉桐,房俊華,張蓉
(華東師范大學數(shù)據(jù)科學與工程研究院上海高可信計算重點實驗室,上海200062)
Join-Matrix是一種高性能的連接矩陣模型,方B部署于分布式環(huán)境下,支持任意連接謂詞的數(shù)據(jù)流連接操作.由于采取隨機分發(fā)元組作為路由策略,Join-Matrix可利用對元組內容的不敏感性來有效抵御數(shù)據(jù)傾斜.為了實現(xiàn)工作節(jié)點的負載均衡以及網(wǎng)絡傳輸代價的最小化,基于連接矩陣模型設計一種高效的數(shù)據(jù)劃分方案尤為重要.針對數(shù)據(jù)流連接處理,本文設計并實現(xiàn)了一種新穎的連接算子,可靈活地進行劃分方案的自適應調整,以應對實時動態(tài)變化的數(shù)據(jù)分布.具體來說,我們根據(jù)數(shù)據(jù)流流量的采樣信息和系統(tǒng)額定負載,通過一個輕量級的決策器制定出一個數(shù)據(jù)劃分方案和相應的數(shù)據(jù)遷移計劃,在保證輸出結果完整性與正確性的情況下,實現(xiàn)遷移代價的最小化.本文在多種不同的數(shù)據(jù)集上進行了大量對比實驗,結果證明,在資源利用率、系統(tǒng)吞吐率與時間延遲等方面,該連接算子較對比系統(tǒng)具有更高的性能體現(xiàn).
數(shù)據(jù)流連接;Join-Matrix;數(shù)據(jù)劃分;分布式計算
隨著在線實時分析連續(xù)數(shù)據(jù)流的需求日益增多,處理時刻變化數(shù)據(jù)流的新型應用越來越普遍,包括傳感器網(wǎng)絡、金融數(shù)據(jù)在線分析和網(wǎng)絡入侵檢測等.諸如此類的應用具有以下特征:①在海量數(shù)據(jù)上執(zhí)行包含復雜謂詞的連接操作;②在保持高效率和快速響應時間的同時進行實時數(shù)據(jù)分析;③需要維持大量依賴于歷史數(shù)據(jù)的狀態(tài)信息.因此,為了提高數(shù)據(jù)流系統(tǒng)的處理性能,設計高效的數(shù)據(jù)流連接算法尤為重要.
研究者對數(shù)據(jù)流連接算法已投入了大量相關的研究工作,提出了若干集中式連接算法[1-3].這些連接算法均在一個中心節(jié)點對數(shù)據(jù)流進行連接處理,不能高效處理數(shù)據(jù)量巨大的流式數(shù)據(jù).由于目前主流的數(shù)據(jù)流處理系統(tǒng)均是分布式的,將連接操作分布式處理更符合數(shù)據(jù)流系統(tǒng)的特點.因此分布式數(shù)據(jù)流連接算法應運而生.大多數(shù)分布式連接算法主要針對等值連接,處理高選擇性的θ連接性能欠佳.除此之外,分布式算法大多采用哈希函數(shù)進行數(shù)據(jù)劃分,對數(shù)據(jù)內容的不敏感性導致不能靈活地進行系統(tǒng)結構的擴展.
本文的主要研究目標是基于連接矩陣模型,設計并實現(xiàn)一種新穎的分布式數(shù)據(jù)流連接算法,旨在提高連接矩陣的可擴展性與靈活性,從以下兩方面實現(xiàn):①探索適當?shù)臄?shù)據(jù)劃分方案,以充分利用系統(tǒng)資源;②設計高效的狀態(tài)重分配和數(shù)據(jù)路由策略,以降低自適應調整代價和網(wǎng)絡傳輸開銷.文獻[4]指出目前已有的自適應技術只依賴于啟發(fā)式模型,缺乏理論證明.本文繼承了傳統(tǒng)數(shù)據(jù)劃分方案的特性,并提出了相應的改進措施.
本文的主要貢獻是:①基于連接矩陣設計一種高效的數(shù)據(jù)劃分方案,打破節(jié)點個數(shù)的規(guī)整性限制,可根據(jù)數(shù)據(jù)動態(tài)分布靈活地增刪物理計算節(jié)點,提高系統(tǒng)架構的可擴展性與靈活性;②頻繁進行自適應調整的策略會導致巨大的網(wǎng)絡傳輸成本,而保守策略不能根據(jù)數(shù)據(jù)動態(tài)變化進行自適應調整,從而降低系統(tǒng)的處理性能.為了在二者之間達到平衡,本文提出一個在線算法,高效地決定何時探索和觸發(fā)新的劃分方案;③提出一種統(tǒng)一的、位置感知的遷移機制,實現(xiàn)遷移代價的最小化;④傳統(tǒng)的自適應技術[5-6]以阻塞方式進行狀態(tài)數(shù)據(jù)重分配,本文以非阻塞的方式在進行數(shù)據(jù)遷移的同時處理新流入的元組;⑤通過在多種不同數(shù)據(jù)集上的大量對比實驗證明,本文提出的連接算子具有良好的性能.
近年來,研究人員利用Join-Matrix矩陣模型進行分布式連接查詢處理,在類似MapReduce的系統(tǒng)與數(shù)據(jù)流系統(tǒng)均有涉足.Join-Matrix模型將兩個數(shù)據(jù)集間的連接操作建模成一個矩陣,矩陣的每一條邊分別代表一個數(shù)據(jù)集,每個矩陣單元代表一個潛在的連接輸出結果. Stomos等人在文獻[7]中首次引進連接矩陣的概念,在FR算法[8]的基礎上提出了“對稱片段與復制”算法(symmetric fragment and replicate),以解決FR算法帶來的計算代價與通信代價龐大的問題.在MapReduce的編程框架下,文獻[9]基于連接矩陣提出了兩種數(shù)據(jù)劃分方案,分別是1-Bucket與M-Bucket.1-Bucket采取隨機分發(fā)元組的路由策略,即內容不敏感,在輸出結果方面可以很好地實現(xiàn)負載均衡,但由于過多地輸入元組復制存儲,在處理低選擇性的連接操作時性能欠佳.另一方面,M-Bucket根據(jù)輸入元組的內容進行數(shù)據(jù)劃分,即內容敏感,盡管解決了元組冗余存儲的問題,但可能導致某些計算節(jié)點出現(xiàn)過載的現(xiàn)象.
與本文設計思路最相似的研究是Elseidy等人提出的Dynamic連接算子[10].Dynamic連接算子采用“網(wǎng)格劃分方案”將連接矩陣劃分成2n(n∈N*)個面積相等的區(qū)域,并采用隨機路由策略分發(fā)輸入元組.由于要維持矩陣的結構特性,當架構需要進行擴展或縮減時,必須同時增加或刪除一行或一列的所有處理單元,由此引發(fā)遷移代價的劇增和資源利用率的降低,算子結構的靈活性與可擴展性也深受影響.為了解決這個問題,本文設計了一種更為靈活的數(shù)據(jù)劃分方案,達到更好的效果.
2.1 預備知識
Join-Matrix以矩陣的形式處理R??S,矩陣的每一條邊代表一條數(shù)據(jù)流,矩陣單元代表潛在的連接輸出結果.如圖1(a)所示,在連接矩陣中進行不等值連接操作,圖中的數(shù)字代表連接屬性,7色的單元格代表符合連接謂詞的輸出結果.基于連接矩陣M的數(shù)據(jù)劃分方案將矩陣切分成n×m個面積相等的處理單元Cij,每個處理單元分配一臺物理計算節(jié)點,并存儲數(shù)據(jù)流的子集〈Ri,Sj〉,其中i∈[0,n一1],j∈[0,m一1],并用[b,e]代表子數(shù)據(jù)集對于數(shù)據(jù)流的位置范圍.如圖1(b)所示,將圖1(a)中的矩陣切分成2×4個處理單元,每個處理單元分別存儲1/2的R流數(shù)據(jù)和1/4的S流數(shù)據(jù).
圖1 連接矩陣及劃分方案示例Fig.1Example of join-matrix and partitioning scheme
處理θ連接操作的代價主要與系統(tǒng)的內存開銷、CPU計算成本以及網(wǎng)絡通信代價有關,其中CPU的計算成本與連接矩陣的計算區(qū)域面積|R|·|S|成正比(|R|和|S|分別代表兩條數(shù)據(jù)流的流量,即元組的數(shù)量),與選取的數(shù)據(jù)劃分方案無關,即獨立于矩陣的行數(shù)n和列數(shù)m.根據(jù)文獻[10],內存開銷與單個處理單元的半周長|Ri|+|Sj|成比例,而|Ri|+|Sj|取決于矩陣的行數(shù)與列數(shù).對于網(wǎng)絡通信代價同樣成立.因此,本文旨在尋求合適的數(shù)據(jù)劃分方案n×m,使得系統(tǒng)的資源使用量最低.假設單個處理單元的額定內存大小為V,則本文的目標可以形式化定義成以下優(yōu)化問題:
2.2 劃分方案
由于連接矩陣的行數(shù)與列數(shù)決定了內存開銷,關于矩陣的面積與周長,我們已知兩個常識:①給定面積的所有矩陣中,正方形的周長最小;②給定周長的所有矩陣中,正方形的面積最大.基于上述常識,我們得出以下定理.
證明首先假設單個處理單元的CPU計算資源為定值,為了確保兩條流的任意元組均可相遇,則R??S的計算復雜度為O(|R|·|S|).當時,系統(tǒng)的內存使用量最小;其次假設單個處理單元的內存空間是定值,當時,連接矩陣使用的處理單元總數(shù)最小.由于網(wǎng)絡傳輸代價與內存開銷相關,因此定理1成立.
根據(jù)定理1,如果數(shù)據(jù)流流量|R|和|S|均可被Vh整除,則由此生成的數(shù)據(jù)劃分方案是最優(yōu)的.但是大多數(shù)情況下,數(shù)據(jù)流流量不能被Vh整除,考慮到矩陣的行數(shù)與列數(shù)必須為整數(shù),我們令,則連接矩陣使用的處理單元總數(shù)N為:
由于數(shù)據(jù)流流量不能被Vh整除,在裝載輸入元組的過程中,矩陣最后一行或者一列的處理單元中會產(chǎn)生數(shù)據(jù)碎片,我們稱這些處理單元為“碎片單元”.我們假設V=8 GB,|R|= 9 GB,|S|=9 GB,則R??S對應的計算區(qū)域如圖1(a)所示.根據(jù)公式(2),將矩陣劃分成9個處理單元,各個處理單元存儲數(shù)據(jù)的情況分別是C00=〈4 GB,4 GB〉,C01=〈4 GB,4 GB〉, C02=〈4 GB,1 GB〉,C10=〈4 GB,4 GB〉,C11=〈4 GB,4 GB〉,C12=〈4 GB,1 GB〉,C20=〈1 GB,4 GB〉,C21=〈1 GB,4 GB〉,C22=〈1 GB,1 GB〉.顯然,C02、C12、C20、C21和C22均為碎片單元,因為存儲的R流和S流數(shù)據(jù)總量低于額定內存空間.
圖2 數(shù)據(jù)劃分方案Fig.2Partitioning scheme
為了充分利用系統(tǒng)資源,實現(xiàn)處理單元之間的負載均衡十分重要.我們將兩條數(shù)據(jù)流定義為主流P和副流D以作區(qū)分.主流P可以為數(shù)據(jù)流R或者S.首先保證主流P的元組數(shù)據(jù)分配到足夠的內存空間,將P切分成Pγ個子集分發(fā)到處理單元中;其次將單個處理單元中剩余的內存分配給副流D的元組數(shù)據(jù),則劃分副流D得到的子集個數(shù).因此,處理單元總數(shù)N為:
算法1闡述了基于連接矩陣制定數(shù)據(jù)劃分方案的具體過程.首先,將Pγ中的四個元素依次代入等式(3)計算出對應的處理單元個數(shù)Ni(i 6 4),選擇值最小的Ni作為處理單元總數(shù),并將對應的賦值給于Pγ和Dγ(第1~5行).其次根據(jù)主流P的流量和子集個數(shù)Pγ,計算出連接矩陣的行數(shù)n和列數(shù)m:如果,則數(shù)據(jù)流R為主流P,n=Pγ,m=Dγ;否則數(shù)據(jù)流S為主流P,m=Pγ,n=Dγ(第6~10行).
2.3 遷移計劃
在進行劃分方案的切換之前,需要先確定新舊矩陣中處理單元的對應關系.假設Cij和Ckl分別是舊矩陣M0和新矩陣Mn中的處理單元,我們利用一個相關系數(shù)來衡量兩個處理單元Cij和Ckl之間數(shù)據(jù)集重疊度,給出如下定義:
給定矩陣M0和Mn,定義處理單元之間的關系映射條目.更新處理單元映射關系表可分為兩步驟:①枚舉出所有可能的npi;②選取值最大的npi作為最終條目插入到關系映射表NP.
遷移計劃決定了矩陣變換期間數(shù)據(jù)是如何在處理單元之間重新分配的.為了方B描述,下面我們將只討論R流的數(shù)據(jù)遷移,對于S流采取類似的操作.我們將需要遷入處理單元Ckl的 R流數(shù)據(jù)集定義如下:
圖3 數(shù)據(jù)遷移示例Fig.3Example of data migration
3.1 實驗環(huán)境
實驗設備:22個處理節(jié)點的刀片機服務器集群,單個節(jié)點有2個四核四線程處理器,型號為Intel Xeon E5335,主頻2.00 GHZ,并配有共計16 GB的RAM以及2 TB的硬盤.所有節(jié)點運行CentOS 6.5 Linux操作系統(tǒng),Apache Storm 0.10.0[12]以及Java 1.7.0.
數(shù)據(jù)集:使用TPC-benchmark[13]的數(shù)據(jù)生成器dbgen生成不同規(guī)模的數(shù)據(jù)集.我們對這些數(shù)據(jù)集進行預處理,即將其調整為在連接屬性上具有Zipf分布的形式,通過參數(shù)z調整數(shù)據(jù)傾斜程度,默認情況下,我們將數(shù)據(jù)集的傾斜度設置為1.
查詢語句:我們使用[10]中的等值查詢語句EQ5和范圍查詢語句BNCI.其中EQ5是[10]重定義的TPC-H中Q5查詢中代價較高的幾個連接謂詞組成的查詢語句;BNCI是按照某一屬性范圍查找其在另一個數(shù)據(jù)集中的匹配記錄.
3.2 評估指標
我們將通過以下四個指標對系統(tǒng)的資源利用率和處理性能進行評估:①處理單元數(shù):系統(tǒng)運行過程中,連接算子使用到的處理單元的總數(shù),單個處理單元分配額定大小的內存空間;②吞吐率:單位時間內系統(tǒng)成功接收并處理的元組數(shù)量;③遷移量:新舊連接矩陣進行轉換期間,需要拷貝和移動的元組總量;④計劃耗時:根據(jù)當前系統(tǒng)的工作負載制定數(shù)據(jù)劃分方案、更新單元映射表以及生成遷移計劃的總耗時.
3.3 對比系統(tǒng)
我們使用了三種不同的連接算子來進行對比實驗:①MFM.本文提出的自適應連接算子,根據(jù)等式3計算出最優(yōu)的連接矩陣及數(shù)據(jù)劃分方案;②Dynamic.文獻[10]設計的連接算子,限制連接矩陣個數(shù)必須為2的冪次方個,以單個處理單元×4的形式進行矩陣的擴展;③Readj.文獻[11]設計的連接算子,以key為粒度,通過一個哈希函數(shù)重新調整各處理單元的工作負載以實現(xiàn)負載均衡.
3.4 結果與分析
實驗在全歷史模式下進行,并通過調整輸入數(shù)據(jù)的傾斜度驗證連接算法的靈活性和自適應性.設置V=8·105,并連續(xù)地將6·106條元組數(shù)據(jù)裝載入系統(tǒng)中.圖4展示了執(zhí)行BNCI時處理單元數(shù)與遷移代價的變化趨勢.隨著數(shù)據(jù)的不斷流入,Dynamic算子占用的處理單元數(shù)大幅度遞增,導致消耗的內存空間也急劇增加.相反,MFM根據(jù)當前系統(tǒng)的負載情況按需分配資源,占用的處理單元數(shù)遠遠少于Dynamic.相應地,為了維持連接矩陣的結構特性,Dynamic需要進行大規(guī)模的數(shù)據(jù)備份;而MFM算子使用較少的處理單元數(shù),充分利用系統(tǒng)資源.因此, Dynamic算子在數(shù)據(jù)遷移期間產(chǎn)生的遷移代價遠遠超過MFM.
圖4 BNCI無窗口模式Fig.4Full-history join with BNCI
圖5 EQ5全歷史模式Fig.5Full-history join with EQ5
為了保證系統(tǒng)的負載均衡,對于單個處理單元和單位時間間隔t,定義均衡度標識,其中為所有處理單元的平均負載.在本組實驗中,執(zhí)行查詢語句EQ5,并設置θt6 0.05.如圖5(a)所示,Readj的計劃耗時高于其余兩種連接算子三個數(shù)量級.究其原因可知, Readj通過一個哈希函數(shù)調整所有處理單元中的工作負載,因此在進行擴容操作時,Readj需要重新計算全局的均衡狀態(tài),而其余兩種連接算子均采用內容不敏感性的隨機路由策略,無需進行平衡調度.圖5(b)給出了三種連接算子在不同數(shù)據(jù)傾斜程度下的吞吐率.一方面,隨著傾斜參數(shù)的遞增,由于計劃耗時長,Readj的吞吐率呈現(xiàn)遞減趨勢.另一方面,盡管Dynamic連接算子占用的處理單元遠多于MFM,但是由于其龐大的數(shù)據(jù)遷移量,MFM的吞吐率略高.
為在數(shù)據(jù)流系統(tǒng)上高效地執(zhí)行分布式θ連接操作,本文基于連接矩陣模型提出可靈活地進行自適應調整的連接算法,利用對其內容不敏感性抵御數(shù)據(jù)傾斜,根據(jù)當前系統(tǒng)負載按需分配資源,采用非阻塞的方式處理數(shù)據(jù)遷移并保證連接結果的完整性與正確性.實驗證明,對比目前已有的連接算法,本文提出的連接算法性能更為優(yōu)越且穩(wěn)定.未來的工作將會考慮對連接矩陣模型進一步優(yōu)化,打破矩陣單元個數(shù)規(guī)整性的限制以實現(xiàn)更為優(yōu)良的性能.
[1]DITTRICH J-P,SEEGER B,TAYLOR D S,et al.Progressive merge join:A generic and non-blocking sort-based join algorithm[C]//Proceedings of the 28th VLDB Conference.2002:299-310.
[2]URHAN T,FRANKLIN M J.XJoin:A reactively-scheduled pipelined join operator[J].IEEE Data Eng Bull, 2000,23(2):27-33.
[3]WANG S,RUNDENSTEINER E.Scalable stream join processing with expensive predicates:Workload distribution and adaptation by time-slicing[C]//Proceedings of the 12th Conference on EDBT.2009:299-310.
[4]GOUNARIS A,TSAMOURA E,MANOLOPOULOS Y.Adaptive query processing in distributed settings[J]. Intelligent Systems Reference Library,2013,36:211-236.
[5]LIU B,JBANTOVA M,RUNDENSTEINER E A.Optimizing state-intensive non-blocking queries using run-time adaptation[C]//Proceedings of the 2007 IEEE 23rd ICDEW.IEEE,2007:614-623.
[6]PATON N W,BUENABAD-CHAVEZ J,CHEN M,et al.Autonomic query parallelization using non-dedicated computers:An evaluation of adaptivity options[J].The VLDB Journal,2009,18(1):119-140.
[7]STAMOS J W,YOUNG H C.A symmetric fragment and replicate algorithm for distributed joins[J].IEEE Transactions on Parallel&Distributed Systems,1993,4(12):1345-1354.
[8]EPSTEIN R,STONEBRAKER M,WONG E.Distributed query processing in a relational data base system [C]//Proceedings of ACM SIGMOD Conference on Management of Data.1978:169-180.
[9]OKCAN A,RIEDEWALD M.Processing theta-joins using MapReduce[C]//Proceedings of ACM SIGMOD Conference on Management of Data.2011:949-960.
[10]ELSEIDY M,ELGUINDY A.Scalable and adaptive online joins[J].The VLDB Endowment,2014,7(6):441-452.
[11]GEDIK B.Partitioning functions for stateful data parallelism in stream processing[J].The VLDB Journal,2013, 23(4):517-539.
[12]Apache storm[EB/OL].[2016-06-10].http://storm.apache.org.
[13]The TPC-H benchmark[EB/OL].[2016-06-10].http://www.tpc.org/tpch.
(責任編輯:林磊)
Distributed and scalable stream join algorithm
WANG Xiao-tong,FANG Jun-hua,ZHANG Rong
(Institute for Data Science and Engineering,Shanghai Key Laboratory of Trustworthy Computing,East China Normal University,Shanghai200062,China)
Join-Matrix is a high-performance model for stream join processing in a parallel shared-nothing environment,which supports arbitrary join operations and is resilient to data skew for taking random tuple distribution as its routing policy.To evenly distribute workload and minimize network communication cost,designing an efficient partitioning policy on the matrix is particularly essential.In this paper,we propose a novel stream join operator that continuously adjust its partitioning scheme to real-time data dynamics.Specifically,based on the sample statistics of streams and rated load of each physical machine,a lightweight scheme generator produces a partitioning scheme; then the corresponding solutions for state relocation are generated by a migration plan generator to minimize migration cost while ensuring result correctness.Our experiments on different kinds of data sets demonstrate that our operator outperforms the static-of-the-artstrategies in resource utilization,throughput and system latency.
stream join processing;Join-Matrix;partitioning scheme;distributed computing
TP391
A
10.3969/j.issn.1000-5641.2016.05.010
1000-5641(2016)05-0081-08
2016-05
國家863計劃項目(2015AA015307);國家自然科學基金重點項目(61232002,61332006);國家自然科學基金(61432006)
王曉桐,女,碩士研究生,研究方向為數(shù)據(jù)流處理.E-mail:51164500121@stu.ecnu.edu.cn.
張蓉,女,博士,副教授,研究方向為分布式數(shù)據(jù)管理.E-mail:rzhang@sei.ecnu.edu.cn.