劉 瀟
(江蘇省疾病預防控制中心 公共衛(wèi)生信息所, 南京 210009)
隨著疾控信息化工作的不斷深入, 疾控的傳染病、公共衛(wèi)生突發(fā)事件、慢病、計劃免疫以及精神衛(wèi)生等業(yè)務條線的信息系統(tǒng)在不斷地建立與完善, 疾控信息化標準體系[1,2]的建立與完善有力地推動了全民健康信息化中公共衛(wèi)生的數(shù)據(jù)整合. 在當前各行業(yè)協(xié)作日益緊密、各級疾控一體化集成日漸成熟的大背景下, 疾控中心各類的數(shù)據(jù)共享與交換[3,4]需求也隨之而來. 根據(jù)不同的業(yè)務需求, 各個信息系統(tǒng)需要調(diào)用不同來源的接口來完成數(shù)據(jù)的下載、上傳或核驗等操作.在數(shù)據(jù)量比較小、任務實時性要求比較低的情況下,全量數(shù)據(jù)逐條調(diào)用數(shù)據(jù)接口并記錄接口反饋信息的模式可以滿足業(yè)務需求, 但是當數(shù)據(jù)量比較大并且任務實時性要求比較高的情況下, 比如: 疫情期間, 全省億級數(shù)量的常住人口庫的全量數(shù)據(jù)需要周期性調(diào)用通信管理接口或核酸檢測查詢接口以獲得個人行程記錄與核酸檢測的相關信息, 或是在特定的時間內(nèi), 某個月增百萬級隨訪數(shù)據(jù)的業(yè)務系統(tǒng)的大量的隨訪信息需要全部上傳至指定的平臺, 逐條調(diào)用數(shù)據(jù)接口的模式效率太低, 無法在規(guī)定的時間內(nèi)完成任務, 如何利用有限的硬件資源高效地完成數(shù)據(jù)交換任務成為了疾控在信息化建設中面臨的一個問題.
在有限的硬件資源下, 解決這個問題的思路是讓數(shù)據(jù)交換任務并發(fā)執(zhí)行, 直接在服務器上為每一個數(shù)據(jù)交換任務分配一個線程并同時啟動大量線程去完成數(shù)據(jù)交換的方法會導致服務器壓力過大, 線程的運行缺乏有效的控制, 線程的創(chuàng)建與銷毀都會造成系統(tǒng)開銷, 操作系統(tǒng)對大量線程的頻繁的切換與調(diào)度會給CPU 帶來沉重的負擔, 容易造成服務卡頓或服務器宕機. 本文基于線程池與消息中間件技術建立一個數(shù)據(jù)交換的并發(fā)處理模型, 使用Java 線程池去控制數(shù)據(jù)交換任務的并發(fā)處理, 并引用消息中間件Kafka 作為中間件來記錄數(shù)據(jù)交換結(jié)果, 進一步提高任務完成的效率, 通過實驗的對比證明該模型的可行性與高效性.
線程池技術是一種設計程序并發(fā)運行的技術, 其核心思想是對已有線程的復用來避免大量線程創(chuàng)建與銷毀帶來的系統(tǒng)開銷, 在CPU 上創(chuàng)建和結(jié)束線程造成的開銷是創(chuàng)建或銷毀任務的18 至100 倍[5], 而且通過任務進行同步的開銷也遠低于同步多個線程的開銷, 因此線程池技術能夠更好地支持細粒度的任務并發(fā)[6]. 常見的線程池一般主要包括4 個部分: 線程管理器、工作線程、任務接口和輸入輸出任務隊列, 在啟動時線程池創(chuàng)建若干數(shù)量的空閑線程, 當任務到達時利用已經(jīng)創(chuàng)建的線程執(zhí)行任務, 任務處理完成后, 該線程會被線程池回收用來執(zhí)行下一個任務以達到線程復用的效果, 同時線程池還要對任務隊列的大小、空閑線程的銷毀、新線程的創(chuàng)建以及對任務的拒絕策略等進行管理.
Java 從JDK 1.5 版本開始在java.util.concurrent 包中提供了對線程池功能的支持[7], 相關類的繼承關系如圖1 所示, 其中ThreadPoolExecutor 是最核心的一個類, Java 通過封裝ThreadPoolExecutor 類提供了SingleThreadExecutor、CachedThreadPool、Fixed ThreadPool 以及ScheduledThreadPool 這4 類適合特定場景的線程池供編程人員調(diào)用, 同時Java 也支持編程人員重寫ThreadPoolExecutor 的構(gòu)造方法, 通過設置構(gòu)造參數(shù)自定義線程池.
圖1 Java 線程池UML 靜態(tài)類圖
ThreadPoolExecutor 類構(gòu)造方法的主要的構(gòu)造參數(shù)如下:
corePoolSize: 核心線程數(shù), 即常駐線程池的工作線程數(shù)量.
maximumPoolSize: 最大線程數(shù), 即某一時刻, 當任務大于線程池當前存在的工作線程數(shù)時, 線程池中的工作線程可以增加到的最大值.
keepAliveTime: 當線程數(shù)大于核心線程數(shù)時, 空閑的工作線程等待新任務的最長時間, 超過這個時間空閑線程沒有接到任務就會被銷毀, 線程池只保留核心線程數(shù)的工作線程數(shù)量.
workQueue: 任務隊列, 即線程池中的工作線程的數(shù)量已經(jīng)達到最大線程數(shù)時, 任務的等待隊列.
threadFactory: 線程工廠, 可以用來自定義線程池中線程的命名方式, 優(yōu)先級等屬性.
Handler: 拒絕策略, 即線程池中的工作線程的數(shù)量已經(jīng)達到最大線程數(shù)且任務隊列已滿的情況下, 線程池對超出線程池處理能力的任務所做的處理策略.
消息中間件是可以在不同系統(tǒng)之間進行消息傳遞的一類組件, 它利用高效、可靠的消息傳遞機制進行平臺無關的數(shù)據(jù)交流[8], 消息生產(chǎn)者定向發(fā)送數(shù)據(jù), 消息消費者獲取并消費數(shù)據(jù), 基于數(shù)據(jù)通信進行分布式系統(tǒng)的集成. 消息中間件的消息傳遞主要有兩種模式,分別是點對點模式和發(fā)布-訂閱模式. 目前比較主流的分布式消息中間件有Kafka, RabbitMQ, ActiveMQ 等.
Kafka 是一個分布式的消息發(fā)布-訂閱模式[9]的中間件系統(tǒng). Kafka 在主題中保存消息的信息, 生產(chǎn)者向主題寫入數(shù)據(jù), 消費者從主題讀取數(shù)據(jù), 從而實現(xiàn)數(shù)據(jù)傳輸.
高性能、高吞吐、低延時是Kafka 的顯著的特性,雖然Kafka 的消息保存在磁盤上, 但是由于采用了順序?qū)懭?、MMFiles (memory mapped files)、Zero Copy、批量壓縮等技術優(yōu)化了讀寫性能[10], 使其可以突破傳統(tǒng)的數(shù)據(jù)庫、消息隊列等數(shù)據(jù)引擎所受限的磁盤IO瓶頸, 即使是部署在普通的單機服務器上, Kafka 也能輕松支持每秒百萬級的寫入請求[11], 讀寫速度超過大部分的消息中間件, 這種特性使得Kafka 在海量數(shù)據(jù)場景中應用廣泛.
疾控信息化工作中處理數(shù)據(jù)交換的基本流程是:從數(shù)據(jù)庫中分批取出需要調(diào)用數(shù)據(jù)接口的數(shù)據(jù), 為批次中的每一條數(shù)據(jù)創(chuàng)建一個數(shù)據(jù)交換任務, 任務主要包括調(diào)用接口獲得反饋信息、將反饋信息回寫數(shù)據(jù)庫進行持久化兩個步驟.
由于各數(shù)據(jù)交換任務相互之間的無關性, 可以在調(diào)用的數(shù)據(jù)接口可承載的并發(fā)調(diào)用范圍內(nèi), 使數(shù)據(jù)交換任務并發(fā)進行以提高效率, 并在數(shù)據(jù)交換任務的反饋信息持久化階段將反饋信息寫入吞吐量更高的消息中間件進行存儲, 進一步縮短數(shù)據(jù)交換任務的運行時間以提高效率.
在圖2 中, 通過一個數(shù)據(jù)交換調(diào)度控制程序建立并初始化數(shù)據(jù)交換任務的線程池, 在進行數(shù)據(jù)交換任務時, 為從數(shù)據(jù)庫取出的批量數(shù)據(jù)構(gòu)造數(shù)據(jù)交換任務,并將任務交給線程池進行并發(fā)處理的調(diào)度, 數(shù)據(jù)接口的反饋信息寫入中間件進行保存, 不同的數(shù)據(jù)消費者進程可以異步消費消息中間以獲取反饋信息, 按照不同的業(yè)務需求進行日志信息持久化到數(shù)據(jù)庫或者實時進行交換日志的統(tǒng)計與分析等操作.
圖2 數(shù)據(jù)交換并發(fā)處理模型
數(shù)據(jù)交換調(diào)度控制程序用Java 設計, 使用Java 線程池與Kafka 對模型進行實現(xiàn), 模型實現(xiàn)主要包含數(shù)據(jù)交換任務構(gòu)造、Kafka 調(diào)用以及數(shù)據(jù)交換線程池3 個部分.
2.2.1 數(shù)據(jù)交換任務構(gòu)造
封裝數(shù)據(jù)交換任務的類需要實現(xiàn)Runnable 接口以保證其可以在實例化后被線程池工作線程所調(diào)用,在該類的構(gòu)造器中傳遞具體的Kafka 連接以及數(shù)據(jù)接口調(diào)用所需要的參數(shù), 并實現(xiàn)Runnable 接口的run 方法完成具體數(shù)據(jù)接口調(diào)用與反饋信息的記錄, 其核心代碼如下:
?public class DSTask implements Runnable{ //數(shù)據(jù)交換任務封裝類public DSTask(KafkaProducer
2.2.2 Kafka 調(diào)用
在數(shù)據(jù)交換任務封裝類的sendData 方法中調(diào)用Kafka api 提供的send 方法記錄反饋信息, String 類型topicName 為Kafka 的相關主題名, String 類型context 為數(shù)據(jù)交換任務最終按約定格式拼接好的反饋信息, 其核心代碼如下:
2.2.3 數(shù)據(jù)交換線程池
通過參數(shù)設置自定義ThreadPoolExecutor 類實例化線程池來控制數(shù)據(jù)交換任務并發(fā)處理. 由于數(shù)據(jù)交換任務需要連續(xù)穩(wěn)定的處理, 線程池的核心線程數(shù)和最大線程數(shù)設為相同值, 即線程池中的常駐的工作線程數(shù), 這個值的大小在運行前需要由用戶綜合考慮所調(diào)用數(shù)據(jù)接口能承載的并發(fā)訪問量, 以及當前任務所運行的服務器的CPU 核數(shù)來設定, 在數(shù)據(jù)接口并發(fā)訪問的承載范圍內(nèi), 在實際工程應用中一般遵循如式(1)所示[12]:
線程池的任務隊列的大小設置為每批要調(diào)用數(shù)據(jù)接口的數(shù)據(jù)的數(shù)量, 以保證所有的數(shù)據(jù)交換任務都會被任務隊列容納, 等待線程池的有效調(diào)度, 這樣可以直接使用線程池默認的拒絕策略, 不需要再設計拒絕策略去處理線程池無法處理的數(shù)據(jù)交換任務.
線程池核心代碼如下:
?ThreadPoolExecutor executor = new ThreadPoolExecutor(threadNum,threadNum, 10, TimeUnit.SECONDS,new LinkedBlockingQueue
為測試該模型處理數(shù)據(jù)交換任務的效率, 在疾控內(nèi)部局域網(wǎng)部署應用進行測試, 應用部署的服務器配置: 4 核CPU, 內(nèi)存8 GB, 操作系統(tǒng): 64 位Linux CentOS 7.7, JDK 版本: Openjdk 1.8, 測試從疾控內(nèi)網(wǎng)某業(yè)務庫(業(yè)務庫版本: MySQL 8.0.18)批量取出5 000 條個人信息數(shù)據(jù)調(diào)用在公網(wǎng)發(fā)布的疫苗接種記錄查詢接口獲取個人某疫苗首次接種記錄的相關信息, 在逐條處理以及使用線程池模型進行處理、接口反饋的結(jié)果回寫數(shù)據(jù)庫或?qū)懭隟afka 等一些不同的情況下, 分別進行如下仿真實驗:
實驗1. 數(shù)據(jù)接口反饋信息回寫數(shù)據(jù)庫, 單線程逐條處理以及使用線程池在工作線程數(shù)取不同值的情況下的運行時間對比, 運行時間皆為5 次實驗的平均值,數(shù)據(jù)如表1 所示.
表1 不同工作線程數(shù)運行時間對比
很顯然, 線程池處理完成數(shù)據(jù)交換任務的效率明顯優(yōu)于單線程逐條處理, 且在實際接口的實際條件以及4 核CPU 的硬件資源條件下, 在工作線程數(shù)設為4 時的運行效率已達到最佳.
實驗2. 在線程池在工作線程數(shù)取最佳值的情況下, 數(shù)據(jù)接口反饋信息回寫數(shù)據(jù)庫與寫入Kafka (版本:Kafka 2.5.0)的運行時間對比, 運行時間皆為5 次實驗的平均值, 數(shù)據(jù)如表2 所示.
表2 反饋信息回寫數(shù)據(jù)庫與寫入Kafka 運行時間對比
對比兩者的運行時間可以看出, 將數(shù)據(jù)接口反饋信息寫入Kafka 可以極大地提高了數(shù)據(jù)交換任務完成的效率.
在疾控的數(shù)據(jù)交換工作中對模型進行實際應用時,工程師根據(jù)需要進行數(shù)據(jù)交換任務的數(shù)據(jù)總量, 綜合考慮部署數(shù)據(jù)交換應用程序的服務器內(nèi)存情況, 對數(shù)據(jù)進行批次的劃分, 確定每一批完成數(shù)據(jù)交換任務的數(shù)量與線程池任務隊列的容量, 并根據(jù)服務器CPU 的核數(shù)與需要調(diào)用的數(shù)據(jù)接口的實測情況確定線程池工作線程的數(shù)量, 設計數(shù)據(jù)交換調(diào)度控制程序. 如圖3 所示, 數(shù)據(jù)交換調(diào)度控制程序在初始化各類連接并建立線程池后, 按照預設的批次, 分批對數(shù)據(jù)進行數(shù)據(jù)交換任務的處理, 為了判斷線程池是否已完成當前批次的所有數(shù)據(jù)交換任務, 可以設置一個線程安全的全局變量, 每次數(shù)據(jù)交換任務完成時對這個變量進行累加操作, 數(shù)據(jù)交換調(diào)度控制程序通過讀取這個變量值來獲取線程池的當前狀態(tài), 如果當前批次的任務尚未全部完成, 調(diào)度控制程序執(zhí)行自旋等待操作, 等待當前批次的任務全部完成, 線程池處于空閑狀態(tài)后, 獲取下一批次的數(shù)據(jù)繼續(xù)進行, 直至所有批次的數(shù)據(jù)全部完成.
圖3 數(shù)據(jù)交換調(diào)度控制程序流程設計
圖4 展示的是實際工作中某重點人群庫數(shù)據(jù)使用該模型調(diào)用新冠疫苗接種查詢接口獲取個人新冠疫苗第一針接種結(jié)果在Kafka 相關主題中的存儲情況, 該項數(shù)據(jù)交換任務按約定的格式記錄了個人信息在業(yè)務庫的主鍵號, 調(diào)用接口的匹配標識, 以及調(diào)用接口所獲取的接種新冠疫苗第一針的疫苗廠商、接種時間、接種單位等信息, 各數(shù)據(jù)項之間插入制表符以便在信息消費時進行解析.
圖4 Kafka 記錄的反饋信息展示
針對疾控中心在處理大規(guī)模數(shù)據(jù)交換時傳統(tǒng)的處理模式效率不高, 難以及時完成任務的問題, 本文根據(jù)數(shù)據(jù)交換任務的特點設計了一個數(shù)據(jù)交換任務的并發(fā)處理模型, 并使用Java 線程池與消息中間件Kafka 給出了模型的具體實現(xiàn). 該模型已成功應用在江蘇省疾控中心的數(shù)據(jù)交換的處理中, 實踐表明, 模型具有良好的數(shù)據(jù)交換任務并發(fā)控制與處理能力, 進行數(shù)據(jù)交換的數(shù)據(jù)量越大, 其優(yōu)勢越明顯. 在不大幅度增加硬件成本的前提下, 該模型適用面廣, 可用于各類型的數(shù)據(jù)換的處理與控制, 在保證服務穩(wěn)定性的同時可以有效地提高數(shù)據(jù)交換的處理能力.