摘要:面對(duì)部分國(guó)產(chǎn)非公開數(shù)據(jù)庫日志格式的數(shù)據(jù)庫數(shù)據(jù)高實(shí)時(shí)共享同步問題,文章設(shè)計(jì)了一種基于ThreadLocal和雙切面的數(shù)據(jù)同步共享方案。通過在Dao層切面采集Web請(qǐng)求響應(yīng)過程中對(duì)數(shù)據(jù)庫的所有變更記錄,暫存在ThreadLocal中;在Controller層切面攔截響應(yīng),從ThreadLocal提取出所有變更記錄,最后通過傳輸管理模塊將變更記錄共享發(fā)送給其他系統(tǒng),實(shí)現(xiàn)數(shù)據(jù)庫數(shù)據(jù)的增量共享。實(shí)驗(yàn)對(duì)比發(fā)現(xiàn)方案采集時(shí)間、發(fā)送時(shí)間相對(duì)于業(yè)務(wù)操作時(shí)間占比極低,這是因?yàn)閿?shù)據(jù)采集、發(fā)送等操作均在本地內(nèi)存中進(jìn)行,而業(yè)務(wù)操作依賴于數(shù)據(jù)庫網(wǎng)絡(luò)IO,所以該方案能夠?qū)崿F(xiàn)國(guó)產(chǎn)數(shù)據(jù)庫數(shù)據(jù)同步共享,具備較高的實(shí)時(shí)性和較低的侵入性。
關(guān)鍵詞:數(shù)據(jù)共享;切面;ThreadLocal;國(guó)產(chǎn)數(shù)據(jù)庫;高實(shí)時(shí)
中圖分類號(hào):TP39 "文獻(xiàn)標(biāo)志碼:A
0 引言
數(shù)據(jù)是維系企業(yè)生存的“血液”[1],要想提高企業(yè)生命力,必須重視數(shù)據(jù)的同步共享。通常的數(shù)據(jù)同步共享方式可分為同源數(shù)據(jù)同步和異構(gòu)數(shù)據(jù)同步2種。同源數(shù)據(jù)庫解決方案主要包括基于數(shù)據(jù)庫觸發(fā)器或者數(shù)據(jù)庫日志系統(tǒng)進(jìn)行數(shù)據(jù)同步共享,如劉鵬等[2]利用canal基于數(shù)據(jù)庫日志系統(tǒng)實(shí)現(xiàn)數(shù)據(jù)雙向同步;異構(gòu)數(shù)據(jù)同步解決方案主要為基于ETL工具進(jìn)行數(shù)據(jù)同步,如陶慧玲等[3]采用DataX進(jìn)行異構(gòu)數(shù)據(jù)同步,王天舉等[4]基于Kettle實(shí)現(xiàn)鐵路系統(tǒng)異構(gòu)數(shù)據(jù)同步。
在國(guó)產(chǎn)化背景下使用國(guó)產(chǎn)關(guān)系型數(shù)據(jù)庫存儲(chǔ)業(yè)務(wù)數(shù)據(jù),根據(jù)業(yè)務(wù)需要將國(guó)產(chǎn)關(guān)系型數(shù)據(jù)庫中數(shù)據(jù)實(shí)時(shí)同步共享出去。其中,基于數(shù)據(jù)庫觸發(fā)器數(shù)據(jù)同步方式耗費(fèi)數(shù)據(jù)庫資源高[5],基于數(shù)據(jù)庫日志系統(tǒng)同步方式不適用于非公開數(shù)據(jù)庫日志格式的場(chǎng)景[6],基于ETL異構(gòu)數(shù)據(jù)源共享的機(jī)制決定了其難以解決高實(shí)時(shí)性問題[7]。而筆者面臨場(chǎng)景就是常規(guī)的Web請(qǐng)求下,使用國(guó)產(chǎn)關(guān)系型數(shù)據(jù)庫持久化業(yè)務(wù)數(shù)據(jù),但并未提供數(shù)據(jù)同步工具,也未開放數(shù)據(jù)庫日志格式,同時(shí)又對(duì)數(shù)據(jù)實(shí)時(shí)共享要求非常高。傳統(tǒng)的方法不管是ETL還是利用公開日志格式進(jìn)行數(shù)據(jù)同步共享均不能很好地滿足需求,因此需要設(shè)計(jì)一種新的高實(shí)時(shí)非公開日志格式的國(guó)產(chǎn)關(guān)系型數(shù)據(jù)庫同步共享方案,該方案要具較高的實(shí)時(shí)性同時(shí)對(duì)業(yè)務(wù)代碼侵入較低。
1 原理
1.1 相關(guān)理論
在典型的Web請(qǐng)求-響應(yīng)流程中,一個(gè)Web Request進(jìn)入服務(wù)端到Response返回前端的生命周期中,依次經(jīng)過Controller、Service、Dao等組件層,通過Dao層將數(shù)據(jù)變更寫入數(shù)據(jù)庫,再反過來經(jīng)Dao、Service、Controller層,完成一次Web請(qǐng)求響應(yīng)過程。通常請(qǐng)求-響應(yīng)模式中,為提高響應(yīng)效率,服務(wù)端請(qǐng)求響應(yīng)線程由服務(wù)端從線程池中分配而來,響應(yīng)結(jié)束后又歸還給服務(wù)端線程池,由單個(gè)線程完成整個(gè)處理過程。該方案的技術(shù)路徑是通過依次采集該Web請(qǐng)求響應(yīng)過程中對(duì)數(shù)據(jù)庫的所有變更記錄(如增加、修改、刪除等操作),然后將變更記錄同步共享出去,完成數(shù)據(jù)的同步共享,整體過程如圖1所示。
如果直接在程序中通過編碼的方式記錄下對(duì)數(shù)據(jù)庫的操作,無疑會(huì)對(duì)原有業(yè)務(wù)代碼侵入較大,導(dǎo)致業(yè)務(wù)邏輯不清晰,后期維護(hù)成本增大,因此須要采用 Aspect Oriented Programming(AOP),即面向切面編程[8]。將與核心業(yè)務(wù)無關(guān)卻多次使用的功能模塊抽取分離成切面,和核心業(yè)務(wù)模塊進(jìn)行解耦,降低代碼復(fù)雜度,提高程序的后期可維護(hù)性[9]?,F(xiàn)代程序大量使用切面編程方式,在實(shí)現(xiàn)一些與核心業(yè)務(wù)弱相關(guān)功能的同時(shí)又降低了程序的復(fù)雜性。這里將數(shù)據(jù)庫變更記錄和采集過程使用AOP技術(shù)從業(yè)務(wù)中剝離出來,能夠提高程序可讀性以及可維護(hù)性。
為了提高程序的實(shí)時(shí)性,結(jié)合請(qǐng)求-響應(yīng)的單線程過程,這里采用線程局部量(ThreadLocal)技術(shù)。通過在線程內(nèi)部存儲(chǔ)變量,能夠減少同一個(gè)線程內(nèi)多個(gè)函數(shù)或者組件之間一些公共變量傳遞的復(fù)雜度[10]。白曉濤等[11]用ThreadLocal存儲(chǔ)當(dāng)前用戶信息,方便處理過程中進(jìn)行存儲(chǔ)使用。線程局部量存儲(chǔ)的數(shù)據(jù)可以很方便地在本線程運(yùn)行過程獲取,不會(huì)受到其他線程的影響。因此,使用ThreadLocal存儲(chǔ)數(shù)據(jù)庫變更記錄,能夠提高程序的響應(yīng)速度,同時(shí)進(jìn)一步降低對(duì)業(yè)務(wù)代碼的侵入。
1.2 實(shí)現(xiàn)原理
該解決方案由Controller層切面、Dao層切面、ThreadLocal管理、數(shù)據(jù)傳輸管理等模塊組成,如圖2所示。
Controller層切面架設(shè)在Controller和Service之間,攔截進(jìn)入的請(qǐng)求,初始化ThreadLocal,清理該ThreadLocal中的臟數(shù)據(jù)(來自線程池中的復(fù)用線程,會(huì)攜帶上次請(qǐng)求處理中的臟數(shù)據(jù))。Dao層切面架設(shè)在Service和Dao之間,采集Dao層對(duì)數(shù)據(jù)庫所做的所有變更操作,抽象出變更的數(shù)據(jù)內(nèi)容(add操作、delete操作、modify操作)并將其存儲(chǔ)在線程局部量ThreadLocal中,以便于后續(xù)的數(shù)據(jù)打包發(fā)送。在請(qǐng)求返回Controller層切面時(shí),提取Dao層切面采集的變更記錄數(shù)據(jù),將其序列化打包,通過數(shù)據(jù)傳輸模塊將數(shù)據(jù)變更記錄發(fā)送出去,通過Kafka或者其他數(shù)據(jù)中心將數(shù)據(jù)變更記錄同步到共享出去,通過這4個(gè)模塊的配合實(shí)現(xiàn)高實(shí)時(shí)國(guó)產(chǎn)數(shù)據(jù)庫數(shù)據(jù)同步共享功能,具體實(shí)現(xiàn)原理如圖3所示。
2 技術(shù)實(shí)現(xiàn)與實(shí)驗(yàn)
2.1 偽代碼
筆者使用Spring后端服務(wù)框架,實(shí)現(xiàn)Controller層切面、Dao層切面,ThreadLocal管理、數(shù)據(jù)傳輸管理等模塊。
2.1.1 ThreadLocal管理
線程局部量ThreadLocal中保存的是解決方案的核心數(shù)據(jù)結(jié)構(gòu),包含操作的數(shù)據(jù)對(duì)象(table)、操作方式(add、delete、modify)、操作內(nèi)容(變更的內(nèi)容)等信息。通過ThreadLocalDataHolder管理器可以方便地在同一個(gè)線程中任何時(shí)間地點(diǎn)存入和取出變更數(shù)據(jù)內(nèi)容,進(jìn)一步降低程序內(nèi)部邏輯的耦合。
public class ThreadLocalData {//線程局部量
private Listlt;DataContentOperatergt; dataOperaterLst = new ArrayList();//數(shù)據(jù)操作集合
private String operaterDescription;// 數(shù)據(jù)操作說明
public class DataContentOperater {
private int operator = -1;// 操作標(biāo)識(shí),1:新增 2更新 3刪除
private Object object;// 操作實(shí)體
private String themeName;// 操作表對(duì)象
}
}
public class ThreadLocalDataHolder {//ThreadLocal管理器
private static final ThreadLocallt;ThreadLocalDatagt; THREAD_threadLocalData = new ThreadLocal() {
protected ThreadLocalData initialValue() {
return new ThreadLocalData(); }
};
public static ThreadLocalData getThreadLocalData() {
return (ThreadLocalData)THREAD_threadLocalData.get();
}
}
2.1.2 Controller層切面
Controller層切面攔截Web請(qǐng)求,初始化ThreadLocal變量,在請(qǐng)求返回時(shí),提取ThreadLocal變量中保存的數(shù)據(jù)變更記錄,然后將變更記錄數(shù)據(jù)發(fā)送出去。在該模塊能夠計(jì)算出整體耗時(shí)。
@Aspect
public class ControllerAop {//Controller層切面
@Autowired
SendService mSendService;
@Pointcut(\"execution(* controller.*.*(..)) \")
public void Pointcut() {}//Cortoller層切面定義,攔截切面數(shù)據(jù)
@Around(\"Pointcut()\")
public ReturnBean Around(ProceedingJoinPoint point) {
ReturnBean ret = 1;
long startTime = System.currentTimeMillis();
Object returnObject = point.proceed(); //調(diào)用業(yè)務(wù)邏輯
ret = (ReturnBean)returnObject;
if (ret.getCode() == 200) {//處理失敗,無須處理數(shù)據(jù)
return ret;
} else {
this.mSendService.sendMessgae(); //同步共享數(shù)據(jù)
long AllTime = System.currentTimeMillis() -startTime;//計(jì)算整體耗時(shí)
}
return ret;
}
}
2.1.3 Dao層切面
Dao層切面攔截?cái)?shù)據(jù)庫操作,通過反射技術(shù)采集增加、修改、刪除等操作的數(shù)據(jù)庫對(duì)象[12],保存在ThreadLocal中。記錄單次數(shù)據(jù)庫操作的時(shí)間以及單次采集時(shí)間。
@Aspect
public class DaoAop {//Dao層切面
@Pointcut(\"execution(* *.dao.*.*(..))\")
public void Pointcut() {} //攔截所有save、update、delete方法
@After(\"Pointcut()\")
public void Around(JoinPoint point) {
long startTime = System.currentTimeMillis();//單次業(yè)務(wù)操作數(shù)據(jù)庫耗時(shí)
Object returnObject = point.proceed();
long endTime = System.currentTimeMillis();
long businessTime = System.currentTimeMillis() -startTime;
MethodSignature signature = (MethodSignature)point.getSignature();
Method method = signature.getMethod();
Object[] args = point.getArgs();
if (!BlankUtil.isBlank(args)) {
DataContentOperater datacontentoperater = new DataContentOperater();
String methodName = method.getName();
Classlt;?gt; clazz = point.getTarget().getClass();
datacontentoperater.setThemeName(clazz.getSimpleName());
if (methodName.contains(\"save\")) {//提取add數(shù)據(jù)
datacontentoperater.setOperator(1);
datacontentoperater.setObject(args[0]);
} else if (methodName.contains(\"update\")) {//提取modify數(shù)據(jù)
datacontentoperater.setOperator(2);
datacontentoperater.setObject(args[0]);
} else if (methodName.contains(\"delete\")) {//提取delete數(shù)據(jù)
datacontentoperater.setOperator(3);
datacontentoperater.setObject(args[0]);
}
if (datacontentoperater.getOperator() != -1) {
ThreadLocalDataHolder.getThreadLocalData().getDataOperaterLst()
.add(datacontentoperater);
}
}
long extractTime = System.currentTimeMillis()-endTime; //采集耗時(shí)
}
}
2.1.4 數(shù)據(jù)傳輸管理
數(shù)據(jù)傳輸管理是對(duì)數(shù)據(jù)進(jìn)行打包發(fā)送,該方案采用多線程技術(shù)進(jìn)行優(yōu)化,將打包和發(fā)送過程放在單獨(dú)的發(fā)送線程中,和請(qǐng)求響應(yīng)線程隔離開,以進(jìn)一步降低對(duì)程序的侵入影響,縮短程序返回時(shí)間,提高程序響應(yīng)速度。
public class SendDataService {//數(shù)據(jù)傳輸管理
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS);
@Autowire
DataSenderProxy proxy;
public void sendMessgae() {
ThreadLocalData threadLocalData = Thread LocalDataHolder
.getThreadLocalData(); ""http://提取線程局部數(shù)據(jù)
long startTime = System.currentTimeMillis();
Runnable task = new SendDataService.sendDataTask(data, proxy);
this.executor.execute(task);
long sendTime = System.currentTimeMillis() -startTime;//發(fā)送時(shí)間
}
private class SendDataTask implements Runnable {
private ThreadLocalData data;//數(shù)據(jù)內(nèi)容
private DataSenderProxy proxy; //數(shù)據(jù)中心代理
public SendDataTask(ThreadLocalData data, DataSenderProxy proxy) {
this.data = data; this.proxy = proxy;
}
public void run() {
Codec codec = ProtobufProxy.create(ThreadLocalData.class);
byte[] dateByte =codec.encode(data);
proxy.send(dateByte); //數(shù)據(jù)發(fā)送--
}
}
}
2.2 實(shí)驗(yàn)過程
以Add場(chǎng)景為例,通過swagger頁面[13],模擬業(yè)務(wù)請(qǐng)求,并在Service層循環(huán)調(diào)用Add操作,調(diào)用次數(shù)分別為100次、200次、300次,400次、500次,采集整體運(yùn)行時(shí)間、業(yè)務(wù)操作時(shí)間(對(duì)單次Add操作數(shù)據(jù)庫時(shí)間進(jìn)行累加)、數(shù)據(jù)采集時(shí)間(對(duì)單次Add操作數(shù)據(jù)采集時(shí)間進(jìn)行累加)、數(shù)據(jù)發(fā)送時(shí)間,計(jì)算數(shù)據(jù)采集時(shí)間、數(shù)據(jù)發(fā)送時(shí)間、業(yè)務(wù)處理時(shí)間相對(duì)于整體運(yùn)行時(shí)間占比,詳細(xì)的實(shí)驗(yàn)過程如圖4所示,所得結(jié)果如表1所示。
2.3 實(shí)驗(yàn)結(jié)果分析
執(zhí)行Add操作100次、200次、300次,400次、500次,業(yè)務(wù)操作時(shí)間/整體運(yùn)行時(shí)間從98.98%上漲至99.90%,采集和發(fā)送時(shí)間相對(duì)整體運(yùn)行時(shí)間的占比隨著操作次數(shù)增多,整體呈下降趨勢(shì),從不足1%到不足0.1%,說明該方案對(duì)原有程序的影響較低,業(yè)務(wù)邏輯越復(fù)雜,影響越低。
業(yè)務(wù)處理過程依賴于數(shù)據(jù)庫網(wǎng)絡(luò)IO,該過程較為耗時(shí);數(shù)據(jù)的采集過程完全在內(nèi)存中進(jìn)行,采集的數(shù)據(jù)是原始數(shù)據(jù)的引用,因此數(shù)據(jù)采集時(shí)間占比較低;數(shù)據(jù)發(fā)送過程,對(duì)傳輸發(fā)送過程進(jìn)行多線程優(yōu)化,發(fā)送過程中數(shù)據(jù)打包以及網(wǎng)絡(luò)傳輸?shù)群臅r(shí)操作均在多線程中進(jìn)行,不占用原有請(qǐng)求響應(yīng)線程時(shí)間,因此發(fā)送時(shí)間占比也較低。綜上,該方案利用雙切面和ThreadLocal實(shí)現(xiàn)了國(guó)產(chǎn)數(shù)據(jù)庫數(shù)據(jù)同步共享,具備較高的實(shí)時(shí)性和較低的侵入性。
3 結(jié)語
在國(guó)產(chǎn)化背景下,面對(duì)部分未開放數(shù)據(jù)庫日志格式國(guó)產(chǎn)數(shù)據(jù)庫系統(tǒng),難以高實(shí)時(shí)共享同步數(shù)據(jù)庫數(shù)據(jù)的問題,本文設(shè)計(jì)了一種高實(shí)時(shí)國(guó)產(chǎn)數(shù)據(jù)庫同步共享方案,在Dao層切面采集Web請(qǐng)求響應(yīng)過程中對(duì)數(shù)據(jù)庫的所有變更記錄并放入ThreadLocal;在Controller層切面攔截響應(yīng),從ThreadLocal提取出所有變更記錄,最后通過傳輸管理模塊將變更記錄同步共享給其他系統(tǒng),實(shí)現(xiàn)數(shù)據(jù)的高實(shí)時(shí)性增量共享。通過實(shí)驗(yàn)對(duì)比發(fā)現(xiàn)采集時(shí)間、發(fā)送時(shí)間相對(duì)于業(yè)務(wù)操作時(shí)間占比極低,這是因?yàn)閿?shù)據(jù)采集、發(fā)送等操作均在本地內(nèi)存中進(jìn)行,而業(yè)務(wù)操作依賴于數(shù)據(jù)庫網(wǎng)絡(luò)IO,所以該方案能夠?qū)崿F(xiàn)國(guó)產(chǎn)數(shù)據(jù)庫數(shù)據(jù)同步共享,同時(shí)具備較高的實(shí)時(shí)性和較低的侵入性。
參考文獻(xiàn)
[1]彭雅芳.FC上舞動(dòng)的NVMe:存儲(chǔ)部署的創(chuàng)新之舉[J].計(jì)算機(jī)與網(wǎng)絡(luò),2015(24):62.
[2]劉鵬,李凡平,王堃.一種基于canal的數(shù)據(jù)庫雙向同步方法,介質(zhì)及設(shè)備:CN202211313441.8[P].2024-06-09.
[3]陶慧玲,馬依琳,王曄,等.基于微服務(wù)的研究生信息系統(tǒng)數(shù)據(jù)同步方案研究與設(shè)計(jì)[J].華東師范大學(xué)學(xué)報(bào)(自然科學(xué)版),2024(2):42-52.
[4]王天舉,許丹亞,尹文志,等.基于Kettle的鐵路數(shù)據(jù)接入的設(shè)計(jì)與實(shí)現(xiàn)[J].無線互聯(lián)科技,2023(8):79-82.
[5]劉勝.基于增量ETL的分布式數(shù)據(jù)交換平臺(tái)的研究與實(shí)現(xiàn)[D].長(zhǎng)沙:國(guó)防科學(xué)技術(shù)大學(xué),2024.
[6]滕歡.基于Spring3的數(shù)據(jù)讀寫分離技術(shù)研究[D].哈爾濱:哈爾濱工程大學(xué),2016.
[7]龍濤,戴牡紅.Improving the Quality of Data in the Data Warehouse:2009國(guó)際信息技術(shù)與應(yīng)用論壇論文集(下)[C].重慶:計(jì)算機(jī)科學(xué),2009.
[8]PATEL S,KATIYAR S K,SHARMA N.Metric Analysis for AOP and OOP programming paradigm[J].Journal of The Institution of Engineers,2023(1):215-220.
[9]唐瑤.基于AOP攔截技術(shù)的精準(zhǔn)信息推送服務(wù)研究[D].哈爾濱:哈爾濱工程大學(xué),2016.
[10]魏可源.深入分析ThreadLocal內(nèi)存泄漏問題[J].計(jì)算機(jī)與網(wǎng)絡(luò),2018(21):56.
[11]白曉濤,陶智勇.高效Java后臺(tái)程序緩存用戶信息的研究[J].網(wǎng)絡(luò)新媒體技術(shù),2020(5):35-38.
[12]周志明.深入理解Java虛擬機(jī)[M].北京:機(jī)械工業(yè)出版社,2011.
[13]王志軍,姚文達(dá).Web API后端接口管理與應(yīng)用[J].智能計(jì)算機(jī)與應(yīng)用,2024(5):247-251.
(編輯 "沈 強(qiáng))
High real-time data synchronization sharing scheme of domestic database
CUI" Yu, HE" Wei, LI" Gaoshang, YAO" Wanhua, YAO" Ke
(The 28th Research Institute of China Electronics Technology Group Corporation, Nanjing 210028, China)
Abstract: Faced with the problem of high real-time sharing and synchronization of database data in log format of some domestic non-public databases, a data synchronization sharing scheme based on ThreadLocal and dual aspects was designed. Collect all change records to the database in the process of Web request response in the Dao layer aspect, and temporarily store them in ThreadLocal; Intercept the response at the Controller layer, extract all change records from ThreadLocal, and finally share the change records to other systems through the transmission management module to achieve incremental sharing of database data. Through experimental comparison, it is found that the acquisition time and transmission time of the scheme account for a very low proportion compared with the business operation time. This is because the data acquisition and transmission operations are carried out in the local memory, and the business operation depends on the database network IO. Therefore, the scheme can realize the synchronous sharing of data in the domestic database, with high real-time and low intrusion.
Key words: data sharing; aspect; ThreadLocal; domestic database; high real-time