陳瑤 李洋磊
摘 ?要:本文分析了ActiveMQ數(shù)據(jù)傳輸?shù)牡讓釉?,以解決數(shù)據(jù)突發(fā)洪峰時期的隊列數(shù)據(jù)積壓問題。利用增加并發(fā)消費者、調(diào)整消息預取值、批量消息確認等參數(shù),實現(xiàn)了傳輸性能的多倍提升。最后還根據(jù)業(yè)務(wù)運行出現(xiàn)過的問題,優(yōu)化了服務(wù)端的配置,加強了薄弱環(huán)節(jié)的監(jiān)控,提升了系統(tǒng)的穩(wěn)定性。
關(guān)鍵詞:ActiveMQ;民航數(shù)據(jù)傳輸;數(shù)據(jù)傳輸框架
中圖分類號:TP368.5 ? ? ?文獻標識碼:A 文章編號:2096-4706(2019)16-0128-04
Abstract:This paper has analyzed the underlying principle of ActiveMQ data transmission,to solve the data backlog problem during the peak period of data. By adding concurrent consumers,adjusting message prefetch values and batch message validation parameters,the transmission performance is improved many times. Finally,according to the problems in the operation of the business,the configuration of the server is optimized,the monitoring of weak links is strengthened,and the stability of the system is improved.
Keywords:ActiveMQ;civil aviation data transfer;data transfer framework
0 ?引 ?言
為保證數(shù)據(jù)及時、可靠傳輸,選用了中間件ActiveMQ,設(shè)計了一套通用的數(shù)據(jù)傳輸框架。該框架現(xiàn)階段主要應(yīng)用于民航氣象數(shù)據(jù)的傳輸,為南方航空、華南國際商務(wù)航空等用戶提供高效、可靠的數(shù)據(jù)傳輸服務(wù)。該系統(tǒng)投入使用后,數(shù)據(jù)傳輸?shù)募皶r性增強、可靠性提高,得到了用戶單位的認可。
在幾年的運行過程中,也發(fā)現(xiàn)了許多可以改進的地方。該框架現(xiàn)階段傳輸?shù)拿窈綒庀髷?shù)據(jù),具有在整點及半點數(shù)據(jù)量較大的特點,框架承載的數(shù)據(jù)量越來越多,用戶數(shù)也在增加。在數(shù)據(jù)量突發(fā)洪峰的情況下,如果某個用戶的數(shù)據(jù)獲取能力較差,ActiveMQ的數(shù)據(jù)消費能力被阻塞,容易產(chǎn)生大量的數(shù)據(jù)等待確認、大量數(shù)據(jù)重發(fā)的情況,嚴重影響數(shù)據(jù)的吞吐量,導致數(shù)據(jù)積壓,也給ActiveMQ服務(wù)器的運行帶來潛在的風險。
為解決該問題,對系統(tǒng)運行中的數(shù)據(jù)進行分析,提出適用于不同場景的消息隊列優(yōu)化過程。
1 ?ActiveMQ消息的傳送機制
如圖1所示,ActiveMQ的消息由生產(chǎn)者(即數(shù)據(jù)發(fā)送 端)發(fā)出后,會被ActiveMQ的Broker保存,消費者(即數(shù)據(jù)接收端)已經(jīng)在Broker上注冊,Broker會確保消息被發(fā)送給這些消費者,確保消息已經(jīng)送達后,該消息才會被刪除。
ActiveMQ的消息傳送機制如圖1所示,通過消費者正常接收到消息后,返回一個確認接收狀態(tài)的消息——ACK消息給Broker,如果層次較為復雜,則會一層一層的返回ACK消息。
ActiveMQ中的ACK消息有以下幾種類型,定義在字段ACK_TYPE中,如表1所示。
從ACK_TYPE的值可以看出,在ActiveMQ中,消息確認的頻率是可以由開發(fā)者選擇的??梢韵M一條消息返回一條確認消息,也可以選擇另外一種模式——延時確認。在消費者成功消費消息后,不立即返回ACK,而是等到這些ACK消息的條數(shù)積攢到某個閾值時,返回一個ACK消息把他們?nèi)看_認。
從這個定義也可看出,延時確認具有更好的性能。特別是在網(wǎng)絡(luò)擁堵的時期,N條消息只會有1條ACK消息,相比N條消息N條ACK返回,大大減輕了網(wǎng)絡(luò)負荷。但這樣的確認機制也存在一定的弊端,如果消費端出現(xiàn)異常,無法正常返回ACK,會導致N條消息重發(fā),反而會造成網(wǎng)絡(luò)負擔。
并且大量消息如果不得到及時的確認,Broker需保存這些消息,并將他們放置于隊列中排隊等待確認,這將消耗Broker服務(wù)器的內(nèi)存、硬盤等資源,如果該服務(wù)器的性能低下,將給Broker的運行帶來潛在的風險。
所以需要分析運行的實際情況,根據(jù)已有的資源進行靈活的配置、調(diào)優(yōu)。
2 ?問題定位及分析
利用現(xiàn)有的框架和數(shù)據(jù)傳輸模式,模擬測試數(shù)據(jù)突發(fā)洪峰時的數(shù)據(jù)吞吐量,從分析消息包處理耗時入手,進行各個參數(shù)的調(diào)優(yōu)。首先在生產(chǎn)端生成大量的消息,以在測試數(shù)據(jù)突發(fā)洪峰時期,每個消費者的消息處理速度、消息積壓數(shù)[2]。如表2所示。
從表2的數(shù)據(jù)可以看出,消費端的消費能力存在的差距,消費能力差的客戶端在突發(fā)數(shù)據(jù)洪峰時容易發(fā)生數(shù)據(jù)積壓。兩個消費者的網(wǎng)絡(luò)狀態(tài)類似,可以排除因網(wǎng)絡(luò)原因?qū)е碌南⒎e壓。通過進一步分析消費能力弱的消費端,研究其消息處理流程,發(fā)現(xiàn)其接收到消息后還需進行串行處理,處理過程更加復雜,導致消息處理更加耗時,消息返回ACK的時間也更長,導致了Broker需等待這個更慢的消費者。
針對消費端處理速度存在瓶頸的問題,設(shè)想通過提高消費端的處理速度入手。消費端的消息處理流程為業(yè)務(wù)需要,無法精簡處理流程來加快消費速度。那還有其他什么手段可以增加消費端對消息的消費速度?既然現(xiàn)存的串行等待的時間無法縮短,那是否可以通過并行多個消費者程序來提高效率?
3 ?增加并發(fā)消費者方案測試
擬通過增加并發(fā)消費者的方式,看是否能提高消息處理的速度。要使用并發(fā)消費者[3],可修改框架中Spring的JMS配置,增加多個Listener實例。配置項為Simple Message Listener Container[4],可以配置固定的實例個數(shù),也可以配置一個實例數(shù)的區(qū)間,這樣消費者可以根據(jù)消息的壓力情況動態(tài)調(diào)整并發(fā)數(shù)。
配置文件:
<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connec tionFactory"/>
<property name="destinationName" value="${jms.queue.name}"/>
<property name="messageListener" ref="message Receiver"/>
<property name="concurrency" value="10-20"/>
或者:
<property name="concurrentConsumers" value= "20"/>
</bean>
測試結(jié)果如表3所示。
4 ?消費者優(yōu)化
通過測試發(fā)現(xiàn),增加并行消費者后,消息的消費速度出現(xiàn)明顯的提升。但消費者數(shù)目大于10以后,消息處理速度不再提升,在多個消費者中,有些消費者很忙碌,需要處理大量的消息,有些消費者很空閑。為什么會出現(xiàn)這樣的情況?
在目標URI的定義中,有一個prefetchSize[5]參數(shù)值可配置,如下代碼所示:
String queueURI = "queueForGuest?customer.prefetchSize=100";
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue(queue URI);
prefetchSize參數(shù)定義了一次有多少條消息推送給消費者,若在代碼中沒有指定prefetchSize參數(shù)值,系統(tǒng)將給其一個默認值,如表4所示。
從這個默認值中可以看出,使用默認值無法與現(xiàn)實需求吻合。如果消費者處理消息的能力很差,一次推送1000個消息給消費者,無疑會造成消費者端的擁堵。如果消費者端性能好、處理速度快,可配置較高的prefetchSize值[7]。
而本系統(tǒng)中消費端航空公司2的消費者就是一個慢消費者,消費消息的速度慢,如果使用默認prefetchSize值1000,一次將1000個消息推送給該消費端,剩下的推送給該消費端并行的消費者,就出現(xiàn)了上文中的情況,部分消費者要處理的消息很多,消費能力差,消費速度慢,這些消費者特別忙碌,甚至出現(xiàn)擁堵現(xiàn)象。而其他并行的消費者沒有消息需要處理。
所以嘗試將該消費端的prefetchSize值進行調(diào)整,提升消費端整體的性能。
為了測試系統(tǒng)中現(xiàn)有消費者的適合的prefetchSize值大小,將prefetchSize值分別配置為100、1000,并進行了對比分析測試,測試結(jié)果如表5所示。
將queuePrefetch參數(shù)修改為100后,消費端Consumer2并行且忙碌的消費者數(shù)量增加,消費10000條消息減少。
除了批量獲取多個消息可以使性能提高,批量確認多個消息也將使性能大大提升,ACK的模式有很多種,如ActiveMQ消息的傳送機制表中的說明,其中效率最高的機制為optimizeAcknowledge模式,當有prefetchSize的65%個消息被正確消費后,消費端將返回一條ACK消息,并批量確認這些消息。
這樣的模式雖然效率高,但若消費端出現(xiàn)異常,未正常返回這些消息的ACK,Broker將重發(fā)這些消息,這樣的模式適用于高吞吐量、對重復消息有容錯能力的系統(tǒng)。觀察系統(tǒng)運行時這樣的異常情況較少,且在消費端均做了重復消息處理,同時本系統(tǒng)現(xiàn)應(yīng)用于傳輸氣象報文,對實時性要求很高,提高吞吐量對系統(tǒng)的運行意義重大。故這種高效模式適用于本系統(tǒng)。
將原來的逐條消息ACK改為optimizeAcknowledge模式后,消費端、Broker端的資源消耗降低,處理速度提高,測試結(jié)果如表6所示。
5 ?Broker優(yōu)化
從表2的測試結(jié)果可以看出,不同的消費端的消費速度差異較大,系統(tǒng)運行中同時存在快消費者和慢消費者,在現(xiàn)場運行的過程中也多次發(fā)現(xiàn)這樣的問題,某個消費者的消費能力較慢,不能及時消費消息或者返回ACK,導致Broker必須在內(nèi)存中保存這些消息,增加了內(nèi)存的消耗,消息積壓過多時,需要將內(nèi)存的消息寫入到磁盤中,增加了Broker端的磁盤I/O消耗。如果情況進一步嚴重,Broker將阻塞生產(chǎn)者,迫使其降低生產(chǎn)消息的速率甚至不生產(chǎn)消息。
一個慢的消費者不僅給Broker端的運行帶來了巨大的潛在風險,還有可能導致快的消費者也無法正常獲取消息。這是在運行環(huán)境中必須高度重視的一個問題。
保證系統(tǒng)運行的穩(wěn)定至關(guān)重要,但與此同時,即使用戶是慢消費者,保證他們及時獲取到數(shù)據(jù)也很重要,如何滿足這個矛盾的需求,主要從以下三個方面進行了優(yōu)化:
(1)關(guān)閉producerFlowControl,即使有慢消費者,先保證消息生產(chǎn)及快消費者消費的速度,保證消息傳輸不會因為慢消費者而終止。
(2)捕獲Broker資源消耗異常,及時進行干預、優(yōu)化。
在默認情況下,producerFlowControl是開啟的,在這種模式下,如果消費者消費能力差,Broker將降低消息的生產(chǎn),以保證消費端不會由于消息擁堵而資源耗盡,該模式為調(diào)節(jié)Broker來配合慢消費端。
如果選用該模式,消息的生產(chǎn)者也可以進行一些異常的處理,可以進行異常告警,并且生產(chǎn)者可以在等待設(shè)定的時間后進行重試,避免由于失敗而使發(fā)送消息的請求立即被阻塞,生產(chǎn)者變成假死的狀態(tài)。
(3)監(jiān)控Broker資源使用情況,監(jiān)控消費者消費情況,及時發(fā)現(xiàn)慢消費者,對異常及時進行干預、優(yōu)化。監(jiān)控每個消費者消費消息的情況,主要監(jiān)控參數(shù)為消費者是否掉線、阻塞的消息數(shù)、等待確認的消息數(shù)、進隊列消息數(shù)、出隊列消息數(shù)等。
6 ?結(jié) ?論
通過各個參數(shù)的調(diào)優(yōu),傳輸系統(tǒng)數(shù)據(jù)積壓的問題得到了解決,消息傳輸?shù)男阅艿玫搅颂岣?,消息傳輸?shù)乃俣忍嵘闆r如表7所示。
表7 ?優(yōu)化后傳輸時間變化及傳輸效率提升情況
通過方案調(diào)整及參數(shù)優(yōu)化,系統(tǒng)的性能及穩(wěn)定性都得到了較大的提高,達到了預期目標?;谥虚g件ActiveMQ的調(diào)優(yōu)方法還有很多,例如消息傳送優(yōu)先級、虛擬通道、分布式網(wǎng)絡(luò)、roker集群等,在進一步的研究工作中可從這些方面進一步提高性能及系統(tǒng)穩(wěn)定性。
參考文獻:
[1] APACHE software foundation. ActiveMQ [EB/OL].http://activemq.apache.org/index.html,2018-09-10.
[2] 王鵬,從波,李國杰,等.基于ActiveMQ消息總線的性能測試方法 [J].測試技術(shù)學報,2019,33(2):147-152.
[3] 周聰.基于改進的Active MQ的通信模型的設(shè)計和實現(xiàn) [D].長春:吉林大學,2017.
[4] Spring AMQP. Spring [EB/OL].https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.html,2019-01-01.
[5] Bruce Snyder,Dejan Bosanac,Rob Davies. Introduction to Apache ActiveMQ Green Paper from Active MQ in action [M].London:Manning,2017:20-23.
[6] Bruce Snyder,Dejan Bosanac,Rob Davies. Active MQ inaction [M].London:Manning,2005:4-5.
[7] 龐佳麗.分布式系統(tǒng)中基于中間件的異步通信可靠性研究 [D].杭州:浙江工業(yè)大學,2017.
作者簡介:陳瑤(1987.04-),女,漢族,湖南湘潭人,工
程師,碩士,研究方向:數(shù)據(jù)傳輸框架;李洋磊(1983.01-),男,漢族,河南洛陽人,工程師,碩士,研究方向:民航氣象信息系統(tǒng)設(shè)備維護。