摘 要:針對(duì)電信大數(shù)據(jù)在流動(dòng)人口統(tǒng)計(jì)中的處理需求,采用Intel?Hadoop發(fā)行版,設(shè)計(jì)Hive數(shù)據(jù)倉庫并進(jìn)行優(yōu)化,重點(diǎn)對(duì)性能影響較大的join連接和數(shù)據(jù)傾斜問題進(jìn)行了優(yōu)化。實(shí)驗(yàn)表明,對(duì)于TB級(jí)數(shù)據(jù),簡(jiǎn)單統(tǒng)計(jì)如count、sum等可在10分鐘以內(nèi)完成,聚合統(tǒng)計(jì)如join、group by等可在30分鐘左右完成,能有效支撐大數(shù)據(jù)環(huán)境下的流動(dòng)人口統(tǒng)計(jì)和監(jiān)測(cè)。
關(guān)鍵詞:Hive;優(yōu)化;join;數(shù)據(jù)傾斜
中圖分類號(hào):TP301 文獻(xiàn)標(biāo)識(shí)碼:A
1 引言(Introduction)
電信運(yùn)營商在移動(dòng)通信業(yè)務(wù)運(yùn)營過程中,獲取了大量客觀、真實(shí)的用戶歷史數(shù)據(jù),這些歷史數(shù)據(jù)可以客觀反映用戶的消費(fèi)行為,也可以反映影響用戶消費(fèi)行為的內(nèi)外部因素的變化情況[1]。根據(jù)移動(dòng)通信客戶的來話與去話等話務(wù)信息,結(jié)合客戶身份資料,可以實(shí)現(xiàn)對(duì)特定區(qū)域人口的流入、流出情況及流動(dòng)類型等進(jìn)行分析。
然而,基于移動(dòng)通信數(shù)據(jù)的流動(dòng)人口統(tǒng)計(jì)面臨諸多挑戰(zhàn):①數(shù)據(jù)源多樣化:CDR(語音、SMS、GPRS、3G、4G等)、計(jì)費(fèi)信息、客戶信息、基站參數(shù)等;②數(shù)據(jù)量大:高達(dá)360TB原始數(shù)據(jù)(某省電信公司);③數(shù)據(jù)增長快速:2TB/天。通信數(shù)據(jù)呈現(xiàn)出大數(shù)據(jù)的特征,既有的技術(shù)架構(gòu)和路線,已不能處理如此海量的電信數(shù)據(jù)。
近年來,涌現(xiàn)出了眾多的大數(shù)據(jù)處理架構(gòu),其中Hadoop開源架構(gòu)應(yīng)用最廣泛,在移動(dòng)、電信等部門通過部署Hadoop架構(gòu)開展電信大數(shù)據(jù)服務(wù)取得了一定的成效。本文針對(duì)電信大數(shù)據(jù)在流動(dòng)人口統(tǒng)計(jì)中的處理需求,采用Intel?Hadoop發(fā)行版,設(shè)計(jì)Hive數(shù)據(jù)倉庫并進(jìn)行優(yōu)化,重點(diǎn)對(duì)性能影響較大的join連接和數(shù)據(jù)傾斜問題進(jìn)行了優(yōu)化,實(shí)現(xiàn)海量數(shù)據(jù)的高效查詢和統(tǒng)計(jì),滿足流動(dòng)人口的快速統(tǒng)計(jì)和分析。
2 Hive數(shù)據(jù)倉庫設(shè)計(jì)(Hive data warehouse design)
移動(dòng)通信大數(shù)據(jù)的流動(dòng)人口業(yè)務(wù)需求分析:移動(dòng)通信數(shù)據(jù)的抽取、轉(zhuǎn)換和導(dǎo)入;基于日、月、年的報(bào)表統(tǒng)計(jì)和數(shù)據(jù)規(guī)模;數(shù)據(jù)倉庫30TB數(shù)據(jù)?,F(xiàn)方案采用10臺(tái)服務(wù)器,以實(shí)現(xiàn)數(shù)據(jù)的高速裝載、查詢和統(tǒng)計(jì)分析,如圖1所示。
圖1 Hive數(shù)據(jù)倉庫設(shè)計(jì)
Fig.1 Hive data warehouse design
Hive是一個(gè)建立在Hadoop之上的數(shù)據(jù)倉庫,用于查詢和分析結(jié)構(gòu)化海量數(shù)據(jù)。采用HDFS進(jìn)行數(shù)據(jù)存儲(chǔ)和Map/Reduce進(jìn)行數(shù)據(jù)操作。基本特點(diǎn)包括:
(1)提供類似于SQL的查詢語言。
(2)高擴(kuò)展性(scale-out),動(dòng)態(tài)擴(kuò)容無須停機(jī)。
(3)針對(duì)海量數(shù)據(jù)的高性能查詢和分析系統(tǒng)。
(4)提供靈活的擴(kuò)展性。
(5)復(fù)雜數(shù)據(jù)類型,擴(kuò)展函數(shù)和腳本等。
在運(yùn)行count、sum等聚合函數(shù)進(jìn)行統(tǒng)計(jì)計(jì)算時(shí)發(fā)現(xiàn),將數(shù)據(jù)從普通數(shù)據(jù)庫導(dǎo)入Hive中,分區(qū)的個(gè)數(shù)以及各分區(qū)數(shù)據(jù)量的均衡性會(huì)影響Hive的性能。解決辦法就是給導(dǎo)入的表增加一個(gè)自增的int類型的字段,用這個(gè)字段來進(jìn)行數(shù)據(jù)分割,最后得到的分區(qū)就是均衡的,如圖2所示。
圖2 數(shù)據(jù)分區(qū)
Fig.2 Data partition
3 Hive性能分析和優(yōu)化(Hive performance analysis
and optimization)
Hadoop的分配優(yōu)化主要包含以下三個(gè)層面:①底層Map和Reduce的參數(shù)調(diào)優(yōu);②Hive內(nèi)部邏輯優(yōu)化;③SQL代碼邏輯優(yōu)化。
3.1 Map/Reduce端的優(yōu)化
Map/Reduce端的優(yōu)化主要通過分析各個(gè)可調(diào)參數(shù)在Map/Reduce任務(wù)運(yùn)行過程中起到的作用,通過改變參數(shù)大小優(yōu)化底層分配策略。
表1 Map side調(diào)優(yōu)參數(shù)表
Tab.1 Tuning parameter table of map side
選項(xiàng) 類型 默認(rèn)值 描述
io.sort.mb Int 100 緩存Map中間結(jié)果的buffer大?。╥n MB)
io.sort.record.percent float 0.05 io.sort.mb中間來保存Map output記錄邊界的百分比
io.sort.spill.percent float 0.80 Map開始做spill操作的閥值
io.sort.factor Int 10 做merge操作時(shí)同時(shí)操作的stream數(shù)的上限
min.num.spill.for.combine Int 3 Combiner函數(shù)運(yùn)行的最小spill數(shù)
Mapred.compress.map.output Boolean False Map中間結(jié)果是否采用壓縮
Mapred.map.output.compression.codec Class name Org.apache.Haddoop.io.compress.defaultcodec Map中間結(jié)果的壓縮格式
表2 Reduce side調(diào)優(yōu)參數(shù)表
Tab.2 Tuning parameter table of Reduce side
選項(xiàng) 類型 默認(rèn)值 描述
Mapred.reduce.parallel.copies Int 5 每個(gè)Reduce并行下載Map結(jié)果的最大線程數(shù)
Mapred.reduce.copy.backoff Int 300 Reduce下載線程最大等待時(shí)間(in sec)
io.sort.factor Int 10 同上
Mapred.job.shuffle.input.buffer.percent Float 0.7 用來緩存shuffle數(shù)據(jù)的Reduce task heap百分比
Mapred.job.reduce.input.buffer.percent Float 0.0 Sort完成后Reduce計(jì)算階段緩存數(shù)據(jù)的百分比
Mapred.job.shuffle.merge.percent Float 0.66 緩存占內(nèi)存多少百分比后做merge操作
3.2 Hive內(nèi)部邏輯優(yōu)化和代碼邏輯優(yōu)化
Hive使用HQL(Hibernate Query Language),HQL不僅提供了類似標(biāo)準(zhǔn)SQL語句的查詢方式,而且提供更加豐富靈活、更為強(qiáng)大的查詢能力,允許用戶自定義Mapper和Reducer來處理更為復(fù)雜的查詢分析任務(wù)。導(dǎo)致Hive性能不佳的原因有兩個(gè):①?zèng)]有索引支持,查詢需要暴力掃描全表;②在處理小量數(shù)據(jù)時(shí)Map/Reduce框架耗費(fèi)資源比例過大,即Map/Reduce框架本身具有較高的延遲,導(dǎo)致基于此框架下的HQL查詢也體現(xiàn)高延遲性。優(yōu)化思路:
由于Hive的HQL語言是自動(dòng)轉(zhuǎn)化為Map/Reduce程序進(jìn)行執(zhí)行的。每個(gè)job對(duì)應(yīng)一個(gè)Map/Reduce框架,所以盡可能減少job的個(gè)數(shù)可以減少執(zhí)行時(shí)間。
Map/Reduce有其數(shù)據(jù)特性,Hive也有優(yōu)化約定,所以編寫Hive語言時(shí)需注意一些規(guī)則,才能提高查詢效率。
本文對(duì)性能影響較大的join多表連接和數(shù)據(jù)傾斜等問題實(shí)施優(yōu)化。
3.2.1 join優(yōu)化
Hive只支持等值連接(equality joins)、外連接(outer joins)和左半連接(left semi joins)。
join時(shí),每次Map/Reduce任務(wù)的執(zhí)行過程如下:reducer會(huì)緩存join序列中前面所有表的記錄,然后通過最后一個(gè)表將結(jié)果序列化到HDFS。這有助于減少在reduce端內(nèi)存的使用量。無論是外關(guān)聯(lián)outer join還是內(nèi)關(guān)聯(lián)inner join,如果join的key相同,無論jion多少個(gè)表都會(huì)合并成一個(gè)Map/Reduce任務(wù)。
查詢時(shí),應(yīng)該盡量將小表放在join的左邊,否則會(huì)因?yàn)榫彺胬速M(fèi)大量內(nèi)存。例如:
SELECT x.val,y.val,z.val FROM x JOIN y ON(x.key=y.key1)JOIN z ON (z.key=y.key1)
三個(gè)表使用同一個(gè)join key,生成一次Map/Reduce任務(wù)計(jì)算。Reduce端先緩存x表和y表的記錄,然后每次取得z表中的一個(gè)記錄就計(jì)算一次join結(jié)果。
兩次Map/Reduce任務(wù):
SELECT x.val,y.val,z.val FROM x JOIN y ON(x.key=y.key1)JOIN z ON (z.key=y.key2)
生成兩次Map/Reduce任務(wù):第一次緩存x表,用y表序列化;第二次緩存第一次Map/Reduce計(jì)算的結(jié)果,然后用z表序列化。
Hive不支持where子句中的子查詢,SQL常用的IN/EXISTS子句需要改寫。IN/EXISTS子查詢?cè)贖IVE中一個(gè)更高效的實(shí)現(xiàn)是利用LEFT SEMI JOIN重寫子查詢語句。LEFT SEMI JOIN的限制是,JOIN右邊的表不能在WHERE子句、SELECT子句或其他地方過濾,只能在ON子句中設(shè)置過濾條件。
SELECT x.key, x.value FROM x WHERE x.key in (SELECT y.key FROM y)
被重寫為:
SELECT x.key, x.val FROM x LEFT SEMI JOIN y on (x.key=y.key)
對(duì)于多個(gè)子查詢SQL無關(guān)且計(jì)算量過大的SQL,可以開啟并行執(zhí)行MR任務(wù),減少計(jì)算壓力。
hive.exec.parallel[=true]
hive.exec.parallel.thread.number[=8]
hive.exec.parallel可以控制一個(gè)SQL中多個(gè)可并行執(zhí)行的job的運(yùn)行方式。當(dāng)hive.exec.parallel為true的時(shí)候,同一個(gè)SQL中可以并行執(zhí)行的job會(huì)并發(fā)的執(zhí)行。參數(shù)hive.exec.parallel.thread.number就是控制對(duì)于同一個(gè)SQL來說同時(shí)可以運(yùn)行的job的最大值,該參數(shù)默認(rèn)為8。此時(shí)最大可以同時(shí)運(yùn)行8個(gè)job。通過修改參數(shù)hive.exec.parallel和hive.exec.parallel.thread.number測(cè)試不同情況的執(zhí)行速度,實(shí)現(xiàn)性能優(yōu)化和負(fù)載均衡。
3.2.2 數(shù)據(jù)傾斜問題的解決
數(shù)據(jù)傾斜表現(xiàn):任務(wù)進(jìn)度長時(shí)間維持在99%(或100%),查看任務(wù)監(jiān)控頁面,發(fā)現(xiàn)只有少量Reduce子任務(wù)未完成。因?yàn)槠涮幚淼臄?shù)據(jù)量和其他Reduce差異過大。單一Reduce的記錄數(shù)與平均記錄數(shù)差異過大,通??赡苓_(dá)到三倍甚至更多。最長時(shí)長遠(yuǎn)大于平均時(shí)長。造成數(shù)據(jù)傾斜的主要原因:①key分布不均勻;②業(yè)務(wù)數(shù)據(jù)本身的特性;③建表時(shí)考慮不周;④某些SQL語句本身就有數(shù)據(jù)傾斜。
表3 數(shù)據(jù)傾斜
Tab.3 Data skew
關(guān)鍵詞 情形 后果
Join 其中一個(gè)表較小,但是key集中 分發(fā)到某一個(gè)或幾個(gè)Reduce上的數(shù)據(jù)遠(yuǎn)高于平均值
大表與大表,但是分桶的判斷字段0值或空值過多 這些空值都由一個(gè)Reduce處理,非常慢
group by group by維度過小,
某特殊值過多 處理某值的Reduce非常耗時(shí)
Count Distinct 某值的數(shù)量過多 處理此特殊值的Reduce耗時(shí)
(1)參數(shù)調(diào)節(jié)
hive.map.aggr=true
Map 端部分聚合,相當(dāng)于Combiner。
hive.groupby.skewindata=true
數(shù)據(jù)傾斜聚合優(yōu)化,設(shè)置參數(shù)hive.groupby.skewindata=true,控制生成兩個(gè)MR Job,第一個(gè)MR Job中,Map的輸出結(jié)果會(huì)隨機(jī)分配到reduce做一次預(yù)匯總,減少某些key值條數(shù)過多或某些key值條數(shù)過少而造成的數(shù)據(jù)傾斜問題。
(2)SQL語句調(diào)節(jié)
如何Join:關(guān)于驅(qū)動(dòng)表的選取,選用join key分布最均勻的表作為驅(qū)動(dòng)表,做好列裁剪和filter操作,以達(dá)到兩表做join的時(shí)候,數(shù)據(jù)量相對(duì)變小的效果。
大小表Join:使用map join讓小的維度表先進(jìn)內(nèi)存。在map端完成reduce。
大表Join大表:把空值的key變成一個(gè)字符串加上隨機(jī)數(shù),把傾斜的數(shù)據(jù)分到不同的reduce上,由于null值關(guān)聯(lián)不上,處理后并不影響最終結(jié)果。
count distinct大量相同特殊值:count distinct時(shí),將特殊值單獨(dú)處理。如果還有其他計(jì)算需要進(jìn)行g(shù)roup by,可以先將特殊值的記錄單獨(dú)處理,再和其他計(jì)算結(jié)果進(jìn)行union。
group by維度過?。翰捎胹um() group by的方式來替換count(distinct)完成計(jì)算。
舉例:空值產(chǎn)生的數(shù)據(jù)傾斜
日志中,常會(huì)有信息丟失的問題,比如log中的user_id和users表中的user_id關(guān)聯(lián),會(huì)碰到數(shù)據(jù)傾斜的問題。
解決方法1:user_id為空的不參與關(guān)聯(lián)
select * from log x join users y on x.user_id is not null and x.user_id=y.user_id union all select * from log x where x.user_id is null;
解決方法2:空值的key變成一個(gè)字符串加上隨機(jī)數(shù)形成新的key值
select * from log x left outer join users y on case when x.user_id is null then concat(‘hive,rand() ) else x.user_id end=y.user_id;
方法2比方法1效率更高,IO和作業(yè)數(shù)都少了。方法1中l(wèi)og讀取兩次,jobs數(shù)是2,方法2中job數(shù)是1。以上優(yōu)化方法適合無效id,比如-99,“”,null等產(chǎn)生的傾斜問題。
4 結(jié)論(Conclusion)
針對(duì)移動(dòng)通信數(shù)據(jù)中的流動(dòng)人口統(tǒng)計(jì)業(yè)務(wù)需求,設(shè)計(jì)Hive數(shù)據(jù)倉庫并進(jìn)行優(yōu)化,重點(diǎn)對(duì)性能影響較大的join連接和數(shù)據(jù)傾斜問題進(jìn)行了優(yōu)化,實(shí)現(xiàn)了數(shù)據(jù)的高速查詢和統(tǒng)計(jì)。簡(jiǎn)單統(tǒng)計(jì),如count,sum等10分鐘以內(nèi)完成;聚合統(tǒng)計(jì),如join、group by等30分鐘左右完成,高效完成了流動(dòng)人口的統(tǒng)計(jì)。
參考文獻(xiàn)(References)
[1] 智勇.基于移動(dòng)通信信息資源的人口流動(dòng)趨勢(shì)研究[J].山東社 會(huì)科學(xué),2013(5):102-105.
[2] 王大力.基于移動(dòng)通信數(shù)據(jù)處理的公安流動(dòng)人口管理系統(tǒng)設(shè) 計(jì)與研究[D].同濟(jì)大學(xué),2012.
[3] 朱珠.基于Hadoop的海量數(shù)據(jù)處理模型研究和應(yīng)用[D].北京 郵電大學(xué),2011.
作者簡(jiǎn)介:
周天綺(1976-),男,碩士,講師.研究領(lǐng)域:大數(shù)據(jù)處理.