肖穎
摘 要: 重分區(qū)連接查詢是基于傳統(tǒng)MapReduce框架的最常用的連接查詢算法之一。在討論基于傳統(tǒng)MapReduce框架的標(biāo)準(zhǔn)重分區(qū)連接算法及減小數(shù)據(jù)緩存的改進(jìn)算法的基礎(chǔ)上,提出了在數(shù)據(jù)文件分塊階段進(jìn)行預(yù)篩選以精簡MapReduce框架中處理的數(shù)據(jù)量的方法。該方法能有效減少框架內(nèi)部各個(gè)階段處理的數(shù)據(jù)總量,進(jìn)一步壓縮緩存的使用空間并降低不同階段之間數(shù)據(jù)傳輸?shù)木W(wǎng)絡(luò)開銷。
關(guān)鍵詞: MapReduce; 連接查詢; 重分區(qū)連接; 預(yù)篩選
中圖分類號:TP312 文獻(xiàn)標(biāo)志碼:A 文章編號:1006-8228(2016)04-09-03
0 引言
近年來,隨著移動(dòng)互聯(lián)網(wǎng)、電子商務(wù)及社交媒體快速發(fā)展,網(wǎng)絡(luò)的數(shù)據(jù)信息量呈指數(shù)型增長。為了能更快更好地分析處理這些龐大的數(shù)據(jù)信息,很多企業(yè)選擇將數(shù)據(jù)遷移到價(jià)格相對低廉且容錯(cuò)性能較強(qiáng)的云環(huán)境[1]中進(jìn)行處理。MapReduce框架[2]是云計(jì)算最為核心的技術(shù)之一。作為海量數(shù)據(jù)的并行處理平臺,MapReduce編程模型[3]簡單,隱藏了并發(fā)、容錯(cuò)、分布式計(jì)算和負(fù)載平衡等復(fù)雜繁瑣的細(xì)節(jié),并具有較高的可擴(kuò)展性和容錯(cuò)性,現(xiàn)已廣泛應(yīng)用于海量數(shù)據(jù)的分析和處理領(lǐng)域。
但在MapReduce框架中,連接查詢運(yùn)算仍然過程復(fù)雜、工序繁瑣,同時(shí)面臨數(shù)據(jù)傾斜、分布式環(huán)境數(shù)據(jù)傳輸?shù)葐栴},效率較低。如果能提高M(jìn)apReduce的連接查詢效率,則可進(jìn)一步提高數(shù)據(jù)分析效率和用戶體驗(yàn)滿意度。
本文就現(xiàn)有的基于傳統(tǒng)MapReduce框架的重分區(qū)連接查詢方法進(jìn)行深入探討和研究,并進(jìn)一步討論可能的優(yōu)化策略。
1 傳統(tǒng)MapReduce框架實(shí)現(xiàn)機(jī)制
傳統(tǒng)MapReduce框架將所有面向海量數(shù)據(jù)的計(jì)算劃分成兩個(gè)階段:Map階段和Reduce階段,每個(gè)階段可由用戶自行定義其處理函數(shù),且都以(K,V)二元組的形式進(jìn)行輸入和輸出。但由于大部分Mapper與Reducer并非執(zhí)行在相同節(jié)點(diǎn)上,因此MapReduce框架需要一個(gè)介于Map函數(shù)和Reduce函數(shù)之間的Shuffle過程來實(shí)現(xiàn)它們之間的數(shù)據(jù)整理和傳輸。以下是傳統(tǒng)MapReduce框架的具體工作步驟[4]。
⑴ 準(zhǔn)備工作
MapReduce框架將大量輸入數(shù)據(jù)分割成M個(gè)大小固定的塊。
⑵ Map階段
Mapper讀取分配給它的塊信息,并從中分離出各條記錄。
Mapper從每條記錄中抽象出二元組(K1,V1),并傳遞給用戶自定義的Map函數(shù)執(zhí)行生成二元組(K1',V1')。由此塊信息經(jīng)由Map階段處理得到一個(gè)輸出序列{(K1',V1'),(K2',V2'),…,(Kn',Vn')},同時(shí)這些數(shù)據(jù)將被存入緩存。
⑶ MapReduce框架的Shuffle過程
(a) 為了使Reduce函數(shù)獲得有序的輸入信息,Shuffle過程負(fù)責(zé)將Map階段的輸出序列進(jìn)行排序分組歸并,使得具有相同鍵值K'的數(shù)據(jù)V'集中在一起,形成(K',list(V')),且list(V')中的值按V'進(jìn)行排序。因?yàn)閿?shù)據(jù)量巨大,所以該階段可能使用外部排序。
(b) 將處理好的(K',list(V'))發(fā)送給Reduce函數(shù)。
⑷ Reduce階段
執(zhí)行Reduce函數(shù),生成最終的執(zhí)行結(jié)果(K'',V''),并作為輸出結(jié)果寫入文件。
2 重分區(qū)連接查詢算法及其優(yōu)化探討
在數(shù)據(jù)爆炸的今天,有些大型的互聯(lián)網(wǎng)公司每天需要利用高達(dá)TB甚至PB級別的日志信息來分析數(shù)據(jù),以獲取有利于其發(fā)展的統(tǒng)計(jì)信息。但其中大部分操作都是對巨型數(shù)據(jù)表(例如用戶表User和日志表Log)進(jìn)行連接查詢操作:
SELECT User.Col1, User.Col2, …, User.Coln,
Log.Col1, Log.Col2, …, Log.Colm
FROM User, Log
WHERE User.UserID=Log.UserID AND CUser AND CLog
其中CUser表示僅和表User相關(guān)的篩選條件,CLog表示僅和表Log相關(guān)的篩選條件,User.Col1,User.Col2,…,User.Coln表示表User中的n個(gè)列(表User的列數(shù)≥n),Log.Col1,Log.Col2,…,Log.Colm表示表Log中的m個(gè)列(表Log的列數(shù)≥m)。假設(shè)若表User 共有mU行,每行的數(shù)據(jù)量為lenU字節(jié);表Log共有mL行,每行的數(shù)據(jù)量為lenL字節(jié),則執(zhí)行該連接查詢將面臨為(mU×lenU)×(mL×lenL)級別的巨大數(shù)據(jù)量。
在此我們討論基于傳統(tǒng)MapReduce框架的最常用的連接查詢算法之一——重分區(qū)連接查詢算法[5]。該算法類似于并發(fā)數(shù)據(jù)庫管理系統(tǒng)中的分塊歸并排序連接,同時(shí)繼承了傳統(tǒng)MapReduce框架的容錯(cuò)性能和負(fù)載均衡性。
2.1 標(biāo)準(zhǔn)重分區(qū)連接算法
標(biāo)準(zhǔn)重分區(qū)連接操作在一個(gè)單獨(dú)的MapReduce工作中完成:Map階段進(jìn)行數(shù)據(jù)的預(yù)處理,Reduce階段進(jìn)行連接查詢操作。其具體執(zhí)行步驟如下。
⑴ MapReduce框架將巨型表User和Log分割成M個(gè)大小固定的塊。
⑵ 在Map階段,每個(gè)Mapper讀取一個(gè)塊,繼而提取出該塊中每個(gè)記錄的連接鍵值join-key;同時(shí)生成含有表標(biāo)記tag的記錄tagrecord,用以識別該記錄來自于哪一張表。Mapper輸出該塊的(join-key, tagrecord)序列并存入緩存。
⑶ MapReduce框架的Shuffle過程對(join-key, tagrecord)序列進(jìn)行分組、排序和歸并。相同join-key的記錄被分到一組,并輸出給Reducer。
⑷ 在Reduce階段,每個(gè)Reducer首先按tagrecord信息(該記錄來自于表User或Log)將輸入的記錄分為兩組,并分別存入各自的緩存BU和BL中,然后將兩組信息進(jìn)行笛卡爾積運(yùn)算,進(jìn)而實(shí)現(xiàn)查詢。
標(biāo)準(zhǔn)重分區(qū)連接中存在的問題是:User或Log表中的所有記錄都必須寫入緩存。然而若|User|<<|Log|時(shí),來自于Log中的記錄可能導(dǎo)致內(nèi)存溢出。
2.2 改進(jìn)的重分區(qū)連接算法
為解決標(biāo)準(zhǔn)重分區(qū)連接中可能存在的緩存溢出問題,標(biāo)準(zhǔn)重分區(qū)算法可做如下改進(jìn)。
⑴ 在map函數(shù)中,將輸出的二元組序列( join-key, tagrecord )改為(join-key-tag, tagrecord),加入表標(biāo)識tag保證來自于User的記錄一定排列在Log的記錄之前。
⑵ 在MapReduce框架的shuffle階段自定義分區(qū)函數(shù),使得后續(xù)所有計(jì)算只根據(jù)join-key-tag中的join-key部分來進(jìn)行。
做出改進(jìn)后,表User中的記錄一定會在Log記錄之前,所以只有User中的記錄需要存入緩存BU,而Log中的記錄則以數(shù)據(jù)流的形式快速讀出并與相關(guān)的User中的記錄進(jìn)行連接并輸出結(jié)果。
改進(jìn)的重分區(qū)連接算法雖然有效地改進(jìn)了標(biāo)準(zhǔn)算法中的緩存問題,降低了內(nèi)存溢出的可能,但在mapreduce的shuffle階段仍需對表User和Log進(jìn)行排序并通過網(wǎng)絡(luò)傳輸數(shù)據(jù)信息,該操作是連接查詢的主要執(zhí)行開銷,會大幅降低其執(zhí)行效率。
2.3 改進(jìn)重分區(qū)連接算法的預(yù)處理
在重分區(qū)連接中,如果表User和Log中的數(shù)據(jù)信息在進(jìn)行連接操作之前已經(jīng)按連接鍵值分區(qū)完成,則shuffle階段的開銷就能實(shí)現(xiàn)有效降低。該預(yù)處理可以通過以下方式實(shí)現(xiàn):表Log在日志記錄生成時(shí)根據(jù)join-key進(jìn)行分區(qū),而User表則在將其加載到分布式文件系統(tǒng)中時(shí)根據(jù)join-key進(jìn)行預(yù)分區(qū)。從而在查詢時(shí),User和Log中相互匹配的分區(qū)就能直接進(jìn)行連接查詢。
對比平行關(guān)系數(shù)據(jù)庫管理系統(tǒng),由于分布式文件系統(tǒng)獨(dú)立決定每一個(gè)數(shù)據(jù)塊的存放位置,所以上述方法不能保證表User和Log的相互匹配的分區(qū)存放在同一個(gè)物理節(jié)點(diǎn)中。因此,查詢時(shí)必須使用直接連接策略。即每個(gè)map任務(wù)在Log的一個(gè)片段Li上進(jìn)行。在初始化階段,Mapper從分布式文件系統(tǒng)中取出表User的一個(gè)片段Ui,若其尚未進(jìn)入本地存儲系統(tǒng)則將為其建立內(nèi)存哈希表HUi;然后map函數(shù)掃描Li中的每個(gè)記錄并嘗試連接哈希表HUi。由于分區(qū)的數(shù)量是可選的,因此該方法確保每一個(gè)Ui都能裝入內(nèi)存。
3 精簡連接查詢數(shù)據(jù)量的預(yù)篩選
上述三種重分區(qū)連接算法都是從如何減少運(yùn)算過程中產(chǎn)生的緩存及傳輸?shù)臄?shù)據(jù)量的角度來提高連接查詢效率,但卻忽略了連接查詢本身的計(jì)算數(shù)據(jù)量的精簡,即無論使用上述哪一種算法進(jìn)行重分區(qū)連接查詢,其對應(yīng)的關(guān)系代數(shù)都沒發(fā)生實(shí)質(zhì)性的優(yōu)化,而始終為:
換言之,進(jìn)入MapReduce框架的數(shù)據(jù)量即最初分塊處理和Mapper都仍然面臨著(mU×lenU)+(mL×lenL)字節(jié)的數(shù)據(jù)量,而Reduce階段的笛卡爾積運(yùn)算仍將產(chǎn)生具有 (mU×lenU)×(mL×lenL)字節(jié)的龐大的中間結(jié)果,并需對其進(jìn)行最終結(jié)果篩選。
但根據(jù)現(xiàn)實(shí)數(shù)據(jù)的處理情況可知,在MapReduce框架上實(shí)現(xiàn)的多個(gè)大型表之間的連接運(yùn)算在大多數(shù)情況下仍是等值連接,并且最終從查詢結(jié)果中獲取的也只是其中某幾個(gè)列的信息。因此,基于MapReduce框架的重分區(qū)連接算法還可以通過對大量數(shù)據(jù)信息進(jìn)行篩預(yù)選處理的方法來降低進(jìn)行連接查詢的數(shù)據(jù)量,從而進(jìn)一步減少緩存的使用空間并有效降低shuffle階段的數(shù)據(jù)傳輸造成的網(wǎng)絡(luò)開銷。
根據(jù)關(guān)系代數(shù)優(yōu)化的典型啟發(fā)式規(guī)則,上述關(guān)系代數(shù)表達(dá)式可優(yōu)化為:
若查詢結(jié)果中不包含重復(fù)列的信息,則該關(guān)系代數(shù)能進(jìn)一步優(yōu)化為自然連接運(yùn)算:
其中,為自然連接運(yùn)算符。
上述優(yōu)化表達(dá)式說明表User和Log在進(jìn)入MapReduce框架進(jìn)行連接查詢之前,可以先對大量數(shù)據(jù)進(jìn)行數(shù)據(jù)的預(yù)篩選,使與結(jié)果無關(guān)的數(shù)據(jù)不參與龐大的連接運(yùn)算。根據(jù)傳統(tǒng)MapReduce框架的工作原理,該篩選操作可以加載在該框架最初的文件分塊階段中。具體操作步驟如下。
⑴ 將表User分塊的同時(shí)將其進(jìn)行一遍掃描:篩選出滿足查詢條件CUser的行的同時(shí),投影出該行中最終查詢結(jié)果所需的分量Col1,Col2,…,Coln和連接列分量UserID,構(gòu)成一個(gè)中間結(jié)果行,將其存入到分塊中。當(dāng)一個(gè)塊放滿后,將中間結(jié)果寫入下一個(gè)塊。
⑵ 同理地,將表Log分塊的同時(shí)將其進(jìn)行一遍掃描:篩選出滿足查詢條件CLog的行的同時(shí),投影出該行數(shù)據(jù)中最終查詢結(jié)果所需的分量Col1,Col2,…,Colm和連接列分量UserID構(gòu)成一個(gè)中間結(jié)果行,將其存入到分塊中。
⑶ 而后再將這些塊分配給Mapper進(jìn)行后續(xù)操作。
若表User中滿足查詢條件CUser的元組共有mU'行,每行所需的分量Col1,Col2,…,Coln和UserID的數(shù)據(jù)量為lenU'字節(jié);表Log中滿足查詢條件CLog的元組共有mL'行,每行所需的分量Col1,Col2,…,Colm和UserID的數(shù)據(jù)量為lenL'字節(jié)。則由此可知,進(jìn)入MapReduce框架的數(shù)據(jù)量減少為(mU'×lenU')+(mL'×lenL')字節(jié),而最終的連接查詢面對的中間結(jié)果也減少為(mU'×lenU')×(mL'×lenL')字節(jié)。
若有mU'< 4 未來工作展望 本文提出的預(yù)篩選的方法在一定程度上能提高整個(gè)MapReduce框架的連接查詢的執(zhí)行效率,但其算法復(fù)雜度并沒有得到質(zhì)的提升。即若表User或Log中所有的行都分別滿足查詢條件CUser和CLog,且要求查詢兩張表連接之后所有列,則預(yù)篩選方法對數(shù)據(jù)信息量的降低將起不到明顯作用。后續(xù)的研究是對該問題進(jìn)行深入探討,以找出降低算法復(fù)雜度的方式,從本質(zhì)上提高整個(gè)查詢運(yùn)算的效率。 參考文獻(xiàn)(References): [1] VMware vCAT團(tuán)隊(duì).VMware vCAT權(quán)威指南:成功構(gòu)建云 環(huán)境的核心技術(shù)和方法[M].機(jī)械工業(yè)出版社,2014. [2] 董西成.Hadoop技術(shù)內(nèi)幕:深入解析MapReduce架構(gòu)設(shè)計(jì) 與實(shí)現(xiàn)原理[M].機(jī)械工業(yè)出版社,2013. [3] Donald Miner, Adam Shook. MapReduce設(shè)計(jì)模式[M].人民 郵電出版社,2014. [4] Dean J, Ghemawat S. MapReduce: Simplified Data Processing on Large Clusters[C]. Proc. of OSDI'04. San Francisco: [S. n.],2004. [5] Blanas S, Rao J, TianY, et a1. A comparison of joinalgorithms for log processing in MapReduce[C]. Proceedings of the 2010 ACM SIGM0D International Conference on Management of Data,2010.