侯曉芳,王 歡,李 瑛
(北華航天工業(yè)學院 計算機與遙感信息技術(shù)學院,河北 廊坊 065000)
隨著移動設(shè)備和互聯(lián)網(wǎng)業(yè)務的快速發(fā)展,每天都會有TB級甚至更多的數(shù)據(jù)量產(chǎn)生。這些數(shù)據(jù)具有數(shù)據(jù)量大、增長快速、非結(jié)構(gòu)化等特點,可能隱藏著大量的潛在信息。互聯(lián)網(wǎng)工作人員可以通過對這些海量數(shù)據(jù)進行分析處理,從中挖掘出一些有價值的信息,這些信息可以在企業(yè)業(yè)務拓展、市場營銷、產(chǎn)品推薦和企業(yè)管理等方面為企業(yè)提供一定的決策支持[1],也可以作為對某一行業(yè)未來發(fā)展趨勢判斷的依據(jù),大數(shù)據(jù)產(chǎn)生的價值不可估量。要產(chǎn)生上述方面的意義就離不開海量數(shù)據(jù)的收集、數(shù)據(jù)的存儲及處理、數(shù)據(jù)挖掘等技術(shù),面對這些井噴的數(shù)據(jù),如何高效的收集、存儲、處理并分析這些數(shù)據(jù)就成為擺在我們面前亟待解決的問題。
隨著“互聯(lián)網(wǎng)+”時代的到來,移動業(yè)務快速增長,它提供給我們的不僅是傳統(tǒng)通信業(yè)務,更多的將是智慧生活和商業(yè)服務[2-3]。電信運營商擁有著大量的數(shù)據(jù),正是提供這些服務的支撐。面對海量數(shù)據(jù)的挖掘和處理,傳統(tǒng)的關(guān)系型數(shù)據(jù)庫和大型高性能計算機顯而易見已不能滿足現(xiàn)階段服務的需求,存在著技術(shù)上和性能上的瓶頸。這些都給電信運營商提出了巨大的挑戰(zhàn),急需新技術(shù)存儲、管理和挖掘海量數(shù)據(jù)。
為了解決這些問題,已經(jīng)有很多研究者對其進行了研究。文獻[4]在MapReduce對海量數(shù)據(jù)計算范式的基礎(chǔ)上,提出了一種基于頻繁子圖挖掘算法的MapReduce迭代框架。文獻[5]基于Hive的任務轉(zhuǎn)化模式,利用Hadoop架構(gòu)的分布式文件系統(tǒng)和MapReduce并行編程模型,實現(xiàn)海量日志的有效存儲與查詢。文獻[6]根據(jù)ERF數(shù)據(jù)的特點結(jié)合分布式框架Hadoop的計算優(yōu)勢,改進了網(wǎng)絡數(shù)據(jù)的解析模式和數(shù)據(jù)存儲模式,完成ERF網(wǎng)絡數(shù)據(jù)自動上傳。文獻[7]基于Hive的性能優(yōu)化研究,解決分布式存儲系統(tǒng)中文件系統(tǒng)的數(shù)據(jù)壓縮和存儲格式問題,通過對MapReduce作業(yè)調(diào)度和Hive性能調(diào)優(yōu)兩個方面對Hive的性能進行了優(yōu)化。
大數(shù)據(jù)技術(shù)應用主要體現(xiàn)在電信數(shù)據(jù)分析處理中,即如何全面地解決數(shù)據(jù)收集、存儲、分析和處理等眾多問題。本文將從分布式集群部署、日志數(shù)據(jù)收集、HDFS數(shù)據(jù)存儲、數(shù)據(jù)清洗、數(shù)據(jù)業(yè)務處理和等方面提出解決關(guān)鍵技術(shù)的方法。
連接到網(wǎng)絡中的所有設(shè)備,如智能手機、手持電腦、筆記本等通過互聯(lián)網(wǎng)訪問網(wǎng)絡資源時,其訪問的信息會通過相應電信運營商的基站轉(zhuǎn)發(fā)出去,并進行傳輸,所以從基站可以獲取到所有用戶訪問互聯(lián)網(wǎng)的日志,這些日志信息就是我們要分析處理的數(shù)據(jù),從其中分析出有價值的信息。
從基站得到的用戶日志信息可能是通話、短信、即時通信(如QQ、微信等)或者HTTP網(wǎng)絡訪問等。從訪問的鏈接地址可以分析統(tǒng)計出用戶訪問各網(wǎng)站的頻率,如微信即時通信、支付信息、游戲、閱讀、音頻視頻和導航定位等。還可以統(tǒng)計出用戶對同一個網(wǎng)絡資源的訪問情況、居民上網(wǎng)時間分布、網(wǎng)站訪問量排名等。我們可以根據(jù)上網(wǎng)人群的日志信息,分析出不同地域、各類人群的用戶行為,把這些結(jié)論應用在商業(yè)背景下將會產(chǎn)生很大的效益,如產(chǎn)品推薦系統(tǒng)就是大數(shù)據(jù)技術(shù)背景下的產(chǎn)物,電信業(yè)務數(shù)據(jù)的處理也有著巨大地潛在利益。
日志信息通常來源于多個基站,首先需要進行日志數(shù)據(jù)收集,可利用多臺服務器通過Flume進行,海量數(shù)據(jù)將落地在HDFS中。這些數(shù)據(jù)在分析處理前需要進行數(shù)據(jù)清洗,拋棄多種原因?qū)е碌腻e誤無效日志,清洗出有效的數(shù)據(jù),對數(shù)據(jù)進行分組、合并后,可根據(jù)實際業(yè)務需求進行處理,得到想要的結(jié)果。顯然,這一過程不能僅依賴于一臺服務器進行,即同時提供海量數(shù)據(jù)的收集、存儲、處理和分析,這樣效率太低,且不能滿足高可用,故需要采用分布式機器集群系統(tǒng)才能滿足需求。圖1為搭建的集群部署示意圖。
圖1 集群部署示意圖
電信用戶的數(shù)據(jù)存儲在服務器中,這些數(shù)據(jù)被收集到Flume服務器中。這些服務器負責讀取數(shù)據(jù)并將數(shù)據(jù)分發(fā)到二級服務器中,二級服務器則負責持久化和處理數(shù)據(jù)。為保證集群正常運行,提高系統(tǒng)可靠性和執(zhí)行效率,故增加了故障恢復與負載均衡服務。
具體電信日志數(shù)據(jù)處理的步驟為:
(1)利用Flume收集數(shù)據(jù)存儲到HDFS中。共配置三臺虛擬機,其中一臺接受外部數(shù)據(jù)源傳遞給它的事件,Source從指定的文件夾下定時地掃描拷貝文件,掃描到Flume中,通過Memory Channel連接到Avro sink,將數(shù)據(jù)發(fā)送給第二臺虛擬機,此時數(shù)據(jù)存儲到了第二臺虛擬機的HDFS中。第三臺虛擬機作為第二臺虛擬機的備份,負責負載均衡或故障恢復,此處也可增設(shè)一臺服務器都做配置。虛擬機之間關(guān)系及配置如圖2所示。
圖2 虛擬機之間的關(guān)系及配置
(2)創(chuàng)建HIVE[8]外部表管理HDFS中的日志數(shù)據(jù)。
(3)利用HIVE清洗數(shù)據(jù),過濾掉錯誤日志,新建一個Hive表保存清洗過的有效數(shù)據(jù)。
(4)對有效數(shù)據(jù)進行分組,根據(jù)實際需求進行業(yè)務邏輯處理。例如,假設(shè)要統(tǒng)計用戶對某一網(wǎng)址的訪問情況,就將所有訪問HTTP請求的記錄存儲在一張數(shù)據(jù)表中,數(shù)據(jù)表代表訪問次數(shù)的相應字段設(shè)為1。根據(jù)網(wǎng)址字段值的不同,合并記錄做累加將結(jié)果保存到新數(shù)據(jù)表中,數(shù)據(jù)仍然存儲在HDFS中。
(5)可使用Sqoop將HDFS中處理后的數(shù)據(jù)導出到Mysql數(shù)據(jù)庫中。
虛擬機1要實現(xiàn)將日志數(shù)據(jù)傳輸?shù)教摂M機2的HDFS中,虛擬機3作為故障恢復或負載均衡服務器,當虛擬機2無法正常工作時,虛擬機3開始工作。故在虛擬機1中Flume需要配置sink processor,其type設(shè)可為failover或load_balance。該processor需要兩個sink,優(yōu)先級高的sink通過通道連接虛擬機2,優(yōu)先級低的連接虛擬機3,相關(guān)配置如下:
a1.sinkgroups.g.processor.type=failover
a1.sinkgroups.g.processor.priority.s1=5
a1.sinkgroups.g.processor.priority.s2=1
虛擬機1的Source應配置為 spooling directory source,用來監(jiān)視存儲日志信息的文件目錄,Sink配置為avro sink,Channel配置為memory。由于Flume數(shù)據(jù)收集不需要Hadoop支持,所以該虛擬機不需要安裝Hadoop組件。
虛擬機2 Flume的Source應配置為avro source,對接虛擬機1的Sink,監(jiān)聽虛擬機1的avro sink端口。虛擬機2的Sink應配置為hdfs sink,此Sink將把數(shù)據(jù)寫到Hadoop的HDFS分布式文件系統(tǒng)中。虛擬機2利用Flume與HDFS通信,要求Hadoop必須安裝配置好。虛擬機3與虛擬機2配置基本相同,核心配置為:
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://[hostname]:9000/mydata
為了方便數(shù)據(jù)處理,可以在虛擬機1的Source配置中增設(shè)Timestamp攔截器,給每條收集到的日志加上時間戳,這樣我們就可以利用這一時間戳將日志數(shù)據(jù)按時間落在不同的HDFS文件夾中。實驗設(shè)置每天的數(shù)據(jù)在一個文件夾中。虛擬機1中增加Source攔截器的配置,如下:
a1.sources.r1.interceptors=i
a1.sources.r1.interceptors.i.type=timestamp
HDFS數(shù)據(jù)存儲路徑配置可改為:
a1.sinks.s1.hdfs.path=hdfs://[hostname]:9000/mydata/time=%Y-%m-%d-00-00-00
分別啟動每臺虛擬機的Agent,上傳實驗數(shù)據(jù)到虛擬機1的Agent監(jiān)視目錄,數(shù)據(jù)正常落到了虛擬機2的HDFS上,可以通過命令查看到生成的日志文件,路徑為Path指定的目錄。
HIVE是基于Hadoop[6-7]的一個數(shù)據(jù)倉庫工具,是Facebook公司創(chuàng)建的數(shù)據(jù)倉庫應用。它可以將結(jié)構(gòu)化的數(shù)據(jù)映射成一張數(shù)據(jù)表,提供HQL語句進行查詢、刪除等簡單操作,對于復雜的業(yè)務邏輯提供了內(nèi)置函數(shù)及JAVA API接口去自定義實現(xiàn)。這些HIVE的操作底層均是轉(zhuǎn)為MapReduce任務去執(zhí)行,可以方便地實現(xiàn)海量數(shù)據(jù)的統(tǒng)計分析[9]。
利用HIVE處理HDFS上的數(shù)據(jù),首先需要創(chuàng)建外部表來管理這些數(shù)據(jù),數(shù)據(jù)經(jīng)清洗后存儲在新的數(shù)據(jù)庫表中,按照實際業(yè)務需求對新表數(shù)據(jù)做分組合并操作,可映射成多個數(shù)據(jù)庫表。具體過程如下。
(1)創(chuàng)建外部表關(guān)聯(lián)文件與數(shù)據(jù)采集
首先依次啟動Hadoop、HIVE,建立數(shù)據(jù)庫、建立外部數(shù)據(jù)表dx_data,將HDFS中的記錄插入到該表中,這個過程會轉(zhuǎn)換成MapReduce執(zhí)行。由于原始數(shù)據(jù)字段較多,此處不再列出詳細表結(jié)構(gòu)。在數(shù)據(jù)采集過程中,通過FTP下載的方式將源系統(tǒng)中的數(shù)據(jù)采集到Hadoop集群服務器上,是一種完全分布式的數(shù)據(jù)采集方式。
(2)數(shù)據(jù)清洗
本文只統(tǒng)計分析HTTP網(wǎng)絡訪問情況,根據(jù)需求取出相關(guān)字段新建表dx_newdata。對表dx_data做查詢,利用HIVE的Insert操作,將結(jié)果保存在新表dx_newdata中。數(shù)據(jù)庫表dx_newdata相關(guān)字段如表1所示。
表1 dx_newdata
(3)業(yè)務邏輯處理
新建業(yè)務邏輯表dx_businessdata,前7個字段是原有字段,剩余3個字段需利用Hive內(nèi)置函數(shù)對dx_newdata表中數(shù)據(jù)做運算,表結(jié)構(gòu)如表2。每條記錄當Trans_state字段值有效時,說明鏈接成功,接收次數(shù)Receive_num值置為1,否則置為0;總流量Stream_total值為Stream_up與Stream_down值之和;Trans_state字段值有效時訪問時長Time_total值為Time_end與Time_start之差,否則為0,結(jié)果插入到新表dx_businessdata中。
表2 dx_businessdata
業(yè)務邏輯處理需要根據(jù)需求進行,不同的需求處理不同。對應HQL語句為:
insert overwrite table dx_businessdata select Client_ip,Client_port,Server_ip,Server_port,Host_name,Type_id,Application_class,if(Trans_state==1,1,0),sum(Stream_up+Stream_down),if(Trans_state==1,Time_end-Time_start,0) from dx_newdata;
(4)數(shù)據(jù)分組、歸并
這一過程仍要根據(jù)需求進行,下面以統(tǒng)計網(wǎng)站訪問排名為例來說明。對表dx_businessdata以字段Server_ip和Host_name進行分組,建立新數(shù)據(jù)表結(jié)構(gòu)如表3,插入數(shù)據(jù)到對應字段。HQL語句為:
insert overwrite table dx_host select Server_ip, Host_name,sum(Receive_num),sum(Stream_total), sum(Time_total) from dx_businessdata group by Server_ip, Host_name;
表3 dx_host
這一過程相當于MapReduce框架的Shuffle,根據(jù)相同的主機名和服務器IP對記錄進行歸并,得到類似
本文實驗硬件設(shè)備為intel core i7 四核處理器、6G內(nèi)存、64位操作系統(tǒng)。虛擬機軟件環(huán)境為CentOS6.5、JDK8、hadoop2.7.1、flume1.6.0、Hive1.2.0和mysql5.1.38。為了體現(xiàn)所提方法的優(yōu)異性,將文獻[6]和文獻[7]技術(shù)視為對照組。
為了研究數(shù)據(jù)采集方面的性能,本文對不同大小的文件作為采集對象,大小有5 M、100 M、500 M,采集時間如表4-6所示,對照組是文獻[6]采用的傳統(tǒng)采集方式、文獻[7]采用的偽分布式。
從表4-6中可以看出,本文通過完全分布式方法將源系統(tǒng)中的數(shù)據(jù)采集到Hadoop集群服務器上,整體上速度最快。文獻[7]的偽分布方式次之,這主要是因為在完全分布式模塊中,單個任務節(jié)點可以給三個工作節(jié)點分配任務,整體效率更高。從表4的前2行數(shù)據(jù)、表5的前3行數(shù)據(jù)、表6的前2行數(shù)據(jù)中還可以看出,文獻[7]的偽分布Hadoop方式比傳統(tǒng)方式更優(yōu),這主要是因為Hadoop的管理機制會為數(shù)據(jù)的處理預留一定的緩存空間,但并沒有被完全使用,因此,數(shù)據(jù)量較小時,其優(yōu)勢難以體現(xiàn)。
表4 文件大小約為5 M的數(shù)據(jù)采集時間/s
表5 文件大小約為100 M的數(shù)據(jù)采集時間/s
表6 文件大小約為500 M的數(shù)據(jù)采集時間/s
根據(jù)日志文件,訪問排名前五網(wǎng)站的情況如圖3所示,可以看出,本文配置下的MapReduce的CPU耗費時間為8.75 s,總耗時為114.81 s(這里沒有考慮數(shù)據(jù)采集時間)。由于處理時間與機器硬件配置有關(guān),因此對于不同實驗環(huán)境,結(jié)果可能有所不同。本文對其他方法盡最大可能進行重復實現(xiàn),對不同大小的文件,文件數(shù)為1個,總耗時情況如圖4所示。可以看出本文框架,隨著文件大小的增加,總處理時間的優(yōu)勢越明顯,這主要得益于數(shù)據(jù)處理與優(yōu)化過程。
圖3 網(wǎng)站訪問排名
圖4 對不同大小文件各方法的耗時情況
另外,還可以查詢某一區(qū)域或居民社區(qū)的網(wǎng)站訪問量與排名。圖5就是統(tǒng)計某一地區(qū)網(wǎng)站的訪問情況,此處的日志文件數(shù)據(jù)量較小。值得一提,根據(jù)日志中各應用類別消耗的總流量情況可統(tǒng)計出最熱門的應用類別排名,具體如圖6所示。圖中第一列為應用類別編號,第二列為對應的總流量,第三列為總消耗時間。這些結(jié)果可作額外參考。
圖5 區(qū)域內(nèi)網(wǎng)站訪問情況
圖6 應用類別統(tǒng)計排名
利用JAVA代碼雖能夠?qū)崿F(xiàn)數(shù)據(jù)收集、業(yè)務處理、統(tǒng)計分析這些功能,但其數(shù)據(jù)處理過程復雜、編寫代碼量大,主機間協(xié)調(diào)調(diào)用都需用代碼實現(xiàn),而且其僅適用于數(shù)據(jù)文件小的情況,海量數(shù)據(jù)難以處理[10-11]。利用JAVA編程實現(xiàn)只適用于有JAVA開發(fā)背景且熟悉相關(guān)API接口的IT技術(shù)人員,對行業(yè)內(nèi)部人員不適用。電信數(shù)據(jù)和很多行業(yè)數(shù)據(jù)一樣,其難度并不在于業(yè)務復雜性上,而在于海量數(shù)據(jù)上。數(shù)據(jù)量太大了,再簡單的數(shù)據(jù)處理也會變得很困難,這正是基于Hadoop平臺海量數(shù)據(jù)處理應用產(chǎn)生的背景,數(shù)據(jù)存儲、數(shù)據(jù)處理、數(shù)據(jù)統(tǒng)計等都需要用分布式集群來解決問題。
解決大規(guī)模數(shù)據(jù)的分布式存儲,Hadoop 集群支持上千個節(jié)點,支持可擴展,在規(guī)模上是普通集群無法比擬的,數(shù)據(jù)吞吐量大[12]。但MapReduce仍然是在JAVA開發(fā)環(huán)境中用JAVA語言開發(fā)Map、Reduce程序,只是省了少部分協(xié)調(diào)的代碼,數(shù)據(jù)業(yè)務處理、數(shù)據(jù)統(tǒng)計分析等大部分代碼依舊需要,JAVA編程的弊端仍然存在。
而HIVE是數(shù)據(jù)倉庫工具,可以通過編寫和執(zhí)行HQL語句直接對數(shù)據(jù)庫表進行操作,能夠新建數(shù)據(jù)表篩選數(shù)據(jù)直接插入,能夠方便快速清洗、查詢數(shù)據(jù),語句執(zhí)行自動轉(zhuǎn)換成MapReduce任務,計算速度快,數(shù)據(jù)吞吐量大,非常適合海量數(shù)據(jù)應用場景。HQL語法類似于數(shù)據(jù)庫查詢語言,具有易學易用的特點,適合非IT業(yè)人員。對于復雜的應用HIVE也提供了自定義內(nèi)置函數(shù)和API接口,可靈活使用,對HIVE進行合理配置和參數(shù)優(yōu)化,可以提高任務執(zhí)行效率。
本文以電信日志文件為具體應用背景,以統(tǒng)計分析網(wǎng)站的訪問排名為需求,介紹海量數(shù)據(jù)的處理方法。主要對數(shù)據(jù)進行清洗、邏輯處理、分組合并;最后,完成查詢分析。借助HIVE進行大數(shù)據(jù)處理,這種方法比僅使用MapReduce計算框架更優(yōu),因為它不需要編寫大量的Java代碼來處理業(yè)務邏輯,對于不同語言背景或不熟悉JAVA API接口的人員使用更方便。HIVE的這些特性使得大數(shù)據(jù)處理更方便,適合于不同行業(yè)背景的人來使用,沒有Java編程經(jīng)驗的人也可以順利完成。
未來考慮在實驗中,對HIVE內(nèi)部進行優(yōu)化,如Map數(shù)量、Join的優(yōu)化、合并小文件等方式,進一步提高數(shù)據(jù)的執(zhí)行效率。