胡喜明,胡 淼
(杭州電子科技大學(xué) 通信工程學(xué)院,浙江 杭州 310018)
隨著物聯(lián)網(wǎng)技術(shù)的發(fā)展,人們能夠通過移動(dòng)終端方便獲取所需要的信息與服務(wù)[1]。為了保證網(wǎng)絡(luò)設(shè)備與物理設(shè)備間可靠的傳輸,用戶將傳統(tǒng)的消息拉取模式轉(zhuǎn)變?yōu)閼?yīng)用消息推送。與消息拉取模式相比,由服務(wù)器將所產(chǎn)生的消息推送用戶,符合當(dāng)前互聯(lián)網(wǎng)流量管理以及資源分配的需求,同時(shí)在傳輸?shù)膶?shí)時(shí)性、高效性以及流量資源節(jié)約方面都有較大的提升[2]。物聯(lián)網(wǎng)設(shè)備需要考慮到硬件相關(guān)參數(shù),例如CPU性能、最大帶寬容量、電池使用時(shí)間、網(wǎng)絡(luò)最大實(shí)時(shí)流量等方面的問題,在服務(wù)推送方面需要一種功能完善、更加輕量、帶寬占比小且性能高的服務(wù)推送技術(shù)[3]。
當(dāng)前主流的服務(wù)推送方案有GCM服務(wù)、MQTT以及HTTP消息輪循[4]。其中HTTP輪循方式最為簡(jiǎn)單,但是在實(shí)用性、可靠性以及可擴(kuò)展性方面有所欠缺。GCM服務(wù)是google研發(fā)的一款云推送框架,擁有完善的推送方案,但是受限于網(wǎng)絡(luò)原因,無(wú)法在國(guó)內(nèi)使用。MQTT協(xié)議具有低延遲、低帶寬、推送速度快等優(yōu)勢(shì),適合大部分物聯(lián)網(wǎng)開發(fā)場(chǎng)景,然而單純的MQTT協(xié)議本身過于簡(jiǎn)單,在消息推送、安全性管理、主題化推送方式、數(shù)據(jù)緩存性等方面開發(fā)難度大,因此需要對(duì)MQTT協(xié)議進(jìn)行二次開發(fā)[5]。
本文提出一種基于Reactor-Netty+MQTT的高性能服務(wù)推送框架。項(xiàng)目中應(yīng)用Reactor-Netty技術(shù)替代傳統(tǒng)Netty技術(shù)對(duì)MQTT協(xié)議進(jìn)行封裝,借助該技術(shù)的響應(yīng)式非阻塞編程、事件驅(qū)動(dòng)等特性構(gòu)建高性能通信服務(wù)。通過Redis進(jìn)行數(shù)據(jù)緩存和Kafka實(shí)現(xiàn)消息代理轉(zhuǎn)發(fā)進(jìn)一步提高系統(tǒng)服務(wù)處理能力以及線程安全。
1.1.1 MQTT背景及特點(diǎn)
MQTT是由IBM公司在1999年所提出的一款基于TCP協(xié)議的發(fā)布訂閱協(xié)議[6,7]。MQTT是為了在有限的帶寬以及內(nèi)存條件下實(shí)現(xiàn)消息可靠傳輸。該協(xié)議提供發(fā)布/訂閱形式,實(shí)現(xiàn)單點(diǎn)以及一對(duì)多的發(fā)布消息推送、應(yīng)用TCP協(xié)議實(shí)現(xiàn)有序可靠雙端連接、報(bào)頭以及心跳報(bào)文僅占用3個(gè)字節(jié),將帶寬傳輸最小化,有效降低網(wǎng)絡(luò)通信流量、支持“最多一次”、”至少一次”,“僅一次“3種服務(wù)質(zhì)量等級(jí)、支持遺囑機(jī)制,實(shí)現(xiàn)客戶端異常斷開后,自動(dòng)根據(jù)所設(shè)置的遺囑機(jī)制,發(fā)布相關(guān)主題信息通知其它訂閱的客戶端用戶?;谝陨咸攸c(diǎn)使得當(dāng)前物聯(lián)網(wǎng)的開發(fā)中絕大多數(shù)都將MQTT作為消息傳遞協(xié)議的首選[8]。
MQTT協(xié)議主要擁有3個(gè)角色:消息發(fā)布者、消息代理、消息訂閱者[9]。消息通過推送的形式從發(fā)布者經(jīng)過消息代理進(jìn)行發(fā)布,此時(shí)消息已經(jīng)確定了自身所對(duì)應(yīng)的主題,消息訂閱者可以根據(jù)主題進(jìn)行相關(guān)數(shù)據(jù)的訂閱[10,11]。MQTT工作模型如圖1所示。
圖1 MQTT工作模型
1.1.2 MQTT協(xié)議消息傳輸格式
MQTT協(xié)議主要由固定報(bào)頭、可變報(bào)頭、有效載荷3部分構(gòu)成。固定報(bào)頭是所有MQTT報(bào)文段中必須存在的,總長(zhǎng)度為2字節(jié)。在第一個(gè)字節(jié)中,7-4位標(biāo)識(shí)MQTT控制報(bào)文類型,目前報(bào)文類型主要分為連接、訂閱、發(fā)布3種。服務(wù)質(zhì)量占兩個(gè)標(biāo)識(shí)位,分為Qos0、Qos1、Oos23種。4-0位最為控制報(bào)文標(biāo)志位,可看作一種屬性參數(shù)??勺儓?bào)頭與有效載荷根據(jù)協(xié)議的不同進(jìn)行省略[12,13]。
為了應(yīng)對(duì)高并發(fā)場(chǎng)景下流量過大情況,微軟設(shè)計(jì)了一種異步的編程思想-響應(yīng)式編程(reactive programming)。響應(yīng)式編程是一種專注于數(shù)據(jù)流以及變化傳遞的異步編程模式,這也意味著可以應(yīng)用編程語(yǔ)言進(jìn)行靜態(tài)和動(dòng)態(tài)數(shù)據(jù)流的表示。在jdk9中java引入了Reactor的概念,Reactor-Netty作為響應(yīng)式編程家族的一員,其底層基于Netty框架,對(duì)Netty進(jìn)行響應(yīng)式編程封裝,將其轉(zhuǎn)換為異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架。Reactor-Netty內(nèi)部仍然保留了Netty的主從多線程模型,擁有Netty框架的全部?jī)?yōu)勢(shì)。
Reactor通過Reactor Streams中的背壓,進(jìn)行數(shù)據(jù)流量控制,發(fā)布者和訂閱者可以進(jìn)行數(shù)據(jù)流量協(xié)商,其中背壓分為4種回壓策略:①onBackpressureBuffer:對(duì)下游的請(qǐng)求數(shù)據(jù)采用緩存的形式,保證系統(tǒng)不會(huì)壓力過大。②OnBackpressureDrop:元素就緒時(shí),根據(jù)下游是否有未滿足的request來判斷是否發(fā)出當(dāng)前元素。③OnBackpressureLatest:一直發(fā)送當(dāng)前最新的數(shù)據(jù)。④OnBackpressureError:當(dāng)前數(shù)據(jù)已經(jīng)滿了,再次添加請(qǐng)求直接報(bào)錯(cuò)。圖2為OnBackpressureBuffer模式的背壓原理圖,當(dāng)訂閱者的消費(fèi)能力遠(yuǎn)小于發(fā)布者,訂閱者可以通知發(fā)布者進(jìn)行服務(wù)的取消和終止功能,保證傳輸數(shù)據(jù)流量合理。
圖2 OnBackpressureBuffer模式背壓原理
整體服務(wù)器分為3部分,消息推送broker模塊,針對(duì)MQTT協(xié)議所定義的多種消息類型進(jìn)行分類處理以及通訊傳輸協(xié)議的搭建。服務(wù)認(rèn)證模塊,為保證服務(wù)的安全性提供了接口化驗(yàn)證,通過將RSA算法與Redis緩存結(jié)合,實(shí)現(xiàn)安全性加密認(rèn)證。數(shù)據(jù)緩存模塊,為防止傳輸中服務(wù)突然宕機(jī)所導(dǎo)致的數(shù)據(jù)丟失,采用Redis對(duì)服務(wù)中傳輸?shù)臄?shù)據(jù)進(jìn)行緩存。消息分發(fā)模塊,針對(duì)服務(wù)中耗時(shí)操作以及大型互聯(lián)網(wǎng)數(shù)據(jù)進(jìn)行流量統(tǒng)計(jì)的需求,對(duì)接企業(yè)級(jí)消息隊(duì)列kafka實(shí)現(xiàn)消息服務(wù)端與數(shù)據(jù)分發(fā)接收端的解耦,便于對(duì)海量數(shù)據(jù)的處理。系統(tǒng)架構(gòu)如圖3所示。
圖3 服務(wù)推送系統(tǒng)架構(gòu)
服務(wù)認(rèn)證是對(duì)系統(tǒng)進(jìn)行安全維護(hù)的手段,客戶端在接入服務(wù)器時(shí)需要進(jìn)行權(quán)限認(rèn)證工作,通過對(duì)接入用戶的身份驗(yàn)證,保證服務(wù)傳輸信息不被惡意獲取。驗(yàn)證流程如圖4所示。
圖4 服務(wù)認(rèn)證流程
為保證服務(wù)的安全性,需要對(duì)密碼進(jìn)行加密處理,防止由于密碼泄露所導(dǎo)致的非法客戶端接入竊取信息。這里采用RSA非對(duì)稱加密算法實(shí)現(xiàn)公私鑰加密校驗(yàn)。應(yīng)用項(xiàng)目中提供的RSAUTIL類生成公鑰與私鑰,其內(nèi)部經(jīng)過大量的邏輯運(yùn)算實(shí)現(xiàn)加密。系統(tǒng)需要保證每次生成的公鑰與私鑰唯一,這里將服務(wù)名稱與當(dāng)前時(shí)間進(jìn)行拼接作為加密的鹽值。這樣每次獲取生成的公鑰私鑰必然唯一。公鑰私鑰生成后以服務(wù)名稱作為key值,明文密碼作為value存儲(chǔ)到Redis中。
客戶端在發(fā)起連接時(shí),需要將通過公鑰加密后的密碼和用戶名稱一并傳輸,服務(wù)器接收到信息后,通過私鑰對(duì)傳輸密碼進(jìn)行解密與Redis中存儲(chǔ)的密碼進(jìn)行對(duì)比驗(yàn)證。
Broker模塊是推送服務(wù)的核心功能模塊,關(guān)于MQTT協(xié)議的邏輯處理部分均在該模塊進(jìn)行。其中主要包括QoS服務(wù)質(zhì)量等級(jí)選擇功能、遺囑消息功能,保留消息功能、客戶端自動(dòng)重連功能、心跳機(jī)制功能、MQTT/WS連接功能以及主題過濾功能。
QoS服務(wù)質(zhì)量等級(jí)選擇功能主要是為根據(jù)訂閱端業(yè)務(wù)為其提供消息選擇類型的處理,內(nèi)部根據(jù)選擇等級(jí)的不同實(shí)現(xiàn)相應(yīng)邏輯處理。其中主要分為Qos1、Qos2以及Qos3這3種。
(1)Qos0:在該服務(wù)質(zhì)量下,消息至多發(fā)送一次,其消息的發(fā)送完全依賴于OSI七層協(xié)議中的傳輸層進(jìn)行維護(hù),可能會(huì)發(fā)生消息丟失或者重復(fù)的情況。該服務(wù)質(zhì)量可用于如下情況:定時(shí)推送周圍環(huán)境相關(guān)數(shù)據(jù),丟失一兩組數(shù)據(jù)不會(huì)影響服務(wù)的應(yīng)用且在不久后會(huì)再次推送。該情況主要用于普通APP定時(shí)推送功能,若設(shè)備當(dāng)前推送數(shù)據(jù)時(shí)設(shè)備并未接入網(wǎng)絡(luò),此時(shí)即使再次聯(lián)網(wǎng)也會(huì)丟失數(shù)據(jù)。
(2)Qos1:在該服務(wù)質(zhì)量下,消息至少發(fā)送一次,消息發(fā)送到客戶端后,客戶端返回確認(rèn)信息標(biāo)志當(dāng)前服務(wù)已送達(dá)。該服務(wù)由于有一次確認(rèn)機(jī)制,因此在網(wǎng)絡(luò)環(huán)境比較好的情況下可以實(shí)現(xiàn)數(shù)據(jù)正確流暢的傳輸,但是若網(wǎng)絡(luò)環(huán)境出現(xiàn)波動(dòng)使得服務(wù)端無(wú)法收到確認(rèn)信息,可能會(huì)造成數(shù)據(jù)多次重發(fā)的問題。
(3)Qos2:該服務(wù)質(zhì)量在三者中處于最高級(jí)別,該消息僅會(huì)發(fā)送一次。主要用在當(dāng)消息丟失或者重發(fā)時(shí),對(duì)服務(wù)端造成業(yè)務(wù)上的影響的情況。為了保證傳輸一次的要求,采用了兩階段確認(rèn)的方式,與Qos1相比開銷較大。
遺囑消息功能主要是在發(fā)生服務(wù)斷連問題的時(shí)候,訂閱端無(wú)法及時(shí)了解消息發(fā)布方狀態(tài)導(dǎo)致的持續(xù)等待問題。遺囑消息保證了消息發(fā)布方在網(wǎng)絡(luò)波動(dòng)所導(dǎo)致的服務(wù)下線時(shí)能夠及時(shí)通知訂閱方,這里遺囑的內(nèi)容以事件監(jiān)聽器的形式實(shí)現(xiàn),一旦出現(xiàn)服務(wù)斷連的情況即可通過監(jiān)聽模式進(jìn)行響應(yīng);
保留消息功能主要是對(duì)重要信息進(jìn)行標(biāo)記,任何標(biāo)記為保留消息的內(nèi)容,新接入的訂閱方都可以在連接后收到,并且不需要等待消息發(fā)送方的推送,內(nèi)部將消息存儲(chǔ)在Redis緩存中,保證數(shù)據(jù)不丟失;
訂閱方斷線重連功能主要是為了將服務(wù)自動(dòng)化管理,保證訂閱方在由于網(wǎng)絡(luò)波動(dòng)導(dǎo)致失去連接,服務(wù)器會(huì)自動(dòng)實(shí)現(xiàn)斷線重連,降低維護(hù)成本,由Reactor-Netty內(nèi)部采用的心跳檢測(cè)機(jī)制,通過設(shè)定心跳間隔實(shí)現(xiàn)斷線檢測(cè)功能,并且基于事件驅(qū)動(dòng),在產(chǎn)生了心跳斷連的情況時(shí)觸發(fā)重連機(jī)制,通過輪循的方式發(fā)送重連請(qǐng)求保證服務(wù)的可靠性;
心跳檢測(cè)功能主要是通過底層Netty的心跳檢測(cè)模塊,MQTT協(xié)議中有PINGREQ心跳請(qǐng)求,通過實(shí)現(xiàn)底層Rea-ctor-Netty的Handler類,對(duì)傳入的PINGREQ進(jìn)行管理,客戶端定時(shí)發(fā)送心跳檢測(cè)請(qǐng)求,通過Reactor-Netty內(nèi)置的MONO類進(jìn)行調(diào)用,該類基于事件觸發(fā),可根據(jù)實(shí)際情況進(jìn)行結(jié)束處理。并且MONO類可以與java8中并發(fā)異步響應(yīng)類進(jìn)行轉(zhuǎn)化,服務(wù)端將心跳響應(yīng)包裝為MONO進(jìn)行返回操作;
MQTT/WS連接功能在推送服務(wù)中,不僅要與采用MQTT協(xié)議的硬件設(shè)備進(jìn)行關(guān)聯(lián),同時(shí)還會(huì)與互聯(lián)網(wǎng)設(shè)備產(chǎn)生連接,因此在服務(wù)器接收連接請(qǐng)求時(shí)會(huì)判斷當(dāng)前訂閱端類別,根據(jù)MQTT以及WS進(jìn)行分類處理。底層采用工廠設(shè)計(jì)模式對(duì)服務(wù)進(jìn)行劃分,根據(jù)傳入的標(biāo)識(shí)進(jìn)行判斷并選取不同的處理方式。對(duì)于MQTT協(xié)議,首先需要傳入配置類,通過內(nèi)置buildServer方法將配置類中信息綁定到當(dāng)前服務(wù)中,為保證安全服務(wù)端可以設(shè)定SSL驗(yàn)證保證傳輸安全。在配置類中通過MONO類的鏈?zhǔn)秸{(diào)用,將協(xié)議的處理類,以及客戶端重傳機(jī)制進(jìn)行綁定。WS連接與MQTT不同之處在于底層采用http傳輸,同時(shí)WS也不需要重連機(jī)制。服務(wù)器內(nèi)部封裝通過Reactor-Netty底層中與網(wǎng)絡(luò)傳輸相關(guān)的Handler處理類進(jìn)行半包解析、長(zhǎng)連接以及序列化;
主題過濾功能主要是對(duì)通配符進(jìn)行匹配,其中”#“符號(hào)表示只要該符號(hào)前的信息匹配,后續(xù)字段可以忽略不計(jì)。”?“表示當(dāng)前的占位符,可替代任意符號(hào)。服務(wù)器通過對(duì)通配符進(jìn)行匹配,實(shí)現(xiàn)主題個(gè)性化定制,該功能主要針對(duì)大批量服務(wù)接入時(shí)需要根據(jù)主題進(jìn)行客戶端分類。
borker模塊中Reactor-Netty對(duì)MQTT協(xié)議進(jìn)行封裝,結(jié)合Reactor框架響應(yīng)式以及背壓的特點(diǎn),提升MQTT整體性能。
以Reactor-Netty框架初始化服務(wù)功能為例:
(1)服務(wù)啟動(dòng)后首先調(diào)用服務(wù)端工廠方法TransportServerFactory進(jìn)行服務(wù)初始化操作,MONO類進(jìn)行服務(wù)的綁定以及初始化。
//初始化類,綁定相關(guān)屬性信息
Mono.from(protocolFactory.getProtocol(
ProtocolType.valueOf(config.getProtocol()))
//獲取傳輸信息
.get()>.getTransport()
//綁定Reactor內(nèi)部類
.start(config,>unicastProcessor)
//netty初始化類
.map(this::>wrapper)
//錯(cuò)誤信息打印
.doOnError(config.getThrowableConsumer());}
(2)調(diào)用底層Reactor-Netty方法進(jìn)行初始化。創(chuàng)建Netty連接并設(shè)置定時(shí)屬性。
NettyInbound inbound = connection.getInbound();
Connection c = connection.getConnection();
// 定時(shí)關(guān)閉
Disposable disposable = Mono.fromRunnable(c::dispose)
.delaySubscription(Duration.ofSeconds(10))
.subscribe();
// 設(shè)置connection
c.channel()
.attr(AttributeKeys.connectionAttributeKey)
.set(connection);
// 設(shè)置定時(shí)關(guān)閉
c.channel()
.attr(AttributeKeys.closeConnection)
.set(disposable);
(3)設(shè)置心跳檢測(cè)以及遺囑消息處理模塊
//心跳檢測(cè)
connection.getConnection()>.onReadIdle(config.getHeart(), () -> connection.getConnection()>.dispose())
//設(shè)置遺囑消息
connection.getConnection()>.onDispose() ->
{
Optional.ofNullable(connection.getConnection()>.channel()>.attr(AttributeKeys.WILL_MESSAGE))>.map(Attribute::>get)
(4)根據(jù)QOS等級(jí)設(shè)置不同的消息處理類。
switch (qoS) {
//QOS1等級(jí)
case AT_LEAST_ONCE:
co.sendMessage(false, >qoS,>willMessage.isRetain(), willMessage.getTopicName(), willMessage.getCopyByteBuf())>.subscribe();
break;
//QOS2等級(jí)以及QOS3等級(jí)
case EXACTLY_ONCE:
case AT_MOST_ONCE:
co.sendMessageRetry(false,>qoS,>willMessage.>isRetain(), >willMessage.getTopicName(),>willMessage.getCopyByteBuf())>.
subscribe();
break;
//未傳輸時(shí)默認(rèn)傳輸?shù)燃?jí)
default:
co.sendMessage(false,>qoS,>willMessage.is Retain(), willMessage.getTopicName(),>willMessage.getCopyByteBuf())>.subscribe();
break;}
})));
(5)sendMessage內(nèi)部綁定了handler處理類,通過對(duì)傳輸類型的判斷設(shè)置不同的handler。
messageTypeCollection.computeIfAbsent(messageType,
type->{
//根據(jù)傳入的類型進(jìn)行判斷所需要的handler
switch(type){
//數(shù)據(jù)傳輸響應(yīng)
case PUBACK:
case PUBREC:
case PUBREL:
case PUBLISH:
case PUBCOMP:
return new PubHandler();
//連接請(qǐng)求
case CONNECT:
case DISCONNECT:
return new ConnectHandler();
//心跳檢測(cè)
case PINGREQ:
return new HeartHandler();
//服務(wù)訂閱
case SUBSCRIBE:
case UNSUBSCRIBE:
return new SubHandler();
}
經(jīng)過以上操作,服務(wù)器初始化成功,等待相關(guān)請(qǐng)求。服務(wù)器在接收到請(qǐng)求后,根據(jù)數(shù)據(jù)請(qǐng)求類型進(jìn)行相應(yīng)處理。
隨著服務(wù)連接數(shù)的增多,交互數(shù)據(jù)量也逐漸增大,如果將數(shù)據(jù)都存儲(chǔ)在內(nèi)存中,將降低整體服務(wù)的性能。項(xiàng)目中將數(shù)據(jù)存儲(chǔ)在Redis中,作為當(dāng)前服務(wù)數(shù)據(jù)的緩存。Redis內(nèi)部對(duì)存入信息按照key-value形式進(jìn)行存儲(chǔ),保證數(shù)據(jù)的唯一性。Redis是一款Nosql數(shù)據(jù)庫(kù),在存儲(chǔ)方面采用的單線程處理,因此訪問時(shí)不需要進(jìn)行并發(fā)維護(hù),并且Redis在并發(fā)數(shù)據(jù)訪問以及讀取方面的性能均好于數(shù)據(jù)庫(kù)。為防止Redis系統(tǒng)崩潰導(dǎo)致的數(shù)據(jù)丟失,內(nèi)部開啟數(shù)據(jù)持久化形式,將數(shù)據(jù)定時(shí)保存到磁盤文件中,同時(shí)采用sentinel監(jiān)控下的Redis集群模型,sentinel作為服務(wù)監(jiān)控組件主要功能為服務(wù)監(jiān)控、故障提示、故障自動(dòng)轉(zhuǎn)移,通過對(duì)當(dāng)前Redis集群的心跳監(jiān)控,保證當(dāng)Redis主節(jié)點(diǎn)崩潰時(shí),系統(tǒng)自動(dòng)在其它從服務(wù)器中進(jìn)行選舉,產(chǎn)生新的主節(jié)點(diǎn),實(shí)現(xiàn)服務(wù)的可用性。Redis集群模式采用哈希槽算法對(duì)每個(gè)存入的key進(jìn)行CRC16校驗(yàn)后對(duì)16 384進(jìn)行取模來決定放入那個(gè)服務(wù)器中。相對(duì)于傳統(tǒng)集群形式,該結(jié)構(gòu)更容易擴(kuò)展,如果擴(kuò)充一個(gè)節(jié)點(diǎn)D,只需要將A、B、C節(jié)點(diǎn)中的部分槽放置在D上;如果想移除節(jié)點(diǎn)A,只需要將A的數(shù)據(jù)轉(zhuǎn)移到B和C節(jié)點(diǎn)上。由于將哈希槽從一個(gè)節(jié)點(diǎn)移動(dòng)到另一個(gè)節(jié)點(diǎn)不需要停止服務(wù),只需要通過命令直接再分配,因而上述擴(kuò)展不會(huì)造成集群不可用,保證了服務(wù)的高可用性。
如表1所示,項(xiàng)目中Redis主要對(duì)以下7個(gè)內(nèi)容進(jìn)行存儲(chǔ)。
表1 Redis存儲(chǔ)信息
publish以及pubrel:主要是為了滿足Qos質(zhì)量等級(jí),質(zhì)量等級(jí)的不同導(dǎo)致數(shù)據(jù)需要重傳,因此需要對(duì)傳輸當(dāng)前數(shù)據(jù)進(jìn)行緩,等待重新發(fā)送。
session:主要是對(duì)當(dāng)前客戶端與服務(wù)器所建立的會(huì)話進(jìn)行存儲(chǔ)。MQTT的設(shè)計(jì)主要是為了對(duì)信號(hào)不穩(wěn)定的網(wǎng)絡(luò)提供服務(wù),因此有時(shí)會(huì)出現(xiàn)網(wǎng)絡(luò)波動(dòng)導(dǎo)致的斷開連接情況,此時(shí)客戶端就可以在Session中將遺囑信息也進(jìn)行存儲(chǔ),服務(wù)器會(huì)定期對(duì)客戶端狀態(tài)進(jìn)行檢查,如果出現(xiàn)異常可以向Topic中發(fā)送遺囑信息通知訂閱方,避免服務(wù)宕機(jī)而導(dǎo)致的訂閱方長(zhǎng)時(shí)間等待問題。
client:訂閱者根據(jù)主題的不同實(shí)現(xiàn)訂閱服務(wù),Redis中需要針對(duì)每個(gè)訂閱者進(jìn)行唯一的標(biāo)識(shí)ClientId,該標(biāo)識(shí)應(yīng)用Redis內(nèi)部的incr方法實(shí)現(xiàn)標(biāo)識(shí)號(hào)的遞增操作并與服務(wù)名進(jìn)行拼接。在高并發(fā)場(chǎng)景下,Redis內(nèi)部實(shí)現(xiàn)id的同步生成,保證線程安全。將ClientId與所訂閱的主題進(jìn)行綁定,方便后續(xù)服務(wù)推送時(shí)查詢。
subWild與NotsubWild:發(fā)布方采用個(gè)性化的主題模式進(jìn)行服務(wù)發(fā)布,其中可以通過#與?進(jìn)行多字段匹配,與當(dāng)前單獨(dú)字段匹配,因此在存儲(chǔ)的時(shí)候需要將有無(wú)通配符的主題進(jìn)行分別存儲(chǔ)。
retain:該緩存主要為了broker模塊中保留信息功能,可以讓新訂閱的客戶端獲取發(fā)布方當(dāng)前最新的信息,不需要進(jìn)行接收等待。
消息代理分發(fā)模式是在硬件設(shè)備之間通信的基礎(chǔ)上,將數(shù)據(jù)傳入大型互聯(lián)網(wǎng)設(shè)備中進(jìn)行處理。這里采用kafka作為消息代理分發(fā)的中間件,kafka用于海量數(shù)據(jù)的場(chǎng)景,通過分布式架構(gòu)所提供的消息處理機(jī)制對(duì)數(shù)據(jù)進(jìn)行流式存儲(chǔ),消費(fèi)者通過訂閱不同的主題可以實(shí)現(xiàn)毫秒級(jí)延遲的信息接收,保證數(shù)據(jù)順序性消費(fèi)。大型企業(yè)項(xiàng)目中可通過kafka將該框架的推送數(shù)據(jù)轉(zhuǎn)發(fā)到hdfs歸檔或者elasticsearch做全文檢測(cè),為大型互聯(lián)網(wǎng)設(shè)備提供高性能的數(shù)據(jù)獲取途徑。
客戶端輸入錯(cuò)誤的密碼并向服務(wù)器發(fā)送連接請(qǐng)求,在已知服務(wù)器的IP地址以及運(yùn)行端口號(hào)的情況下進(jìn)行訪問測(cè)試。測(cè)試結(jié)果如圖5所示,經(jīng)過rsa私鑰解碼顯示解析后的密碼與Redis中所存儲(chǔ)的密碼不符,返回錯(cuò)誤提示。因此該推送框架可以保證服務(wù)的安全性。
圖5 安全認(rèn)證失敗
為檢測(cè)推送服務(wù)的性能,需要對(duì)其在不同并發(fā)量情況下進(jìn)行性能測(cè)試,測(cè)試采用Jmeter進(jìn)行服務(wù)推送性能檢測(cè),根據(jù)并發(fā)數(shù)量以及響應(yīng)時(shí)間對(duì)框架進(jìn)行分析。測(cè)試設(shè)備采用兩臺(tái)阿里云服務(wù)器,配置見表2。
Jmeter端每間隔1 s發(fā)送一個(gè)350字節(jié)的數(shù)據(jù),在不同的并發(fā)數(shù)情況下進(jìn)行測(cè)試。如圖6所示,在低并發(fā)情況下,由于系統(tǒng)中自身處理速度比較快,無(wú)法清楚看出MQTT與本框架的性能,但隨著并發(fā)數(shù)的增長(zhǎng),單純采用MQTT的響應(yīng)時(shí)間上升較快,當(dāng)并發(fā)數(shù)達(dá)到了2000時(shí)可以看到MQTT的響應(yīng)時(shí)間已經(jīng)遠(yuǎn)遠(yuǎn)超過Reactor-Netty+MQTT,同時(shí)從與Netty+MQTT的響應(yīng)時(shí)間對(duì)比可以看出,該框架的響應(yīng)時(shí)間也相對(duì)較低。這說明采用Reactor-Netty+MQTT的組合形式能夠極大降低高并發(fā)場(chǎng)景下的響應(yīng)時(shí)間。
圖6 平均響應(yīng)時(shí)間對(duì)比
從吞吐量方面考慮,圖7可知當(dāng)并發(fā)數(shù)達(dá)到1500左右MQTT達(dá)到了性能的瓶頸期,隨著并發(fā)數(shù)的增高,服務(wù)吞吐量不再上升。而在并發(fā)數(shù)量提升到3000左右時(shí),Netty+MQTT方案的吞吐量提升速度不如Reactor-Netty+MQTT方案快。實(shí)驗(yàn)結(jié)果表明在4000并發(fā)數(shù)的時(shí)候,系統(tǒng)仍然有較好的吞吐量,并且其上升趨勢(shì)遠(yuǎn)好于原生MQTT以及Netty+MQTT方案。這說明Reactor-Netty+MQTT的方案有良好的并發(fā)性能,適用于高并發(fā)訪問。
圖7 平均響應(yīng)速度對(duì)比
本文介紹了一種高性能消息推送框架,基于Reactor-Netty與MQTT相結(jié)合的方案,通過Reactor-Netty響應(yīng)式編程思想對(duì)MQTT協(xié)議進(jìn)行封裝,實(shí)現(xiàn)異步高性能響應(yīng)。應(yīng)用Redis進(jìn)行消息緩存,保證服務(wù)消息的實(shí)時(shí)性,設(shè)計(jì)kafka內(nèi)容分發(fā)代理模式,可將數(shù)據(jù)推送服務(wù)與大型互聯(lián)網(wǎng)架構(gòu)相結(jié)合,實(shí)現(xiàn)服務(wù)數(shù)據(jù)高質(zhì)量處理。經(jīng)過實(shí)驗(yàn)驗(yàn)證發(fā)現(xiàn),Reactor-Netty實(shí)現(xiàn)的消息推送服務(wù)可以提高整體的并發(fā)處理能力,相對(duì)于傳統(tǒng)MQTT框架以及Netty框架性能有極大的提高。