李子乾 朱青 徐雨申
摘 要:隨著市場(chǎng)上自主研發(fā)的數(shù)據(jù)庫(kù)的大量出現(xiàn),面對(duì)各具特色的數(shù)據(jù)庫(kù)因差異性而導(dǎo)致的數(shù)據(jù)etl過(guò)程困難的問(wèn)題,本文探討了一種通過(guò)kafka作為可靠的實(shí)時(shí)數(shù)據(jù)中轉(zhuǎn),然后通過(guò)spark streaming任務(wù)來(lái)實(shí)現(xiàn)數(shù)據(jù)入庫(kù)的技術(shù)路線,最終解決了基于電力客服業(yè)務(wù)的數(shù)據(jù)倉(cāng)庫(kù)實(shí)時(shí)數(shù)據(jù)接入的問(wèn)題。該技術(shù)路線能夠一定程度上解決一部分?jǐn)?shù)據(jù)實(shí)時(shí)數(shù)據(jù)接入的困難。與此同時(shí),可以在數(shù)據(jù)接入過(guò)程中進(jìn)行復(fù)雜的數(shù)據(jù)流式計(jì)算。
關(guān)鍵詞:實(shí)時(shí)數(shù)據(jù)接入;kafka offset;spark streaming
中圖分類號(hào):TM769 文獻(xiàn)標(biāo)識(shí)碼:A 文章編號(hào):1671-2064(2020)01-0038-03
1 背景與問(wèn)題
現(xiàn)如今大數(shù)據(jù)技術(shù)發(fā)展日新月異,數(shù)據(jù)處理框架及方法也與日俱增,但是,數(shù)據(jù)集成的接入方法缺存在一定短板,如在滿足實(shí)時(shí)數(shù)據(jù)接入的需求上存在一定的難度及不實(shí)用性。伴隨著電力客服業(yè)務(wù)體量的擴(kuò)展及海量業(yè)務(wù)數(shù)據(jù)的不斷增加,數(shù)據(jù)接入的實(shí)時(shí)性問(wèn)題成了阻礙進(jìn)一步數(shù)據(jù)分析應(yīng)用的絆腳石,因此,一種數(shù)據(jù)倉(cāng)庫(kù)實(shí)時(shí)數(shù)據(jù)接入方法應(yīng)運(yùn)而生,本文將主要針對(duì)電力客服業(yè)務(wù)數(shù)據(jù)的數(shù)據(jù)倉(cāng)庫(kù)實(shí)時(shí)接入方法進(jìn)行的研究及論證。
在數(shù)據(jù)倉(cāng)庫(kù)建設(shè)過(guò)程,一般數(shù)據(jù)倉(cāng)庫(kù)的技術(shù)選型大多采用了mpp集群。基于云計(jì)算理念的并行數(shù)據(jù)庫(kù)集群,能夠支持TB到PB級(jí)別的結(jié)構(gòu)化數(shù)據(jù)存儲(chǔ)、高效查詢[3]。電力客服數(shù)據(jù)倉(cāng)庫(kù)采用了分層的架構(gòu),在最接近原始業(yè)務(wù)系統(tǒng)數(shù)據(jù)的一層稱為數(shù)據(jù)明細(xì)層,詳細(xì)數(shù)據(jù)倉(cāng)庫(kù)架構(gòu)如圖1所示。
將業(yè)務(wù)數(shù)據(jù)從數(shù)據(jù)貼源區(qū)接入到數(shù)據(jù)倉(cāng)庫(kù)明細(xì)層的過(guò)程中,需要實(shí)現(xiàn)部分結(jié)果化數(shù)據(jù)實(shí)時(shí)接入。在傳統(tǒng)的關(guān)系型數(shù)據(jù)庫(kù)中,實(shí)時(shí)數(shù)據(jù)接入常使用oracle goldengate(下簡(jiǎn)稱為ogg)來(lái)進(jìn)行數(shù)據(jù)接入。所以,首先考慮使用ogg來(lái)進(jìn)行實(shí)時(shí)數(shù)據(jù)接入。ogg出于對(duì)數(shù)據(jù)可靠性的保證,在數(shù)據(jù)接入的過(guò)程中,需要?jiǎng)?chuàng)建checkpoint來(lái)記錄數(shù)據(jù)接入任務(wù)的執(zhí)行狀態(tài),以保證如果接入進(jìn)行中斷的時(shí)候,可以通過(guò)讀取checkpoint的狀態(tài)數(shù)據(jù)來(lái)恢復(fù)接入進(jìn)程中斷前的狀態(tài)。ogg創(chuàng)建的checkpoint的主鍵是包含4個(gè)字段,而mpp集群支持?jǐn)?shù)據(jù)表的最大聯(lián)合主鍵數(shù)為3。因此,mpp集群作為ogg數(shù)據(jù)接入的目的端的時(shí)候,無(wú)法正常使用檢查點(diǎn)功能,數(shù)據(jù)傳輸?shù)目煽啃允盏搅司薮蟮挠绊憽?/p>
本文在接下來(lái)的篇幅中,將詳細(xì)介紹,一種可以解決可靠實(shí)時(shí)數(shù)據(jù)傳輸?shù)募夹g(shù)。
2 解決辦法
為了解決在mpp集群中無(wú)法創(chuàng)建checkpointtable的問(wèn)題。首先嘗試,通過(guò)手動(dòng)的方式來(lái)創(chuàng)建checkpointtable,但是自行創(chuàng)建的表,ogg數(shù)據(jù)抽取進(jìn)程無(wú)法識(shí)別。
在經(jīng)過(guò)一些方式的嘗試后,決定通過(guò)在源端和目標(biāo)端添加中轉(zhuǎn)的方式來(lái)實(shí)現(xiàn)mpp集群中的實(shí)時(shí)數(shù)據(jù)接入。
在仔細(xì)分析了ogg的checkpoint table的實(shí)現(xiàn)機(jī)制后,發(fā)現(xiàn)其與kafka的offset機(jī)制十分類似。在kafka中,offset是一個(gè)用于存儲(chǔ)每個(gè)消息被追加到分區(qū)的序列號(hào)的變量,offset的值是隨著消息的消費(fèi)情況不斷更新的。[1]
具體來(lái)說(shuō),kafka中的Offset分為兩種:Current Offset和Committed Offset。
Current Offset保存在消費(fèi)者側(cè),表示consumer消費(fèi)者已經(jīng)接收的消息序號(hào)。舉個(gè)例子來(lái)說(shuō),consumer目前接收了10條消息,則當(dāng)前current offset的值為10。于是消費(fèi)者下一次消費(fèi)的時(shí)候,就會(huì)從第11條消息開(kāi)始,這樣可以避免每次消費(fèi)者從topic中獲取消息的時(shí)候可以避免重復(fù)。
而commited offset保存在broker上,表示consumer消費(fèi)者消費(fèi)過(guò)的消息序號(hào)。舉例來(lái)說(shuō),消費(fèi)者接收了10條數(shù)據(jù),此時(shí)消費(fèi)者這邊的current offset是10。但是消費(fèi)者接收到消息后,是否真正意義上消費(fèi)了該條消息是不確定的。這里就涉及到kafka的可靠信息傳輸機(jī)制,kafka的消息在被消費(fèi)者消費(fèi)后,是需要消費(fèi)者反饋和同步消費(fèi)情況的。這一特性實(shí)現(xiàn)的機(jī)制是依靠commitSync和commitAsync兩個(gè)方法來(lái)實(shí)現(xiàn)的。當(dāng)消費(fèi)者接收到topic的消息后,current offset會(huì)立刻更新到最新的消息序號(hào),然后消費(fèi)者拿到接收到的數(shù)據(jù)后,開(kāi)始進(jìn)行消費(fèi)也就是計(jì)算和處理。完成消費(fèi)過(guò)程后,就會(huì)通過(guò)調(diào)用commitSync和commitAsync將消息的消費(fèi)情況返回給topic,broker在接收到commitSync和commitAsync信號(hào)后,會(huì)將commited offset更新為最新的序號(hào)。表示當(dāng)前已確認(rèn)消費(fèi)的序號(hào)。
如圖2所示,committed offset為3,current offset為5。這表明當(dāng)前時(shí)間,消費(fèi)者雖然接收到了5條消息,但是第4條與第5條并未被消費(fèi),已經(jīng)消費(fèi)的消息才到第三條,當(dāng)?shù)谒臈l和第五條消費(fèi)之后,消費(fèi)者會(huì)返回一個(gè)同步信號(hào)給broker,然后committed offset才會(huì)更新。而與此同時(shí),current offset會(huì)不斷增長(zhǎng),消費(fèi)者接收數(shù)據(jù)的過(guò)程是不會(huì)停止的。與消費(fèi)是同時(shí)進(jìn)行的,兩者并不干擾,但current offset始終比committed offset要大。
Committed offset在broker一端是單獨(dú)由一個(gè)topic來(lái)記錄和管理的。當(dāng)其更新的時(shí)候,最新的commited offset就會(huì)被寫入__consumer_offsets的topic中。這樣當(dāng)kafka出現(xiàn)進(jìn)程意外停止或者是consumer group成員出現(xiàn)變化,需要consumer rebalance的時(shí)候,commited offset就可以保證新的Consumer能夠從正確的位置開(kāi)始消費(fèi)一條消息,從而避免重復(fù)消費(fèi)。這樣的話,就可以實(shí)現(xiàn)類似于ogg中的checkpoint table的功能。[1]
3 接入設(shè)計(jì)
基于上述的kafka offset機(jī)制,可以完美地解決mpp集群無(wú)法創(chuàng)建檢查點(diǎn)的問(wèn)題。首先將原始貼源區(qū)數(shù)據(jù)通過(guò)ogg的方式實(shí)時(shí)接入到kafka中,然后通過(guò)spark streaming程序訂閱kafka中的消息,將ogg格式的消息轉(zhuǎn)換處理成mpp格式數(shù)據(jù),最終實(shí)現(xiàn)數(shù)據(jù)倉(cāng)庫(kù)實(shí)時(shí)數(shù)據(jù)接入。
3.1 oracle實(shí)時(shí)數(shù)據(jù)接入kafka
在ogg將數(shù)據(jù)接入到kafka過(guò)程中,首先需要通過(guò)抽取進(jìn)程,將數(shù)據(jù)抽取放進(jìn)本地指定的數(shù)據(jù)文件隊(duì)列,然后通過(guò)投遞進(jìn)程,將數(shù)據(jù)文件傳送到目的端,目的端ogg客戶端在接收到數(shù)據(jù)文件后,將文件放入指定的遠(yuǎn)程數(shù)據(jù)隊(duì)列中,然后通過(guò)復(fù)制進(jìn)程,將數(shù)據(jù)文件解析后,以生產(chǎn)者的方式將數(shù)據(jù)發(fā)布到kafka的topic中。Oracle goldengate原理詳細(xì)情況如圖3所示。
3.1.1 源與目標(biāo)端配置管理進(jìn)程
在源端和目標(biāo)端完成ogg的安裝后,兩端都需要配置mgr管理進(jìn)程。Manager進(jìn)程是ogg的控制進(jìn)程,運(yùn)行在源端和目標(biāo)端上。它主要作用有以下幾個(gè)方面:?jiǎn)?dòng)、監(jiān)控、重啟Goldengate的其他進(jìn)程,報(bào)告錯(cuò)誤及事件,分配數(shù)據(jù)存儲(chǔ)空間,發(fā)布閥值報(bào)告等。
在管理進(jìn)程中需要配置的有:ogg進(jìn)程的監(jiān)聽(tīng)端口;出了指定的固定端口以外還需要指定一系列的動(dòng)態(tài)端口列表,當(dāng)指定的默認(rèn)的監(jiān)聽(tīng)端口不可用時(shí),會(huì)在動(dòng)態(tài)端口中隨機(jī)挑選一個(gè)作為進(jìn)程的監(jiān)聽(tīng)端口;另外需要設(shè)置自動(dòng)重啟的參數(shù)用于管理進(jìn)程下某個(gè)進(jìn)程中斷或者是重啟管理進(jìn)程,管理進(jìn)程會(huì)自動(dòng)重啟這些進(jìn)程,設(shè)定重啟的最大次數(shù)以及時(shí)間間隔;此外還需要設(shè)定定期清理ogg的傳輸數(shù)據(jù)文件的周期。
3.1.2 oracle源端配置抽取進(jìn)程以及投遞進(jìn)程
在源端配置抽取進(jìn)程,將需要進(jìn)行實(shí)時(shí)傳輸?shù)臄?shù)據(jù)表配置進(jìn)抽取進(jìn)程。
配置過(guò)程中需要配置動(dòng)態(tài)解析源端數(shù)據(jù)表;需要設(shè)置環(huán)境變量,指定源端數(shù)據(jù)庫(kù)以及字符集以及連接源端數(shù)據(jù)庫(kù)的密碼;然后指定數(shù)據(jù)庫(kù)中抽取出的數(shù)據(jù)的保存位置以及文件名;最后配置需要復(fù)制表的清單。
在配置完抽取進(jìn)程后,需要配置相配合的投遞進(jìn)程,用于將抽取出來(lái)的隊(duì)列文件發(fā)送到指定的目的端服務(wù)器上。在配置過(guò)程重要配置參數(shù)有,除了同樣需要配置禁止ogg與oracle交互以及動(dòng)態(tài)解析以外,還需要配置遠(yuǎn)程目標(biāo)端的ip地址和目的端的管理進(jìn)程的監(jiān)聽(tīng)端口,用于構(gòu)建點(diǎn)到點(diǎn)的數(shù)據(jù)傳輸通道,同樣需要配置目的端用于存放傳輸?shù)年?duì)列文件的路徑。
配置好抽取和投遞進(jìn)程后,需要分別將本地隊(duì)列文件路徑和目標(biāo)端的隊(duì)列文件路徑和抽取進(jìn)程進(jìn)行綁定。
3.1.3 配置數(shù)據(jù)表define文件
ogg在傳輸數(shù)據(jù)的過(guò)程中,需要將傳輸?shù)臄?shù)據(jù)表的詳細(xì)定義信息發(fā)送到目標(biāo)端。
首先,配置一個(gè)需要導(dǎo)出表定義文件的表清單,然后在在ogg根目錄下調(diào)用defgen指令執(zhí)行對(duì)應(yīng)的配置文件,即可自動(dòng)生成數(shù)據(jù)表定義文件,然后將表定義文件發(fā)送到目的端的指定目錄下。
3.1.4 kafka目標(biāo)端配置檢查點(diǎn)
kafka端配置好檢查點(diǎn)后,會(huì)自動(dòng)記錄數(shù)據(jù)同步的當(dāng)前進(jìn)度,當(dāng)程序中斷恢復(fù)的時(shí)候,將從檢查點(diǎn)表中記錄的最新?tīng)顟B(tài)還原。
3.1.5 kafka目標(biāo)端配置復(fù)制進(jìn)程
源端通過(guò)抽取進(jìn)程從數(shù)據(jù)表中抽取了數(shù)據(jù)文件,然后通過(guò)投遞進(jìn)程發(fā)送到目的端。目的端接收到數(shù)據(jù)文件后,需要使用復(fù)制進(jìn)程,將接收的數(shù)據(jù)文件插入到目標(biāo)端指定的數(shù)據(jù)庫(kù)中,此處即將數(shù)據(jù)復(fù)制進(jìn)kafka中,復(fù)制進(jìn)程中設(shè)計(jì)詳細(xì)的參數(shù)有:
復(fù)制進(jìn)程中需要指定從源端服務(wù)器上傳輸過(guò)來(lái)的表定義文件;然后定義kafka的詳細(xì)配置,以及對(duì)數(shù)據(jù)進(jìn)入kafka指定了固定格式以及方式和復(fù)制任務(wù)的報(bào)告生成頻率;然后需要設(shè)置復(fù)制進(jìn)程以事務(wù)傳輸時(shí),事務(wù)合并的單位,用這種方式來(lái)減少IO操作;最后需要詳細(xì)配置源端與目標(biāo)端的映射關(guān)系。
配置好復(fù)制進(jìn)程后,同樣需要將隊(duì)列文件路徑綁定到復(fù)制進(jìn)程上。
3.1.6 配置kafka.props
此處主要需要配置的參數(shù)為topicname,表示數(shù)據(jù)進(jìn)入的topic名稱,以及數(shù)據(jù)進(jìn)入topic的指定數(shù)據(jù)格式。
3.1.7 啟動(dòng)全部進(jìn)程
將上述配置好的抽取,投遞,復(fù)制進(jìn)程逐一啟動(dòng),啟動(dòng)任務(wù)后,數(shù)據(jù)將實(shí)時(shí)從源端數(shù)據(jù)庫(kù)接入到kafka中。
3.2 kafka數(shù)據(jù)實(shí)時(shí)接入mpp集群
將kafka topic中數(shù)據(jù)接入到數(shù)據(jù)倉(cāng)庫(kù)中,首先需要將kafka的數(shù)據(jù)取出,需要定義一個(gè)消費(fèi)者來(lái)消費(fèi)topic中數(shù)據(jù)。然后在消費(fèi)者的程序中將kafka中的數(shù)據(jù)解析成可以加載進(jìn)入mpp集群的的數(shù)據(jù)格式,可以解析成純數(shù)據(jù)文件,也可以直接將數(shù)據(jù)換成sql語(yǔ)句。
首先,需要通過(guò)sparkStreaming來(lái)讀取kafka的數(shù)據(jù),這里存在兩種模式,是receiver-base和direct。兩者存在一定的區(qū)別。
Receiver模式首先創(chuàng)建一個(gè)receiver也就是接收器從kafka接收數(shù)據(jù)并存儲(chǔ)在Spark executor中,然后用觸發(fā)的方式去處理接收到的數(shù)據(jù)。為了不丟數(shù)據(jù),需要開(kāi)啟WAL機(jī)制,這會(huì)將receiver接收到的數(shù)據(jù)寫一份備份到其他的存儲(chǔ)組件中去。
Direct模式,是定期查詢kafka中的每個(gè)partition的最新的offset,每個(gè)批次拉取上次處理的offset和當(dāng)前查詢的offset的范圍的數(shù)據(jù)進(jìn)行處理。這種模式為了保證數(shù)據(jù)傳輸?shù)目煽啃?,offset是需要手動(dòng)保存的。
這里,本文介紹的方法中,選用了direct模式來(lái)消費(fèi)kafka。
在配置好連接kafka的功能后,將每個(gè)接受到的消息轉(zhuǎn)化為sparkStreaming rdd任務(wù),對(duì)每個(gè)rdd任務(wù)進(jìn)行數(shù)據(jù)的解析。[4]
由于kafka中保存的ogg同步數(shù)據(jù)格式可以解析成json格式,獲取到j(luò)son數(shù)據(jù)后,可以通過(guò)數(shù)據(jù)中的操作類型來(lái)將數(shù)據(jù)還原成sql。
在完成sql的解析還原后,只需要通過(guò)執(zhí)行jdbc去執(zhí)行該條sql,就可以完成數(shù)據(jù)的入庫(kù)。
在整個(gè)過(guò)程中,通過(guò)創(chuàng)建streamingContext類來(lái)實(shí)現(xiàn)數(shù)據(jù)流的監(jiān)聽(tīng)功能。[5]
這里通過(guò)終端等待的方式,來(lái)監(jiān)聽(tīng)kafka中的數(shù)據(jù),當(dāng)kafka中產(chǎn)生新的數(shù)據(jù)時(shí),kafka就會(huì)給spark streaming發(fā)送signal,而spark程序則會(huì)以響應(yīng)的方式,立刻去消費(fèi)kafka中新產(chǎn)生的數(shù)據(jù)。[2]但是這樣的過(guò)程是存在一定的延遲的。在實(shí)際的測(cè)試過(guò)程中,這樣的延遲可能會(huì)達(dá)到5s左右,對(duì)于數(shù)據(jù)倉(cāng)庫(kù)實(shí)時(shí)數(shù)據(jù)接入的需求來(lái)看,這樣程度的延遲是可以接受的。
綜上,上述過(guò)程即為數(shù)據(jù)倉(cāng)庫(kù)實(shí)時(shí)數(shù)據(jù)接入的一種實(shí)現(xiàn)方式。
4 結(jié)語(yǔ)
本文詳細(xì)介紹了,在傳統(tǒng)的實(shí)時(shí)數(shù)據(jù)傳輸工具(ogg)存在一定局限性的時(shí)候,首先分析了限制工具的影響因素,從中分析出了根本原因,由于mpp集群的主鍵數(shù)量限制,采用將數(shù)據(jù)先實(shí)時(shí)傳輸?shù)絢afka中,并且通過(guò)kafka實(shí)現(xiàn)了數(shù)據(jù)的可靠傳輸,然后通過(guò)spark Streaming實(shí)現(xiàn)了響應(yīng)式的數(shù)據(jù)入庫(kù)過(guò)程。最終以幾秒的延遲代價(jià),解決了數(shù)據(jù)倉(cāng)庫(kù)mpp集群的實(shí)時(shí)數(shù)據(jù)接入問(wèn)題。
從這個(gè)技術(shù)路線向外延展,通過(guò)kafka中轉(zhuǎn),再到spark Streaming流處理。該條技術(shù)實(shí)現(xiàn)完全可以實(shí)現(xiàn)更多的數(shù)據(jù)處理與計(jì)算。通過(guò)spark Streaming,kafka中的數(shù)據(jù)完全可以流向更多的數(shù)據(jù)組件而不僅僅局限于本文所提到的mpp集群。
參考文獻(xiàn)
[1] 費(fèi)秀宏.基于Kafka的日志處理平臺(tái)的研究[D].長(zhǎng)春:吉林大學(xué),2017.
[2] 薛瑞,朱曉民.基于Spark Streaming的實(shí)時(shí)日志處理平臺(tái)設(shè)計(jì)與實(shí)現(xiàn)[J].電信工程技術(shù)與標(biāo)準(zhǔn)化,2015(09):55-58.
[3] EfemG.Mallach.決策支持與數(shù)據(jù)倉(cāng)庫(kù)系統(tǒng)[M].李昭智,李昭勇,譯.北京:電子工業(yè)出版社,2001.
[4] 黨壽江,劉學(xué),王星凱,等.基于Spark Streaming的實(shí)時(shí)數(shù)據(jù)采集分析系統(tǒng)設(shè)計(jì)[J].網(wǎng)絡(luò)新媒體技術(shù),2017(05):48-53.
[5] 韓德志,陳旭光,雷雨馨,等.基于Spark Streaming的實(shí)時(shí)數(shù)據(jù)分析系統(tǒng)及其應(yīng)用[J].計(jì)算機(jī)運(yùn)用,2017(05):1263-1269.