文/胡慶亮 王珊珊 高亮
高校智慧校園建設(shè)旨在改變師生與學(xué)校資源、環(huán)境的交互方式,開展以人為本的個(gè)性化服務(wù),進(jìn)而建立智能開放的教育環(huán)境和便利舒適的生活環(huán)境。消息中心服務(wù)可以將各類業(yè)務(wù)過(guò)程中產(chǎn)生的消息進(jìn)行集中管理與收發(fā),師生用戶可以方便、及時(shí)、準(zhǔn)確的獲知個(gè)人所關(guān)注的各類業(yè)務(wù)狀態(tài),實(shí)現(xiàn)了學(xué)校消息的一站式與個(gè)性化推送。因此,消息中心成為高校智慧校園建設(shè)的一項(xiàng)重要內(nèi)容。
消息中心的實(shí)現(xiàn)依賴于高效可靠的消息隊(duì)列中間件(簡(jiǎn)稱消息中間件),它可以通過(guò)消息傳遞和消息排隊(duì)模型,在分布式環(huán)境下提供應(yīng)用解耦、彈性伸縮、冗余存儲(chǔ)、流量削峰、異步通信、數(shù)據(jù)同步等功能。目前,應(yīng)用比較廣泛的消息中間件包括:RabbitMQ、ActiveMQ、Kafka、RocketMQ,其中RabbitMQ 是使用Erlang 語(yǔ)言開發(fā)的開源消息隊(duì)列系統(tǒng),基于AMQP 協(xié)議實(shí)現(xiàn),該協(xié)議面向消息、隊(duì)列和路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱),強(qiáng)調(diào)可靠性與安全性,主要應(yīng)用于對(duì)數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高的場(chǎng)景,此外RabbitMQ 還有高可用性、高易用性等優(yōu)點(diǎn)。結(jié)合高校的統(tǒng)一消息服務(wù)特點(diǎn)(對(duì)數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高,并發(fā)量、吞吐量要求一般),考慮采用RabbitMQ來(lái)構(gòu)建高校智慧校園消息中心。本文主要對(duì)基于RabbitMQ 構(gòu)建的智慧校園消息中心的設(shè)計(jì)方案與實(shí)現(xiàn)進(jìn)行闡述。
RabbitMQ 起源于金融系統(tǒng),用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息,具有易用性、擴(kuò)展性、高可用性等優(yōu)勢(shì),其內(nèi)部結(jié)構(gòu)如圖1 所示。
圖1 RabbitMQ 內(nèi)部結(jié)構(gòu)
1. Message:消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對(duì)于其他消息的優(yōu)先權(quán))、delivery-mode(指出該消息可能需要持久性存儲(chǔ))等。
2.Publisher:消息的生產(chǎn)者,也是一個(gè)向交換器發(fā)布消息的客戶端應(yīng)用程序。
3.Exchange:交換器,用來(lái)接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。
4.Binding:綁定,是消息隊(duì)列和交換器之間的關(guān)聯(lián)。一個(gè)綁定就是基于路由鍵將交換器和消息隊(duì)列連接起來(lái)的路由規(guī)則,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表。
5.Queue:消息隊(duì)列,用來(lái)保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列,消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列將其取走。
6.Connection:網(wǎng)絡(luò)連接,比如一個(gè)TCP 連接。
7.Channel:信道,多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道。信道是建立在真實(shí)的TCP 連接內(nèi)地虛擬連接,AMQP命令都是通過(guò)信道發(fā)出去的,不管是發(fā)布消息、訂閱隊(duì)列還是接收消息,這些動(dòng)作都是通過(guò)信道完成。因?yàn)閷?duì)于操作系統(tǒng)來(lái)說(shuō)建立和銷毀TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復(fù)用一條TCP連接。
8.Consumer:消息的消費(fèi)者,表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序。
9.Virtual Host:虛擬主機(jī),表示一批交換器、消息隊(duì)列和相關(guān)對(duì)象。虛擬主機(jī)是共享相同的身份認(rèn)證和加密環(huán)境的獨(dú)立服務(wù)器域。每個(gè)vhost 本質(zhì)上就是一個(gè)mini 版的 RabbitMQ 服務(wù)器,擁有自己的隊(duì)列、交換器、綁定和權(quán)限機(jī)制。vhost是AMQP 概念的基礎(chǔ),必須在連接時(shí)指定,RabbitMQ 默認(rèn)的vhost 是“/”。
10.Broker:表示消息隊(duì)列服務(wù)器實(shí)體。
基于RabbitMQ 的智慧校園消息中心包括:“消息匯聚層”和“消息下發(fā)層”,消息匯聚層完成業(yè)務(wù)消息的統(tǒng)一匯集與存儲(chǔ),消息下發(fā)層則以方便、有效的途徑(服務(wù)門戶、短信消息、微信消息等)將消息下發(fā)給師生用戶。具體架構(gòu)如圖2 所示。
圖2 基于RabbitMQ 的智慧校園消息中心架構(gòu)
1.消息匯聚層
在消息匯聚層,對(duì)于RabbitMQ 而言,生產(chǎn)者是消息接口API,業(yè)務(wù)系統(tǒng)通過(guò)調(diào)用消息接口API 將消息數(shù)據(jù)放入消息隊(duì)列;消費(fèi)者的職責(zé)則由后臺(tái)輪詢程序完成。在該架構(gòu)下,完整的消息集成流程如下:
(1)業(yè)務(wù)系統(tǒng)調(diào)用消息接口API;
(2)消息接口API 被調(diào)用后,首先將消息數(shù)據(jù)落地到數(shù)據(jù)庫(kù)表,消息記錄的初始推送狀態(tài)設(shè)置為“pushstatus=0”,然后以Confirm 方式將消息發(fā)送給RabbitMQ;
(3) 消 息 接 口API 在 接 收 到RabbitMQ返回的Confirm消息確認(rèn)成功后,更新消息記錄的推送狀態(tài)“pushstatus=1”。
(4)輪詢程序從RabbitMQ 隊(duì)列讀取消息,調(diào)用消息匯聚中心接口將消息寫入消息匯聚中心數(shù)據(jù)庫(kù)表。
其中,步驟三是RabbitMQ 的發(fā)送確認(rèn)過(guò)程。在此過(guò)程中,可能出現(xiàn)網(wǎng)絡(luò)閃斷、MQ Broker 端異常等情況,導(dǎo)致回送消息失敗或者異常,因此需要發(fā)送方(生產(chǎn)者)對(duì)消息進(jìn)行可靠性投遞,以保障消息不丟失。為此專門設(shè)計(jì)了輪詢機(jī)制,設(shè)置定時(shí)任務(wù),每5 分鐘讀取一次中間狀態(tài)的消息(消息可以設(shè)置一個(gè)超時(shí)時(shí)間,比如超時(shí)1 分鐘且“pushstatus=0”,也就是1 分鐘的時(shí)間窗口內(nèi)沒(méi)有被確認(rèn)的消息,才會(huì)被定時(shí)任務(wù)拉取出來(lái)),然后將中間狀態(tài)的消息重新發(fā)送到MQ,稱之為“Retry send 機(jī)制”。輪詢程序的另外一個(gè)功能是定時(shí)比較源頭與消息匯聚中心的數(shù)據(jù)差異,將差異數(shù)據(jù)再次寫入消息匯聚中心(相比重新投遞的定時(shí)任務(wù),此任務(wù)的時(shí)間窗口應(yīng)設(shè)置的較大,如一天內(nèi)未成功寫入的消息;執(zhí)行時(shí)間間隔也比較長(zhǎng),如一個(gè)小時(shí)),稱之為“Rewrite 機(jī)制”。因此本方案除了利用RabbitMQ 自身的可靠性機(jī)制(包括隊(duì)列持久化、發(fā)送確認(rèn))之外,“Retry send 機(jī)制”與“Rewrite 機(jī)制”作為額外的保障措施,提供了更高的可靠性。
2.消息下發(fā)層
在消息下發(fā)層,通過(guò)調(diào)用各類發(fā)送渠道,包括校園服務(wù)門戶(PC 與移動(dòng))、短信平臺(tái)、微信平臺(tái)、郵件平臺(tái)等,將消息方便及時(shí)的推送給師生用戶。
下文主要針對(duì)消息匯聚層中的生產(chǎn)者(消息接口API)和消費(fèi)者(輪詢程序)的實(shí)現(xiàn)過(guò)程進(jìn)行闡述。消息下發(fā)層以調(diào)用第三方程序接口為主,不是本方案的核心內(nèi)容,故不再贅述。
1.生產(chǎn)者
通過(guò)分析實(shí)際的應(yīng)用場(chǎng)景,定義了兩種消息類型:提醒與待辦,提醒是業(yè)務(wù)系統(tǒng)發(fā)送給用戶的提示消息,具有“已讀”、“未讀”屬性;待辦則是需要用戶辦理的一類特殊提醒,具有“未辦理”、“已辦理”屬性?;谙㈩愋偷亩x,對(duì)于提醒,API 提供了“提醒生成”與“提醒已讀”兩個(gè)操作;對(duì)于待辦,API 提供了“待辦生成”與“待辦消除”兩個(gè)操作。由于提醒在程序?qū)崿F(xiàn)上與待辦類似,所以下文僅描述待辦API 的實(shí)現(xiàn)過(guò)程。
(1)待辦生成API
待辦生成API 程序在功能上主要實(shí)現(xiàn)了待辦消息數(shù)據(jù)的落地以及將待辦數(shù)據(jù)放入RabbitMQ 消息隊(duì)列并更新推送狀態(tài)。主要程序?qū)崿F(xiàn)(java 代碼)如下:
/*
*待辦數(shù)據(jù)寫入數(shù)據(jù)庫(kù)表
*/try
{
String sql_insert = "insert into " + schema + ".TMP_TODOSERVICE(SEQ_ID, APP_ID, REFNO, MESSAGE_TYPE_CODE, TARGET_TYPE, TARGET_IDS, CONTENT,
URL, DO_STEP, CREATETIME, PUSHSTATUS, DOFLAG,PUSHSTATUS_2) " + "values(" + schema + ".SEQ_TMP.
NEXTVAL, " + app_id + ", '" + refno + "', '" + message_type_code + "', '" + target_type + "', '" + target_ids + "', '" + content+ "', '" + url + "', '" + do_step + "', sysdate, -1, 1, -1)";
st = conn.createStatement();
st.execute(sql_insert);
} catch (Exception e) {
e.printStackTrace();
int i = 2;
return i;
}
/*
*RabbitMQ 生產(chǎn)者,將待辦數(shù)據(jù)放入
RabbitMQ 隊(duì)列并更新推送狀態(tài)
*/
JSONObject joTodo = new JSONObject();
joTodo.put("datatype", "push");
joTodo.put("app_id", Long.valueOf(app_id));
joTodo.put("app_key", app_key);
joTodo.put("refno", refno);
joTodo.put("message_type_code", message_type_code);
joTodo.put("target_type", target_type);
joTodo.put("target_ids", target_ids);
joTodo.put("content", content);
joTodo.put("url", url);
joTodo.put("do_step", do_step);
Boolean result = AMQPClientUtil.NewTask("task_queue_todo", joTodo.toString());
(2)待辦消除API
消除待辦API 程序在功能上主要實(shí)現(xiàn)了待辦完成數(shù)據(jù)的落地(修改已寫入數(shù)據(jù)庫(kù)的待辦的完成狀態(tài))、將待辦數(shù)據(jù)放入RabbitMQ 消息隊(duì)列并更新推送狀態(tài)。主
要程序?qū)崿F(xiàn)(java 代碼)如下:
/*
*待辦完成數(shù)據(jù)寫入數(shù)據(jù)庫(kù)表
*/
try
{
String sql_update = "update " + schema + ".TMP_TODOSERVICE set DOFLAG=0, DONETIME=sysdate " +"where APP_ID=" + app_id + " and REFNO='" + refno + "'";
st = conn.createStatement();
st.execute(sql_update);
} catch (Exception e) {
e.printStackTrace();
int i = 2;
return i;
}
/*
*RabbitMQ 生產(chǎn)者,將待辦完成數(shù)據(jù)放入RabbitMQ 隊(duì)列并更新推送狀態(tài)
*/JSONObject joTodo_complete = new JSONObject();
joTodo_complete.put("datatype", "complete");
joTodo_complete.put("app_id", Long.valueOf(app_id));
joTodo_complete.put("app_key", app_key);
joTodo_complete.put("refno", refno);
Boolean result = AMQPClientUtil.NewTask("task_queue_todo", oTodo_complete.toString());
2.消費(fèi)者
作為消費(fèi)者的輪詢程序?qū)崿F(xiàn)的功能包括:(1)讀取消息隊(duì)列中的待辦數(shù)據(jù)并調(diào)用消息匯聚中心接口,將數(shù)據(jù)寫入消息匯聚中心數(shù)據(jù)庫(kù)表;(2)“Retry Send”功能,定時(shí)拉取推送MQ 失敗的消息,重新發(fā)送給RabbitMQ;(3)“Rewrite”功能,定時(shí)比較源頭與消息匯聚中心數(shù)據(jù)的差異(消息落地?cái)?shù)據(jù)庫(kù)表與消息匯聚中心數(shù)據(jù)庫(kù)表),調(diào)用消息匯聚中心接口將差異數(shù)據(jù)重新寫入。主要程序?qū)崿F(xiàn)(java 代碼)如下:
/*
*RabbitMQ 消費(fèi)者,從RabbitMQ 隊(duì)
列獲取待辦數(shù)據(jù)并寫入消息匯聚中心
*/try {
JSONObject jsonObj = JSONObject.fromObject(message);
long app_id = Long.parseLong(jsonObj.get("app_id").toString());
String app_key = jsonObj.get("app_key").toString();
String refno = jsonObj.get("refno").toString();
if (StringUtils.isEmpty(app_key)) {
app_key = (String)appKeyMap.get(Long.valueOf(app_id));
}
if((null != jsonObj.get("datatype")) && ("push".equals(jsonObj.
get("datatype").toString()))) {
String message_type_code = jsonObj.get("message_type_
code").toString();
String target_type = jsonObj.get("target_type").toString();
String target_ids = jsonObj.get("target_ids").toString();
String content = jsonObj.get("content").toString();
String url = jsonObj.get("url").toString();
String do_step = jsonObj.get("do_step").toString();
表1 驗(yàn)證結(jié)果對(duì)比
ret = TodoIServiceUtil.todoServicePush(portalServ
erUrl, app_id, app_key, refno, message_type_code,
target_type, target_ids, content, url, do_step);//調(diào)用
消息匯聚中心接口
} else if ((null != jsonObj.get("datatype")) && ("complete".
equals(jsonObj.get("datatype").toString())))
{
ret = TodoIServiceUtil.todoServiceComplete(portalServerUrl,
app_id, app_key, refno);//調(diào)用消息匯聚中心接口
}
}
/*
*定時(shí)Retry send
*/
try
{
retrySendTodoPushFailureFromDb();//重發(fā)推送mq 失敗的
未完成待辦
retrySendTodoStateFailureFromDb();//重發(fā)推送mq 失敗的
已完成待辦
}
/*
*定時(shí)Rewrite
*/
try
{
matchTodoPushFailureFromDb();//重寫未寫入消息匯聚中心的待辦
matchTodoCompleteFailureFromDb();//重寫完成狀態(tài)不一致的待辦
}
在方案驗(yàn)證環(huán)節(jié),對(duì)系統(tǒng)可靠性(消息接收成功率)與及時(shí)性(消息的平均延遲時(shí)間(毫秒))進(jìn)行了測(cè)試與考察,定義如下:
2.第i 條消息的延遲=第i 條消息的數(shù)據(jù)庫(kù)寫入時(shí)間-第i 條消息的發(fā)送時(shí)間
(n 為接收消息總數(shù))
同時(shí)將“Retry Send 機(jī)制”的觸發(fā)時(shí)間設(shè)定為5 分鐘,時(shí)間窗口設(shè)定為1 分鐘,“Rewrite 機(jī)制”的觸發(fā)時(shí)間設(shè)定為10 分鐘,時(shí)間窗口設(shè)定為5 分鐘。java 程序循環(huán)調(diào)用待辦推送接口API,分別發(fā)送待辦2000條、5000 條、10000 條。在每條消息發(fā)出時(shí),記錄其發(fā)送時(shí)間,并和數(shù)據(jù)庫(kù)記錄生成時(shí)間做比較,得到每條消息的延遲時(shí)間。三次驗(yàn)證結(jié)果分別統(tǒng)計(jì)見表1。
驗(yàn)證過(guò)程并沒(méi)有考慮程序本身執(zhí)行時(shí)間以及網(wǎng)絡(luò)延遲的影響,可見隨著消息發(fā)送的增加,消息的平均延遲時(shí)間差別并不大;另外,在驗(yàn)證過(guò)程中,遇到了RabbitMQ 因網(wǎng)絡(luò)連接超時(shí)等情況而發(fā)送失敗的情況,但方案中“Retry send 機(jī)制”與“Rewrite 機(jī)制”保證了消息仍然被準(zhǔn)確接收,驗(yàn)證了方案的可靠性。
驗(yàn)證過(guò)程未包含RabbitMQ 的吞吐量測(cè)試,有資料表明,RabbitMQ 吞吐量可達(dá)到5.95w/s,在消息持久化場(chǎng)景下,吞吐量也能達(dá)到2.6w/s 左右。這也說(shuō)明AMQP 協(xié)議為了保證消息的可靠性在吞吐量上做了一定程度的取舍。
基于RabbitMQ 的智慧校園消息中心方案已在上海財(cái)經(jīng)大學(xué)的一站式校園服務(wù)門戶中投入使用,經(jīng)過(guò)三年多的運(yùn)行,目前消息中心已實(shí)現(xiàn)了面向教職工的75 種消息和面向?qū)W生的48 種消息的匯聚與下發(fā)。同時(shí)支持用戶個(gè)性化設(shè)置消息接收渠道(門戶站內(nèi)信、手機(jī)短信以及微信消息)和業(yè)務(wù)消息類型,方便師生用戶及時(shí)、準(zhǔn)確的接收個(gè)人所關(guān)注的業(yè)務(wù)狀態(tài)變更提醒,大大提升了業(yè)務(wù)辦理效率以及用戶使用體驗(yàn),取得了很好的應(yīng)用效果。
本文描述了一種基于RabbitMQ 的智慧校園消息中心設(shè)計(jì)方案以及主要的程序?qū)崿F(xiàn)。方案在利用RabbitMQ 自身可靠性機(jī)制的基礎(chǔ)上,增加了“Retry Send 機(jī)制”與“Rewrite 機(jī)制”,提高了消息接收的整體可靠性。測(cè)試驗(yàn)證結(jié)果與實(shí)際應(yīng)用成效表明,該方案可以很好的滿足高校中的消息集成需求,為高校智慧校園建設(shè)提供大力支撐。