• 
    

    
    

      99热精品在线国产_美女午夜性视频免费_国产精品国产高清国产av_av欧美777_自拍偷自拍亚洲精品老妇_亚洲熟女精品中文字幕_www日本黄色视频网_国产精品野战在线观看 ?

      上海財(cái)經(jīng)大學(xué)消息中心點(diǎn)亮智慧校園

      2019-08-15 01:28:56胡慶亮王珊珊高亮
      中國(guó)教育網(wǎng)絡(luò) 2019年7期
      關(guān)鍵詞:交換器調(diào)用隊(duì)列

      文/胡慶亮 王珊珊 高亮

      高校智慧校園建設(shè)旨在改變師生與學(xué)校資源、環(huán)境的交互方式,開展以人為本的個(gè)性化服務(wù),進(jìn)而建立智能開放的教育環(huán)境和便利舒適的生活環(huán)境。消息中心服務(wù)可以將各類業(yè)務(wù)過(guò)程中產(chǎn)生的消息進(jìn)行集中管理與收發(fā),師生用戶可以方便、及時(shí)、準(zhǔn)確的獲知個(gè)人所關(guān)注的各類業(yè)務(wù)狀態(tài),實(shí)現(xiàn)了學(xué)校消息的一站式與個(gè)性化推送。因此,消息中心成為高校智慧校園建設(shè)的一項(xiàng)重要內(nèi)容。

      消息中心的實(shí)現(xiàn)依賴于高效可靠的消息隊(duì)列中間件(簡(jiǎn)稱消息中間件),它可以通過(guò)消息傳遞和消息排隊(duì)模型,在分布式環(huán)境下提供應(yīng)用解耦、彈性伸縮、冗余存儲(chǔ)、流量削峰、異步通信、數(shù)據(jù)同步等功能。目前,應(yīng)用比較廣泛的消息中間件包括:RabbitMQ、ActiveMQ、Kafka、RocketMQ,其中RabbitMQ 是使用Erlang 語(yǔ)言開發(fā)的開源消息隊(duì)列系統(tǒng),基于AMQP 協(xié)議實(shí)現(xiàn),該協(xié)議面向消息、隊(duì)列和路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱),強(qiáng)調(diào)可靠性與安全性,主要應(yīng)用于對(duì)數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高的場(chǎng)景,此外RabbitMQ 還有高可用性、高易用性等優(yōu)點(diǎn)。結(jié)合高校的統(tǒng)一消息服務(wù)特點(diǎn)(對(duì)數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高,并發(fā)量、吞吐量要求一般),考慮采用RabbitMQ來(lái)構(gòu)建高校智慧校園消息中心。本文主要對(duì)基于RabbitMQ 構(gòu)建的智慧校園消息中心的設(shè)計(jì)方案與實(shí)現(xiàn)進(jìn)行闡述。

      RabbitMQ 原理

      RabbitMQ 起源于金融系統(tǒng),用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息,具有易用性、擴(kuò)展性、高可用性等優(yōu)勢(shì),其內(nèi)部結(jié)構(gòu)如圖1 所示。

      圖1 RabbitMQ 內(nèi)部結(jié)構(gòu)

      1. Message:消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對(duì)于其他消息的優(yōu)先權(quán))、delivery-mode(指出該消息可能需要持久性存儲(chǔ))等。

      2.Publisher:消息的生產(chǎn)者,也是一個(gè)向交換器發(fā)布消息的客戶端應(yīng)用程序。

      3.Exchange:交換器,用來(lái)接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。

      4.Binding:綁定,是消息隊(duì)列和交換器之間的關(guān)聯(lián)。一個(gè)綁定就是基于路由鍵將交換器和消息隊(duì)列連接起來(lái)的路由規(guī)則,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表。

      5.Queue:消息隊(duì)列,用來(lái)保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列,消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列將其取走。

      6.Connection:網(wǎng)絡(luò)連接,比如一個(gè)TCP 連接。

      7.Channel:信道,多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道。信道是建立在真實(shí)的TCP 連接內(nèi)地虛擬連接,AMQP命令都是通過(guò)信道發(fā)出去的,不管是發(fā)布消息、訂閱隊(duì)列還是接收消息,這些動(dòng)作都是通過(guò)信道完成。因?yàn)閷?duì)于操作系統(tǒng)來(lái)說(shuō)建立和銷毀TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復(fù)用一條TCP連接。

      8.Consumer:消息的消費(fèi)者,表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序。

      9.Virtual Host:虛擬主機(jī),表示一批交換器、消息隊(duì)列和相關(guān)對(duì)象。虛擬主機(jī)是共享相同的身份認(rèn)證和加密環(huán)境的獨(dú)立服務(wù)器域。每個(gè)vhost 本質(zhì)上就是一個(gè)mini 版的 RabbitMQ 服務(wù)器,擁有自己的隊(duì)列、交換器、綁定和權(quán)限機(jī)制。vhost是AMQP 概念的基礎(chǔ),必須在連接時(shí)指定,RabbitMQ 默認(rèn)的vhost 是“/”。

      10.Broker:表示消息隊(duì)列服務(wù)器實(shí)體。

      系統(tǒng)架構(gòu)

      基于RabbitMQ 的智慧校園消息中心包括:“消息匯聚層”和“消息下發(fā)層”,消息匯聚層完成業(yè)務(wù)消息的統(tǒng)一匯集與存儲(chǔ),消息下發(fā)層則以方便、有效的途徑(服務(wù)門戶、短信消息、微信消息等)將消息下發(fā)給師生用戶。具體架構(gòu)如圖2 所示。

      圖2 基于RabbitMQ 的智慧校園消息中心架構(gòu)

      1.消息匯聚層

      在消息匯聚層,對(duì)于RabbitMQ 而言,生產(chǎn)者是消息接口API,業(yè)務(wù)系統(tǒng)通過(guò)調(diào)用消息接口API 將消息數(shù)據(jù)放入消息隊(duì)列;消費(fèi)者的職責(zé)則由后臺(tái)輪詢程序完成。在該架構(gòu)下,完整的消息集成流程如下:

      (1)業(yè)務(wù)系統(tǒng)調(diào)用消息接口API;

      (2)消息接口API 被調(diào)用后,首先將消息數(shù)據(jù)落地到數(shù)據(jù)庫(kù)表,消息記錄的初始推送狀態(tài)設(shè)置為“pushstatus=0”,然后以Confirm 方式將消息發(fā)送給RabbitMQ;

      (3) 消 息 接 口API 在 接 收 到RabbitMQ返回的Confirm消息確認(rèn)成功后,更新消息記錄的推送狀態(tài)“pushstatus=1”。

      (4)輪詢程序從RabbitMQ 隊(duì)列讀取消息,調(diào)用消息匯聚中心接口將消息寫入消息匯聚中心數(shù)據(jù)庫(kù)表。

      其中,步驟三是RabbitMQ 的發(fā)送確認(rèn)過(guò)程。在此過(guò)程中,可能出現(xiàn)網(wǎng)絡(luò)閃斷、MQ Broker 端異常等情況,導(dǎo)致回送消息失敗或者異常,因此需要發(fā)送方(生產(chǎn)者)對(duì)消息進(jìn)行可靠性投遞,以保障消息不丟失。為此專門設(shè)計(jì)了輪詢機(jī)制,設(shè)置定時(shí)任務(wù),每5 分鐘讀取一次中間狀態(tài)的消息(消息可以設(shè)置一個(gè)超時(shí)時(shí)間,比如超時(shí)1 分鐘且“pushstatus=0”,也就是1 分鐘的時(shí)間窗口內(nèi)沒(méi)有被確認(rèn)的消息,才會(huì)被定時(shí)任務(wù)拉取出來(lái)),然后將中間狀態(tài)的消息重新發(fā)送到MQ,稱之為“Retry send 機(jī)制”。輪詢程序的另外一個(gè)功能是定時(shí)比較源頭與消息匯聚中心的數(shù)據(jù)差異,將差異數(shù)據(jù)再次寫入消息匯聚中心(相比重新投遞的定時(shí)任務(wù),此任務(wù)的時(shí)間窗口應(yīng)設(shè)置的較大,如一天內(nèi)未成功寫入的消息;執(zhí)行時(shí)間間隔也比較長(zhǎng),如一個(gè)小時(shí)),稱之為“Rewrite 機(jī)制”。因此本方案除了利用RabbitMQ 自身的可靠性機(jī)制(包括隊(duì)列持久化、發(fā)送確認(rèn))之外,“Retry send 機(jī)制”與“Rewrite 機(jī)制”作為額外的保障措施,提供了更高的可靠性。

      2.消息下發(fā)層

      在消息下發(fā)層,通過(guò)調(diào)用各類發(fā)送渠道,包括校園服務(wù)門戶(PC 與移動(dòng))、短信平臺(tái)、微信平臺(tái)、郵件平臺(tái)等,將消息方便及時(shí)的推送給師生用戶。

      系統(tǒng)實(shí)現(xiàn)

      下文主要針對(duì)消息匯聚層中的生產(chǎn)者(消息接口API)和消費(fèi)者(輪詢程序)的實(shí)現(xiàn)過(guò)程進(jìn)行闡述。消息下發(fā)層以調(diào)用第三方程序接口為主,不是本方案的核心內(nèi)容,故不再贅述。

      1.生產(chǎn)者

      通過(guò)分析實(shí)際的應(yīng)用場(chǎng)景,定義了兩種消息類型:提醒與待辦,提醒是業(yè)務(wù)系統(tǒng)發(fā)送給用戶的提示消息,具有“已讀”、“未讀”屬性;待辦則是需要用戶辦理的一類特殊提醒,具有“未辦理”、“已辦理”屬性?;谙㈩愋偷亩x,對(duì)于提醒,API 提供了“提醒生成”與“提醒已讀”兩個(gè)操作;對(duì)于待辦,API 提供了“待辦生成”與“待辦消除”兩個(gè)操作。由于提醒在程序?qū)崿F(xiàn)上與待辦類似,所以下文僅描述待辦API 的實(shí)現(xiàn)過(guò)程。

      (1)待辦生成API

      待辦生成API 程序在功能上主要實(shí)現(xiàn)了待辦消息數(shù)據(jù)的落地以及將待辦數(shù)據(jù)放入RabbitMQ 消息隊(duì)列并更新推送狀態(tài)。主要程序?qū)崿F(xiàn)(java 代碼)如下:

      /*

      *待辦數(shù)據(jù)寫入數(shù)據(jù)庫(kù)表

      */try

      {

      String sql_insert = "insert into " + schema + ".TMP_TODOSERVICE(SEQ_ID, APP_ID, REFNO, MESSAGE_TYPE_CODE, TARGET_TYPE, TARGET_IDS, CONTENT,

      URL, DO_STEP, CREATETIME, PUSHSTATUS, DOFLAG,PUSHSTATUS_2) " + "values(" + schema + ".SEQ_TMP.

      NEXTVAL, " + app_id + ", '" + refno + "', '" + message_type_code + "', '" + target_type + "', '" + target_ids + "', '" + content+ "', '" + url + "', '" + do_step + "', sysdate, -1, 1, -1)";

      st = conn.createStatement();

      st.execute(sql_insert);

      } catch (Exception e) {

      e.printStackTrace();

      int i = 2;

      return i;

      }

      /*

      *RabbitMQ 生產(chǎn)者,將待辦數(shù)據(jù)放入

      RabbitMQ 隊(duì)列并更新推送狀態(tài)

      */

      JSONObject joTodo = new JSONObject();

      joTodo.put("datatype", "push");

      joTodo.put("app_id", Long.valueOf(app_id));

      joTodo.put("app_key", app_key);

      joTodo.put("refno", refno);

      joTodo.put("message_type_code", message_type_code);

      joTodo.put("target_type", target_type);

      joTodo.put("target_ids", target_ids);

      joTodo.put("content", content);

      joTodo.put("url", url);

      joTodo.put("do_step", do_step);

      Boolean result = AMQPClientUtil.NewTask("task_queue_todo", joTodo.toString());

      (2)待辦消除API

      消除待辦API 程序在功能上主要實(shí)現(xiàn)了待辦完成數(shù)據(jù)的落地(修改已寫入數(shù)據(jù)庫(kù)的待辦的完成狀態(tài))、將待辦數(shù)據(jù)放入RabbitMQ 消息隊(duì)列并更新推送狀態(tài)。主

      要程序?qū)崿F(xiàn)(java 代碼)如下:

      /*

      *待辦完成數(shù)據(jù)寫入數(shù)據(jù)庫(kù)表

      */

      try

      {

      String sql_update = "update " + schema + ".TMP_TODOSERVICE set DOFLAG=0, DONETIME=sysdate " +"where APP_ID=" + app_id + " and REFNO='" + refno + "'";

      st = conn.createStatement();

      st.execute(sql_update);

      } catch (Exception e) {

      e.printStackTrace();

      int i = 2;

      return i;

      }

      /*

      *RabbitMQ 生產(chǎn)者,將待辦完成數(shù)據(jù)放入RabbitMQ 隊(duì)列并更新推送狀態(tài)

      */JSONObject joTodo_complete = new JSONObject();

      joTodo_complete.put("datatype", "complete");

      joTodo_complete.put("app_id", Long.valueOf(app_id));

      joTodo_complete.put("app_key", app_key);

      joTodo_complete.put("refno", refno);

      Boolean result = AMQPClientUtil.NewTask("task_queue_todo", oTodo_complete.toString());

      2.消費(fèi)者

      作為消費(fèi)者的輪詢程序?qū)崿F(xiàn)的功能包括:(1)讀取消息隊(duì)列中的待辦數(shù)據(jù)并調(diào)用消息匯聚中心接口,將數(shù)據(jù)寫入消息匯聚中心數(shù)據(jù)庫(kù)表;(2)“Retry Send”功能,定時(shí)拉取推送MQ 失敗的消息,重新發(fā)送給RabbitMQ;(3)“Rewrite”功能,定時(shí)比較源頭與消息匯聚中心數(shù)據(jù)的差異(消息落地?cái)?shù)據(jù)庫(kù)表與消息匯聚中心數(shù)據(jù)庫(kù)表),調(diào)用消息匯聚中心接口將差異數(shù)據(jù)重新寫入。主要程序?qū)崿F(xiàn)(java 代碼)如下:

      /*

      *RabbitMQ 消費(fèi)者,從RabbitMQ 隊(duì)

      列獲取待辦數(shù)據(jù)并寫入消息匯聚中心

      */try {

      JSONObject jsonObj = JSONObject.fromObject(message);

      long app_id = Long.parseLong(jsonObj.get("app_id").toString());

      String app_key = jsonObj.get("app_key").toString();

      String refno = jsonObj.get("refno").toString();

      if (StringUtils.isEmpty(app_key)) {

      app_key = (String)appKeyMap.get(Long.valueOf(app_id));

      }

      if((null != jsonObj.get("datatype")) && ("push".equals(jsonObj.

      get("datatype").toString()))) {

      String message_type_code = jsonObj.get("message_type_

      code").toString();

      String target_type = jsonObj.get("target_type").toString();

      String target_ids = jsonObj.get("target_ids").toString();

      String content = jsonObj.get("content").toString();

      String url = jsonObj.get("url").toString();

      String do_step = jsonObj.get("do_step").toString();

      表1 驗(yàn)證結(jié)果對(duì)比

      ret = TodoIServiceUtil.todoServicePush(portalServ

      erUrl, app_id, app_key, refno, message_type_code,

      target_type, target_ids, content, url, do_step);//調(diào)用

      消息匯聚中心接口

      } else if ((null != jsonObj.get("datatype")) && ("complete".

      equals(jsonObj.get("datatype").toString())))

      {

      ret = TodoIServiceUtil.todoServiceComplete(portalServerUrl,

      app_id, app_key, refno);//調(diào)用消息匯聚中心接口

      }

      }

      /*

      *定時(shí)Retry send

      */

      try

      {

      retrySendTodoPushFailureFromDb();//重發(fā)推送mq 失敗的

      未完成待辦

      retrySendTodoStateFailureFromDb();//重發(fā)推送mq 失敗的

      已完成待辦

      }

      /*

      *定時(shí)Rewrite

      */

      try

      {

      matchTodoPushFailureFromDb();//重寫未寫入消息匯聚中心的待辦

      matchTodoCompleteFailureFromDb();//重寫完成狀態(tài)不一致的待辦

      }

      系統(tǒng)驗(yàn)證

      在方案驗(yàn)證環(huán)節(jié),對(duì)系統(tǒng)可靠性(消息接收成功率)與及時(shí)性(消息的平均延遲時(shí)間(毫秒))進(jìn)行了測(cè)試與考察,定義如下:

      2.第i 條消息的延遲=第i 條消息的數(shù)據(jù)庫(kù)寫入時(shí)間-第i 條消息的發(fā)送時(shí)間

      (n 為接收消息總數(shù))

      同時(shí)將“Retry Send 機(jī)制”的觸發(fā)時(shí)間設(shè)定為5 分鐘,時(shí)間窗口設(shè)定為1 分鐘,“Rewrite 機(jī)制”的觸發(fā)時(shí)間設(shè)定為10 分鐘,時(shí)間窗口設(shè)定為5 分鐘。java 程序循環(huán)調(diào)用待辦推送接口API,分別發(fā)送待辦2000條、5000 條、10000 條。在每條消息發(fā)出時(shí),記錄其發(fā)送時(shí)間,并和數(shù)據(jù)庫(kù)記錄生成時(shí)間做比較,得到每條消息的延遲時(shí)間。三次驗(yàn)證結(jié)果分別統(tǒng)計(jì)見表1。

      驗(yàn)證過(guò)程并沒(méi)有考慮程序本身執(zhí)行時(shí)間以及網(wǎng)絡(luò)延遲的影響,可見隨著消息發(fā)送的增加,消息的平均延遲時(shí)間差別并不大;另外,在驗(yàn)證過(guò)程中,遇到了RabbitMQ 因網(wǎng)絡(luò)連接超時(shí)等情況而發(fā)送失敗的情況,但方案中“Retry send 機(jī)制”與“Rewrite 機(jī)制”保證了消息仍然被準(zhǔn)確接收,驗(yàn)證了方案的可靠性。

      驗(yàn)證過(guò)程未包含RabbitMQ 的吞吐量測(cè)試,有資料表明,RabbitMQ 吞吐量可達(dá)到5.95w/s,在消息持久化場(chǎng)景下,吞吐量也能達(dá)到2.6w/s 左右。這也說(shuō)明AMQP 協(xié)議為了保證消息的可靠性在吞吐量上做了一定程度的取舍。

      應(yīng)用成效

      基于RabbitMQ 的智慧校園消息中心方案已在上海財(cái)經(jīng)大學(xué)的一站式校園服務(wù)門戶中投入使用,經(jīng)過(guò)三年多的運(yùn)行,目前消息中心已實(shí)現(xiàn)了面向教職工的75 種消息和面向?qū)W生的48 種消息的匯聚與下發(fā)。同時(shí)支持用戶個(gè)性化設(shè)置消息接收渠道(門戶站內(nèi)信、手機(jī)短信以及微信消息)和業(yè)務(wù)消息類型,方便師生用戶及時(shí)、準(zhǔn)確的接收個(gè)人所關(guān)注的業(yè)務(wù)狀態(tài)變更提醒,大大提升了業(yè)務(wù)辦理效率以及用戶使用體驗(yàn),取得了很好的應(yīng)用效果。

      本文描述了一種基于RabbitMQ 的智慧校園消息中心設(shè)計(jì)方案以及主要的程序?qū)崿F(xiàn)。方案在利用RabbitMQ 自身可靠性機(jī)制的基礎(chǔ)上,增加了“Retry Send 機(jī)制”與“Rewrite 機(jī)制”,提高了消息接收的整體可靠性。測(cè)試驗(yàn)證結(jié)果與實(shí)際應(yīng)用成效表明,該方案可以很好的滿足高校中的消息集成需求,為高校智慧校園建設(shè)提供大力支撐。

      猜你喜歡
      交換器調(diào)用隊(duì)列
      媽媽交換器
      隊(duì)列里的小秘密
      基于多隊(duì)列切換的SDN擁塞控制*
      軟件(2020年3期)2020-04-20 00:58:44
      核電項(xiàng)目物項(xiàng)調(diào)用管理的應(yīng)用研究
      在隊(duì)列里
      LabWindows/CVI下基于ActiveX技術(shù)的Excel調(diào)用
      豐田加速駛?cè)胱詣?dòng)駕駛隊(duì)列
      基于系統(tǒng)調(diào)用的惡意軟件檢測(cè)技術(shù)研究
      一種新型交換器
      科技資訊(2016年24期)2016-05-30 19:04:11
      百通推出入門級(jí)快速工業(yè)以太網(wǎng)絡(luò)交換器系列
      枞阳县| 霞浦县| 郧西县| 当涂县| 周口市| 喀什市| 精河县| 石嘴山市| 汉沽区| 磐石市| 平远县| 平定县| 白山市| 长丰县| 吉安县| 上饶县| 泰顺县| 措美县| 萨迦县| 桐梓县| 黄冈市| 阿克| 奎屯市| 广昌县| 杂多县| 和田市| 苍南县| 清镇市| 隆回县| 青铜峡市| 高青县| 孟连| 寻乌县| 华蓥市| 泰顺县| 辰溪县| 静宁县| 孟州市| 昆山市| 哈尔滨市| 麻江县|