冶莉娟 付大偉
【摘 要】隨著青海電網(wǎng)的信息化發(fā)展需要的系統(tǒng)增多,在消息處理中每個(gè)系統(tǒng)各自為政,而且消息格式不統(tǒng)一、不及時(shí)、甚至誤發(fā)的情況,借此統(tǒng)一消息處理,是非常有必要的,本文介紹為解決這個(gè)問題,我們提出了通過redis實(shí)現(xiàn)消息存儲,消息隊(duì)列管理,使用websocket實(shí)現(xiàn)服務(wù)端與客戶端的通訊。從而實(shí)現(xiàn)消息統(tǒng)一管理,統(tǒng)一發(fā)送渠道討論說明。
【關(guān)鍵詞】redis websocket 消息服務(wù)
1引言
隨著移動(dòng)互聯(lián)網(wǎng)的發(fā)展,手機(jī)、平板電腦等智能移動(dòng)終端成為了信息的重要載體,在移動(dòng)化的趨勢下,傳統(tǒng)業(yè)務(wù)開始逐步向移動(dòng)業(yè)務(wù)擴(kuò)展,企業(yè)應(yīng)用系統(tǒng)移動(dòng)化的需求也隨之日益增長。開始出現(xiàn)消息不統(tǒng)一,無法跨業(yè)務(wù)系統(tǒng),已無法滿足現(xiàn)有業(yè)務(wù)增長需求。
消息統(tǒng)一服務(wù),為各個(gè)業(yè)務(wù)系統(tǒng)與移動(dòng)端、web端提供統(tǒng)一的消息通道、統(tǒng)一的消息格式,加快業(yè)務(wù)系統(tǒng)的信息建設(shè),剔除各業(yè)務(wù)系統(tǒng)繁雜不統(tǒng)一的消息處理,業(yè)務(wù)系統(tǒng)的開發(fā)人員只需要引入服務(wù)端、客戶端SDK,消息接口調(diào)用就可實(shí)現(xiàn)消息發(fā)送與接收。
這里我們通過使用websocket提供消息通道、redis完成消息存儲通過send服務(wù)器與狀態(tài)服務(wù)器完成消息統(tǒng)一處理。
2. 關(guān)鍵技術(shù)
2.1 Query實(shí)現(xiàn)技術(shù)
方便消息數(shù)據(jù)的快速讀寫操作,使用redis技術(shù) ,redis支持主從的模式、讀寫分離模型、數(shù)據(jù)分片模型,以及他高速操作:SET操作每秒鐘 110000 次,GET操作每秒鐘 81000 次。結(jié)合redis支持分布式的特性,使用多服務(wù)器更高效的為消息服務(wù)提供存儲查詢服務(wù)。
2.2線程池實(shí)現(xiàn)技術(shù)
線程池作用就是限制系統(tǒng)中執(zhí)行線程的數(shù)量。
為實(shí)現(xiàn)系統(tǒng)運(yùn)行效果的最優(yōu)化,可以根據(jù)系統(tǒng)的環(huán)境情況,對線程數(shù)量進(jìn)行自動(dòng)或手動(dòng)設(shè)定;少了浪費(fèi)了系統(tǒng)資源,多了造成系統(tǒng)擁擠效率不高。通過線程池進(jìn)行線程數(shù)量的控制,根據(jù)命令的先后依次執(zhí)行。一個(gè)任務(wù)完成以后,再執(zhí)行任務(wù)列表中時(shí)間最考前的任務(wù)。當(dāng)任務(wù)列表中沒有可執(zhí)行進(jìn)程時(shí),線程池則進(jìn)入等待狀態(tài)。當(dāng)一個(gè)新任務(wù)需要完成時(shí),如果線程池中沒有正在運(yùn)行的工作線程,就可以開始運(yùn)行了;否則進(jìn)入等待隊(duì)列。
為什么要用線程池:
(1)減少了創(chuàng)建和銷毀線程的次數(shù),每個(gè)工作線程都可以被重復(fù)利用,可執(zhí)行多個(gè)任務(wù)。
(2)可以根據(jù)系統(tǒng)的承受能力,調(diào)整線程池中工作線線程的數(shù)目,防止因?yàn)橄倪^多的內(nèi)存,而把服務(wù)器累趴下(每個(gè)線程需要大約1MB內(nèi)存,線程開的越多,消耗的內(nèi)存也就越大,最后死機(jī))。
Java里面線程池的頂級接口是Executor,但是嚴(yán)格意義上講Executor并不是一個(gè)線程池,而只是一個(gè)執(zhí)行線程的工具。真正的線程池接口是ExecutorService。
比較重要的幾個(gè)類:
ExecutorService 真正的線程池接口。
ScheduledExecutorService 能和Timer/TimerTask類似,解決那些需要任務(wù)重復(fù)執(zhí)行的問題。
ThreadPoolExecutor ExecutorService的默認(rèn)實(shí)現(xiàn)。
ScheduledThreadPoolExecutor 繼承ThreadPoolExecutor的ScheduledExecutorService接口實(shí)現(xiàn),周期性任務(wù)調(diào)度的類實(shí)現(xiàn)。
要配置一個(gè)線程池是比較復(fù)雜的,尤其是對于線程池的原理不是很清楚的情況下,線程池的配置很可能不是最完美的,因此在Executors類中設(shè)置了一些配方,專門由于形成經(jīng)常用到的線程池。
2.2.1 newSingleThreadExecutor
形成用于單線程工作的線程池。運(yùn)行過程中只有單一線程在線程池中運(yùn)行,即所有的任務(wù)通過單線程的工作方式執(zhí)行。一旦所執(zhí)行的線程因某種原因而停止,則會有隊(duì)列中時(shí)間最靠前的線程進(jìn)入運(yùn)行。該線程池的一切任務(wù)按照時(shí)間的先后順序依次進(jìn)行。
2.2.2 newFixedThreadPool
形成固定處理能力的線程池。每當(dāng)有任務(wù)提交時(shí),則對應(yīng)形成一個(gè)線程,直到達(dá)到線程池的最大處理能力。線程池滿負(fù)荷工作后則不再創(chuàng)建新的線程,當(dāng)正在運(yùn)行的線程完成或者因?yàn)槠渌蚪Y(jié)束時(shí),則會在隊(duì)列中選擇最靠前的線程執(zhí)行。
2.2.3 newCachedThreadPool
形成具有緩存空間的線程池。當(dāng)需處理的線程較少,不能充滿線程池時(shí),則會將線程池中較長時(shí)間不運(yùn)行的空閑線程收回,一旦增加任務(wù)量,線程池又會自動(dòng)投入線程執(zhí)行新的任務(wù)。該線程池的大小不進(jìn)行設(shè)定,其大小完全根據(jù)操作系統(tǒng)線程的最大處理能力而定。
2.2.4 newScheduledThreadPool
創(chuàng)建一個(gè)大小無限的線程池。此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求。綜上述技術(shù)表現(xiàn)這里使用newFixedThreadPool線程管理。解決資源的合理利用,同時(shí)又不影響多操作的性能。
2.3 websocket實(shí)現(xiàn)技術(shù)
WebSocket protocol 是HTML5一種新的協(xié)議(protocol)。它是實(shí)現(xiàn)了瀏覽器與服務(wù)器全雙工通信(full-duplex)。
現(xiàn)很多網(wǎng)站為了實(shí)現(xiàn)即時(shí)通訊(real-time),輪詢(polling)是最常用的方式。輪詢是通過設(shè)定固定的時(shí)間間隔(time interval)(如每1秒),由瀏覽器向服務(wù)器發(fā)出傳輸請求,收到請求后,服務(wù)器將最新的數(shù)據(jù)信返回服務(wù)端瀏覽器。這種常規(guī)的傳輸請求模式的缺點(diǎn)顯而易見,瀏覽器的請求(request)必須不停的向服務(wù)器發(fā)出,可是即使信息中的數(shù)據(jù)很小,通常請求的頭部信息也會很長,從而造成帶寬的浪費(fèi)。
而最新的輪詢技術(shù)效果是Comet – 用了AJAX。但這種技術(shù)雖然采用全雙工通信,但請求(reuqest)仍舊不可避免需要發(fā)出。
在 WebSocket API,僅僅需要在瀏覽器和服務(wù)器進(jìn)行一個(gè)握手動(dòng)作,瀏覽器和服務(wù)器之間的傳輸通道就形成了。數(shù)據(jù)通訊就可以在彼此之間直接進(jìn)行。通過WebSocket 協(xié)議進(jìn)行實(shí)時(shí)服務(wù)的好處有兩點(diǎn):
(1)Header。通訊所用的頭部信息是很小的-通常僅有2 Bytes。
(2)Server Push。服務(wù)器到客戶端的數(shù)據(jù)可以進(jìn)行主動(dòng)傳輸,編輯本段握手協(xié)議。
在實(shí)現(xiàn)websocket連線過程中,需要通過瀏覽器發(fā)出websocket連線請求,然后服務(wù)器發(fā)出回應(yīng),這個(gè)過程通常稱為“握手” (handshaking)。
(1)握手協(xié)議在后期的版本中,會標(biāo)明版本編號,下面的例子屬于早期的協(xié)定之一,對于新版的 chrome 和 Firefox 皆不適用。
(2)后期的版本大多屬于功能上的擴(kuò)充,例如使用第7版的握手協(xié)議同樣也適用于第8版的握手協(xié)議。
3 整體實(shí)現(xiàn)(圖1)
(1)客戶端client調(diào)用Login中的登錄接口實(shí)現(xiàn)登錄,login服務(wù)端通過人員Map查詢得到登錄人員分配的send消息服務(wù)器。并返回給登錄人員。
(2)登錄人員得到分配的send服務(wù)器后,通過websocket技術(shù)與send服務(wù)器第一次握手,打開消息通道。
(3)業(yè)務(wù)系統(tǒng)服務(wù)端調(diào)用線程池中的write接口,將消息寫入redis中,同時(shí)將消息調(diào)用各個(gè)send服務(wù)器的send接口將消息發(fā)送到client端。
(4)當(dāng)send服務(wù)的send接口被調(diào)用時(shí),觸發(fā)send服務(wù),將消息發(fā)送予已與send握手的客戶端client。
websocket實(shí)現(xiàn)基礎(chǔ)代碼:
java
protected StreamInbound createWebSocketInbound(String arg0,
HttpServletRequest arg1) {
// TODO Auto-generated method stub
return new ChatWebSocket(users);
}
創(chuàng)建一個(gè)chatwebsocket用于處理這個(gè)請求,觸發(fā)該chatwebsocket對象的onOpen事件
@Override
protected void onOpen(WsOutbound outbound) {
// this.connection=connection;
this.username = "#" + String.valueOf(USERNUMBER);
USERNUMBER++;
try {
String message = "NAME" + "\t" + this.username;
CharBuffer buffer = CharBuffer.wrap(message);
this.getWsOutbound().writeTextMessage(buffer);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
users.add(this);
}
websocket的數(shù)據(jù)傳輸是frame形式傳輸?shù)?,比如會將一條消息分為幾個(gè)frame,按照先后順序傳輸出去。這樣做會有幾個(gè)好處:
(1)大數(shù)據(jù)的傳輸可以分片傳輸,不用考慮到數(shù)據(jù)大小導(dǎo)致的長度標(biāo)志位不足夠的情況。
(2)和http的chunk一樣,可以邊生成數(shù)據(jù)邊傳遞消息,即提高傳輸效率。
傳輸協(xié)議:
FIN:1位,用來表明這是一個(gè)消息的最后的消息片斷,當(dāng)然第一個(gè)消息片斷也可能成為最終的消息片斷。
RSV1, RSV2, RSV3, 分別都是1位,若互相之間的自定義協(xié)議沒有進(jìn)行約定,則這幾位的狀態(tài)被賦0,否則WebSocket連接必須被切除。
Opcode操作碼為4位,對有效負(fù)載數(shù)據(jù)進(jìn)行定義,當(dāng)收到的操作碼未知時(shí),也必須將連接關(guān)斷,以下是定義的操作碼:
* %x0 代表消息片斷是連續(xù)的;
* %x1 代表消息片斷為文本格式;
* %x2 代表消息片斷為二進(jìn)制格式;
* %x3-7 為非控制消息片段預(yù)留操作碼,未進(jìn)行定義;
* %x8 代表已經(jīng)關(guān)斷連接;
* %x9 代表心跳檢查的ping;
* %xA 代表心跳檢查的pong;
* %xB-F 為控制消息片段預(yù)留操作碼,未進(jìn)行定義。
Mask:1位,代表掩碼是否存在于傳輸?shù)臄?shù)據(jù)中,若該位為1,則masking-key中必須有掩碼,該位在客戶端傳輸?shù)椒?wù)端的消息中狀態(tài)都為1。
Payload length: 發(fā)送數(shù)據(jù)的長度,其表示形式為字節(jié):7位、7+16位、或者7+64位。若該數(shù)值在0-125的范圍內(nèi),則該值代表所發(fā)送數(shù)據(jù)的長度;若該數(shù)值為126,那么其后面的兩個(gè)字節(jié)代表一個(gè)16位無符號整數(shù),代表所發(fā)送數(shù)據(jù)的長度;若該數(shù)值為127,那么它后面的8各字節(jié)為一個(gè)64位無符號整數(shù),該數(shù)值代表所發(fā)送數(shù)據(jù)的長度。網(wǎng)絡(luò)字節(jié)的順序代表多字節(jié)長度的量。擴(kuò)展數(shù)據(jù)和應(yīng)用數(shù)據(jù)相加即為負(fù)載數(shù)據(jù)的長度,當(dāng)擴(kuò)展數(shù)據(jù)的長度是0時(shí),負(fù)載數(shù)據(jù)的長度與應(yīng)用數(shù)據(jù)的長度相等。
Masking-key:0或4個(gè)字節(jié),客戶端傳輸?shù)椒?wù)端的數(shù)據(jù),都是通過內(nèi)嵌的一個(gè)32位值作為掩碼的;掩碼鍵只有在掩碼位設(shè)置為1的時(shí)候存在。
Payload data: (x+y)位,負(fù)載數(shù)據(jù)長度等于擴(kuò)展數(shù)據(jù)和應(yīng)用數(shù)據(jù)長度相加的結(jié)果。
Extension data:x位,擴(kuò)展數(shù)據(jù)的長度在當(dāng)客戶端和服務(wù)端沒有特殊約定時(shí)保持為0,擴(kuò)展數(shù)據(jù)的長度必須在擴(kuò)展時(shí)直接或者通過計(jì)算的方式被指定,同時(shí)其握手方式也必須提前確定。若擴(kuò)展數(shù)據(jù)存在,那么其必定包含在負(fù)載數(shù)據(jù)之中。
Application data:y位,自由定義的應(yīng)用數(shù)據(jù),其往往放置于擴(kuò)展數(shù)據(jù)的后面,應(yīng)用數(shù)據(jù)的長度為負(fù)載數(shù)據(jù)和擴(kuò)展數(shù)據(jù)的長度之差。
讀取數(shù)據(jù)需要按照這個(gè)格式讀取,發(fā)送數(shù)據(jù)也需要按照這個(gè)格式發(fā)送返回。
通過websocket實(shí)現(xiàn)客戶端與服務(wù)端的握手連接,當(dāng)業(yè)務(wù)系統(tǒng)發(fā)送消息時(shí),query服務(wù)會將服務(wù)發(fā)送到send服務(wù)器并處發(fā)send服務(wù),將消息發(fā)送到與此send服務(wù)器握手的客戶端上。
4 結(jié)語
通過Redis實(shí)現(xiàn)消息存儲,消息隊(duì)列,websocket實(shí)現(xiàn)客戶端與服務(wù)端的通訊。從而實(shí)現(xiàn)客戶端與業(yè)務(wù)端的通訊連接,提供消息推送,消息訂閱服務(wù)。
參考文獻(xiàn):
[1] 羅永剛.大型網(wǎng)絡(luò)棋牌游戲服務(wù)器端設(shè)計(jì)與實(shí)現(xiàn)[D].山東大學(xué),2011年.
[2] 李蔚.腦動(dòng)力游戲服務(wù)器端子系統(tǒng)的分析與設(shè)計(jì)[D].北京郵電大學(xué),2011年.
[3] 杜松波.企業(yè)即時(shí)通訊系統(tǒng)服務(wù)器的設(shè)計(jì)與實(shí)現(xiàn)[D].電子科技大學(xué),2005年.
[4] 劉曉宇.基于SIP的即時(shí)通訊系統(tǒng)的實(shí)現(xiàn)與應(yīng)用[D].中國科學(xué)院研究生院(計(jì)算技術(shù)研究所),2006年.
[5] 蔡文健.航海距離系統(tǒng)的服務(wù)器端設(shè)計(jì)與實(shí)現(xiàn)[D].北京郵電大學(xué),2010年.
[6] 孔鵬.即時(shí)通訊系統(tǒng)的研究與實(shí)現(xiàn)[D].山東大學(xué),2006年.
[7] 陳春源.基于服務(wù)器端的HTTP信息過濾系統(tǒng)設(shè)計(jì)與實(shí)現(xiàn)[D].華南理工大學(xué),2012年.
作者簡介:冶莉娟(1982一),女,山西渾源人,本科,工程師,主要電網(wǎng)調(diào)度自動(dòng)化工作;付大偉(1976一),男,吉林四平人,研究生,高級工程師,主要負(fù)責(zé)電力自動(dòng)化技術(shù)工作。