王 浩,葛 昂,趙 晴
(華北計算機(jī)系統(tǒng)工程研究所,北京 100083)
?
基于Spark的數(shù)據(jù)庫增量準(zhǔn)實時同步
王 浩,葛 昂,趙 晴
(華北計算機(jī)系統(tǒng)工程研究所,北京 100083)
為了實現(xiàn)將傳統(tǒng)關(guān)系型數(shù)據(jù)庫中的增量數(shù)據(jù)快速導(dǎo)入同構(gòu)或者異構(gòu)目的庫,在使用已有的增量提取方法的基礎(chǔ)上,提出了通過增加并行度和流式計算的方法加快同步速度。此方法不僅支持插入、更新和刪除的增量數(shù)據(jù)同步,而且可以抽取出數(shù)據(jù)庫表結(jié)構(gòu)信息動態(tài)支持表結(jié)構(gòu)變更。與傳統(tǒng)單點抽取方式相比,大大提高了目的庫數(shù)據(jù)的新鮮度。
增量同步; Spark; 流式計算
隨著大數(shù)據(jù)技術(shù)的發(fā)展,越來越多的企業(yè)開始構(gòu)建大數(shù)據(jù)平臺進(jìn)行數(shù)據(jù)處理。然而如何將保存在關(guān)系型數(shù)據(jù)庫中的數(shù)據(jù)快速同步到大數(shù)據(jù)平臺組件(例如HBase、HDFS)中,正成為很多企業(yè)面臨的問題。Sqoop是常用的數(shù)據(jù)同步工具,其實質(zhì)是MapReduce任務(wù),延時較高,而且需要通過定時任務(wù)來達(dá)到自動化流程效果。本文在觸發(fā)器記錄數(shù)據(jù)變化的基礎(chǔ)上,提出了一種使用Spark Streaming將增量數(shù)據(jù)抽取出來,然后根據(jù)需要寫入到不同的目的庫的方法。由于只提取增量數(shù)據(jù),所以較Sqoop減少了數(shù)據(jù)量。另外由于是流式處理方式,降低了延時。
1.1 增量提取的概念
增量提取是針對上一次提取而言,將上一次提取時間點到現(xiàn)在數(shù)據(jù)庫中插入、更新、刪除的數(shù)據(jù)提取出來[1]。
1.2 常用的增量提取方法
1.2.1基于業(yè)務(wù)系統(tǒng)日志
在業(yè)務(wù)中將數(shù)據(jù)庫DML(Data Manipulation Language)語句輸出以日志的方式存儲,然后通過解析日志將DML語句在目的庫中重放以達(dá)到目的。此方法需要侵入業(yè)務(wù)系統(tǒng),對于已經(jīng)成型的業(yè)務(wù)系統(tǒng)不適用。
1.2.2 基于數(shù)據(jù)庫日志
解析數(shù)據(jù)庫日志也能達(dá)到增量提取的目的,但是各大數(shù)據(jù)庫廠商不對外開放數(shù)據(jù)庫系統(tǒng)的日志格式,這就使得解析日志變成了問題。而且各數(shù)據(jù)庫的日志格式還不盡相同,難以達(dá)到通用性。
1.2.3 基于觸發(fā)器
基于觸發(fā)器的方式,目前被廣泛運(yùn)用于數(shù)據(jù)庫增量提取。它通過在源表上建立插入、更新、刪除觸發(fā)器來記錄對數(shù)據(jù)的操作。每當(dāng)有數(shù)據(jù)變化時,就會觸發(fā)相應(yīng)的觸發(fā)器,然后運(yùn)行觸發(fā)器定義的邏輯,將變化記錄到增量表。
1.3 基于觸發(fā)器方法的具體實現(xiàn)
由于觸發(fā)器方法具有實現(xiàn)邏輯簡單,對業(yè)務(wù)無入侵,數(shù)據(jù)庫通用等優(yōu)點,所以本文采用了基于觸發(fā)器方式的增量提取方法。具體實現(xiàn)方法如下:
(1)創(chuàng)建名為dml_log的數(shù)據(jù)庫表,字段為id、table_name、record_id、execute_date、dml_type。其中id為自增id,table_name存儲要同步的源表表名稱,record_id是源表中發(fā)生變化的記錄的唯一標(biāo)識,execute_date為觸發(fā)器執(zhí)行時的時間戳,dml_type為I、U、D分別代表insert、update、delete操作。
(2)在源表上創(chuàng)建插入、更新、刪除類型的觸發(fā)器。創(chuàng)建語句在此省略。
2.1 Spark Streaming
Spark是目前大數(shù)據(jù)處理領(lǐng)域比較常用的計算框架。它將中間計算結(jié)果維護(hù)在內(nèi)存中,這樣不僅可以做到中間結(jié)果的重用,而且減少了磁盤IO,大大加快了計算速度。Spark Streaming是構(gòu)建于Spark core之上的流式處理模塊。其原理是將流式數(shù)據(jù)切分成一個個小的片段,以mini batch的形式來處理這一小部分?jǐn)?shù)據(jù),從而模擬流式計算達(dá)到準(zhǔn)實時的效果。
2.2 JdbcRDD
彈性分布式數(shù)據(jù)集(Resilient Distributed Datasets,RDD),它是Spark數(shù)據(jù)抽象的基石。RDD是一個只讀的分區(qū)記錄集合,分區(qū)分散在各個計算節(jié)點[2]。RDD提供了transformation和action兩類操作,其中transformation是lazy級別的,主要對數(shù)據(jù)處理流程進(jìn)行標(biāo)記,而不立即進(jìn)行運(yùn)算。action操作會觸發(fā)作業(yè)的提交,然后進(jìn)行回溯導(dǎo)致transformation操作進(jìn)行運(yùn)算。
JdbcRDD擴(kuò)展自RDD,是RDD的子類。內(nèi)部通過JDBC(Java Data Base Connectivity)操作以數(shù)據(jù)庫為源頭構(gòu)建RDD。其構(gòu)造函數(shù)簽名為:
class JdbcRDD[T: ClassTag](
sc: SparkContext,
getConnection:()=> Connection,
sql: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
mapRow:(ResultSet) => T =
JdbcRDD.resultSetToObjectArray _)
extends RDD[T](sc,Nil) with Logging {…}
2.3 具體實現(xiàn)
Spark官方提供用于構(gòu)建Spark Streaming的數(shù)據(jù)源沒有對數(shù)據(jù)庫進(jìn)行支持,所以本文自己實現(xiàn)對數(shù)據(jù)庫的支持。編寫繼承自InputDStream類的DirectJdbcInputDStream類,其簽名為:
class DirectJdbcInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
param: JdbcParam) extends
InputDStream[Row] (ssc_) with Logging {…}
對start()、compute()和stop()方法進(jìn)行重寫。
(1)在start函數(shù)中注冊JDBC驅(qū)動,用于JDBC獲取初始化信息(構(gòu)造JdbcRDD時的參數(shù));
(2)compute函數(shù)會被框架間隔指定的時間反復(fù)調(diào)用,其實質(zhì)是如何返回一個JdbcRDD。首先通過JDBC獲取本次需要拉取的trigger記錄的id的上下界以及表的Schema信息;然后以這些信息為參數(shù)生成提取真實數(shù)據(jù)的SQL,其邏輯為用選中的trigger表中的記錄和原表在record_id上進(jìn)行左連接;最后使用該SQL當(dāng)做參數(shù)構(gòu)建JdbcRDD。值得說明的是,構(gòu)建JdbcRDD時是可以指定并行度的,每個worker節(jié)點都會建立到數(shù)據(jù)庫的JDBC連接,由多個節(jié)點并行去數(shù)據(jù)庫拉取屬于自己的那一部分?jǐn)?shù)據(jù),這就大大增加了提取和處理速度。
(3)在stop函數(shù)中關(guān)閉JDBC連接??傮w來看,就是在driver程序中執(zhí)行的JDBC程序獲取初始化參數(shù),在executor中執(zhí)行的JDBC程序拉取真實的數(shù)據(jù)。
(4)編寫driver程序:
val sc = new SparkContext(new SparkConf)
val ssc = new StreamingContext(sc,Seconds(30))
val directStream = new DirectJdbcInputDStream[Row](ssc,jdbcParam)
directStream.foreachRDD(rdd => {
…//對數(shù)據(jù)進(jìn)行處理
})
2.4 限流
假設(shè)當(dāng)前時間點到上次提取的時間點之間新增數(shù)據(jù)量太大,就會導(dǎo)致在新一次作業(yè)提交時,上一次的作業(yè)仍然沒有完成,可能會因此造成作業(yè)積壓使得系統(tǒng)不穩(wěn)定。本文使用了基于規(guī)則的限流方法,綜合考慮集群處理能力以及間隔時間,可以配置化設(shè)置每次最大提取條數(shù)。如果當(dāng)前需要提取的數(shù)據(jù)條數(shù)大于最大提取條數(shù),則本次就只提取最大條數(shù),剩下的延時到下次再進(jìn)行提取。這樣做的好處是削減了峰流對系統(tǒng)造成的影響。
測試環(huán)境:VMware虛擬機(jī),處理器設(shè)置為4核心,2 GB內(nèi)存, 64位CentOS 6.5操作系統(tǒng),Spark 1.5.1,Oracle 11g。使用4臺虛擬機(jī)搭建成Spark集群,1臺為Master,3臺為Worker。數(shù)據(jù)庫表分別設(shè)置為20、40個字段,每次最大抽取記錄數(shù)分別設(shè)置為10 000、50 000、500 000。將抽取出來的數(shù)據(jù)寫成parquet格式的文件存儲到hdfs上。測試結(jié)果如表1所示。
表1 運(yùn)行時間 (單位:s)
本文在基于數(shù)據(jù)庫觸發(fā)器記錄數(shù)據(jù)變化的基礎(chǔ)上,通過自己構(gòu)造DirectJdbcStream類提供Spark Streaming對數(shù)據(jù)庫的支持,達(dá)到準(zhǔn)實時從數(shù)據(jù)庫中抽取出增量數(shù)據(jù)的目的。并且可以對抽取出來的數(shù)據(jù)進(jìn)行過濾、清洗等操作,根據(jù)需求靈活地寫入到不同的目的庫。
[1] 郭亮.基于MD5與HASH的數(shù)據(jù)庫增量提取算法及其應(yīng)用[D].長沙:湖南大學(xué),2013.
[2] ZAHARIA M, CHOWDHURY M,DAS T,et al.Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing[C].Usenix Conference on Networked Systems Design & Implementation,2012,70(2):141-146.
[3] DEAN J, GHEMAWAT S.MapReduce: simplified dataprocessing on large clusters[C].USENIX Association OSDI′04: 6th Symposium on Operating Systems Design and Implementation,2004:137-149.
[4] MARTIN O.Programming in scala[M].California: Artima Press,2010.
[5] YADAV R.Spark cookbook[M].UK: Packt Publishing Ltd,2015.
[6] KARAU H.Learning spark[M].America: O’Reilly Media,Inc.2015.
[7] 梁剛.企業(yè)大數(shù)據(jù)管理解決方案[J].微型機(jī)與應(yīng)用,2013,32(24):7-10,13.
Spark-based database increment near-real-time synchronization
Wang Hao,Ge Ang,Zhao Qing
(National Computer System Engineering Research Institute of China,Beijing 100083,China)
In order to export incremental data stored in traditional database to homogeneous or heterogeneous destination,on the basis of existing incremental extraction method,we proposed a solution to speed up synchronization by increasing parallelism and using streaming instead of batch.This approach supports incremental data of inserting,updating and deleting,and can extract the database table schema information to support dynamic table structure changes.Compared with traditional single-point mode,it makes data more fresh.
increment extraction; Spark; streaming computing
TP311.1
A DOI:10.19358/j.issn.1674-7720.2016.19.002
王浩,葛昂,趙晴.基于Spark的數(shù)據(jù)庫增量準(zhǔn)實時同步[J].微型機(jī)與應(yīng)用,2016,35(19):9-10,13.
2016-05-05)
王浩(1989-),男,碩士,主要研究方向:大數(shù)據(jù)。
葛昂(1972-),男,碩士,高級工程師,主要研究方向:企業(yè)軟件架構(gòu)、多維數(shù)據(jù)綜合應(yīng)用。
趙晴(1964-),男,學(xué)士,高級工程師,主要研究方向:工業(yè)控制物聯(lián)網(wǎng)。