劉恩孚 冷芳玲 鮑玉斌
摘要:提出了一個(gè)基于整體同步并行計(jì)算(BSP)模型的、具有磁盤暫存功能的大規(guī)模圖處理系統(tǒng)——BC-BSP。該系統(tǒng)通過提供應(yīng)用程序接口(API)實(shí)現(xiàn)系統(tǒng)配置和有關(guān)策略的可擴(kuò)展性,通過優(yōu)化的圖數(shù)據(jù)磁盤存儲實(shí)現(xiàn)了數(shù)據(jù)處理規(guī)模的高可擴(kuò)展性以及高性能的容錯方案,并且可以處理普通數(shù)據(jù)集的聚類和分類等需要迭代計(jì)算的數(shù)據(jù)挖掘算法。通過實(shí)驗(yàn)驗(yàn)證了該系統(tǒng)的可擴(kuò)展性,其在真實(shí)數(shù)據(jù)集上性能優(yōu)于Giraph1.0.0,在模擬數(shù)據(jù)集上稍遜于Giraph的內(nèi)存版。
關(guān)鍵詞:BSP;大規(guī)模圖處理;迭代計(jì)算;磁盤緩存
Abstract:We describe a bulk synchronous parallel (BSP)-based parallel iterative processing system for graph data with disk caching assist. This system is called BC-BSP. The system can achieve the scalability of system configuration and policy by providing APIs, high scalability of the data scale processed, and high performance of fault-tolerant scheme by disk storage optimization to graph data. It can also execute some data mining algorithms with iterative processing, such as clustering and classification on non-graph data sets. The experimental results show that the scalability and performance of the proposed system are better than that of Giraph1.0.0 on the real data set,but it is lightly poorer than the memory version of Giraph.
Key words:BSP; large-scale graph processing; iterative computing; disk cache
圖是計(jì)算機(jī)科學(xué)中最常用的一類抽象數(shù)據(jù)結(jié)構(gòu),更具有一般性的表示能力?,F(xiàn)實(shí)世界中的許多應(yīng)用場景都可以很自然地使用圖結(jié)構(gòu)表示。例如,交通運(yùn)輸網(wǎng)絡(luò)、社交網(wǎng)絡(luò)中的資源對象之間的關(guān)系以及生物信息網(wǎng)絡(luò)等。在大數(shù)據(jù)時(shí)代,需要分析的圖規(guī)模越來越大。以互聯(lián)網(wǎng)和社交網(wǎng)絡(luò)為例,隨著互聯(lián)網(wǎng)的深入使用和Web 2.0技術(shù)的推動,網(wǎng)頁數(shù)量增長迅猛,據(jù)中國互聯(lián)網(wǎng)絡(luò)信息中心(CNNIC)統(tǒng)計(jì):截止2014年12月中國網(wǎng)頁規(guī)模達(dá)到1 899億個(gè),年增長率26.6%;而基于互聯(lián)網(wǎng)的社交網(wǎng)絡(luò)更是如此,如全球最大的社交網(wǎng)絡(luò)Facebook,2014年7月已有約22億用戶,其中月活躍用戶數(shù)13億人。在中國,如QQ空間、微博、開心網(wǎng)等,發(fā)展也異常迅猛。因此,實(shí)際應(yīng)用中圖的頂點(diǎn)可達(dá)10億,而邊就會更多,對應(yīng)的數(shù)據(jù)文件會更大。對如此大規(guī)模圖數(shù)據(jù)的存儲和分析處理的時(shí)間和空間開銷遠(yuǎn)遠(yuǎn)超出了傳統(tǒng)集中式圖數(shù)據(jù)處理的承受能力。因此,對大規(guī)模圖的有效處理成為了一個(gè)新的挑戰(zhàn)。
MapReduce計(jì)算模型可以實(shí)現(xiàn)對大規(guī)模(圖)數(shù)據(jù)的處理,并且具有很好的容錯性和可擴(kuò)展性。但是由于圖數(shù)據(jù)分析(如網(wǎng)頁的PageRank[1]計(jì)算、最短路徑計(jì)算、聚類分析)都需要多次迭代才能完成。每次迭代需要一個(gè)或多個(gè)開銷較大的MapReduce作業(yè)完成。為解決迭代計(jì)算的時(shí)間性能問題,谷歌公司開發(fā)了基于整體同步并行計(jì)算(BSP)模型的Pregel[2]系統(tǒng),之后Apache的兩個(gè)開源項(xiàng)目Hama和Giraph也開展了基于BSP的迭代計(jì)算系統(tǒng)的開發(fā)。它們都是在內(nèi)存中做數(shù)據(jù)處理,因此能夠處理的圖的規(guī)模有限。文中,我們設(shè)計(jì)開發(fā)了基于BSP模型的、能夠處理大規(guī)模(圖)數(shù)據(jù)的并行迭代計(jì)算系統(tǒng)——BC-BSP。該系統(tǒng)主要特色在于:(1)實(shí)現(xiàn)了具有磁盤輔助的基于BSP的大規(guī)模圖數(shù)據(jù)并行迭代處理系統(tǒng),該系統(tǒng)在內(nèi)存受限的情況下具有很好的數(shù)據(jù)處理能力,即在可用的節(jié)點(diǎn)規(guī)模和內(nèi)存配置的情況下,可以處理的數(shù)據(jù)規(guī)模較大;(2)系統(tǒng)多方面考慮負(fù)載均衡,在充分考慮數(shù)據(jù)本地化的前提下考慮了各個(gè)節(jié)點(diǎn)的負(fù)載均衡問題,并且結(jié)點(diǎn)的負(fù)載均衡優(yōu)先于數(shù)據(jù)本地化。我們做了大量的實(shí)驗(yàn),比較了基于BSP的大規(guī)模圖處理系統(tǒng)的性能和擴(kuò)展性。
1 BSP模型和相關(guān)工作
BSP是一種“塊”同步模型[3],即通過消息傳遞機(jī)制,實(shí)現(xiàn)塊內(nèi)異步并行,塊間顯式同步。一個(gè)基于BSP的計(jì)算系統(tǒng)是由具有處理機(jī)和存儲器的多個(gè)自治的計(jì)算服務(wù)器組成的集群,并且這個(gè)集群采用主/從結(jié)構(gòu)。主節(jié)點(diǎn)用于協(xié)調(diào)整個(gè)集群,包括接收用戶的作業(yè)提交、作業(yè)調(diào)度、故障監(jiān)控等功能,從節(jié)點(diǎn)(也稱為工作節(jié)點(diǎn))用于存儲和處理數(shù)據(jù)。
谷歌公司開發(fā)的基于BSP模型的分布式圖計(jì)算框架Pregel主要是為了處理大規(guī)模圖數(shù)據(jù),如網(wǎng)頁的PageRank計(jì)算、最短路徑等。Pregel假設(shè)處理的數(shù)據(jù)都在內(nèi)存中,因此在一定的節(jié)點(diǎn)規(guī)模下,它能夠處理的數(shù)據(jù)規(guī)模是有限制的?;赑regel的思想,許多基于BSP的大規(guī)模圖處理系統(tǒng)被開發(fā)出來。例如,Apache推出了基于Java的開源項(xiàng)目Hama[4],它是一個(gè)純粹的基于BSP的用于大規(guī)??茖W(xué)計(jì)算(如矩陣計(jì)算、圖和網(wǎng)絡(luò)算法)的計(jì)算框架,同樣它的早期版本沒有考慮磁盤輔助的問題,而是假設(shè)所有數(shù)據(jù)全部位于內(nèi)存中,最新的版本也在添加磁盤輔助功能,但是很不完善;而Apache的另一個(gè)開源項(xiàng)目Giraph,是建立在Hadoop基礎(chǔ)之上的Pregel的開源實(shí)現(xiàn)[5],可以認(rèn)為它是MapReduce模型和BSP模型的結(jié)合體,即它利用MapReduce作業(yè)的Map任務(wù)實(shí)現(xiàn)了基于BSP模型的迭代計(jì)算,而不需要Reduce任務(wù),整個(gè)圖處理過程只需要啟動一次MapRedcue作業(yè),但是一旦出現(xiàn)故障,整個(gè)作業(yè)需要重新啟動;GraphLab是卡內(nèi)基梅隆大學(xué)提出的面向大規(guī)模數(shù)據(jù)挖掘和圖計(jì)算的分布式內(nèi)存計(jì)算框架[6]。更多的基于BSP模型的類Pregel的大規(guī)模數(shù)據(jù)分布式并行處理系統(tǒng)和框架請見文獻(xiàn)[7]。
2 BC-BSP概述
圖1給出了BC-BSP系統(tǒng)的整體結(jié)構(gòu),主要包括BSP核心層、管理接口層和接口層。BC-BSP實(shí)現(xiàn)了對Hadoop分布式文件系統(tǒng)(HDFS)、HBase、MySQL等底層存儲系統(tǒng)的支持,包括數(shù)據(jù)的輸入和輸出。BC-BSP系統(tǒng)內(nèi)部核心層主要包括客戶端作業(yè)提交和數(shù)據(jù)劃分,主節(jié)點(diǎn)端的作業(yè)調(diào)度和集群監(jiān)控,從節(jié)點(diǎn)端的本地計(jì)算處理、全局同步、消息通信和容錯控制;接口層主要包括應(yīng)用編程接口(API)和命令行接口(CLI);管理接口層主要包括集群管理、系統(tǒng)自動化安裝部署、日志管理、性能管理和故障管理等工具。
從系統(tǒng)實(shí)現(xiàn)的角度,BC-BSP系統(tǒng)是一個(gè)主從式結(jié)構(gòu),主要分為客戶端、主控節(jié)點(diǎn)、工作節(jié)點(diǎn)、任務(wù)模塊、全局同步模塊。圖2給出了BC-BSP的運(yùn)行控制機(jī)制以及系統(tǒng)中客戶端、主控節(jié)點(diǎn)、工作節(jié)點(diǎn)、任務(wù)模塊、全局同步模塊之間的協(xié)作關(guān)系。
在BC-BSP系統(tǒng)中,客戶端主要根據(jù)用戶指定的輸入路徑進(jìn)行數(shù)據(jù)分片,調(diào)整分區(qū)數(shù)目,檢查作業(yè)運(yùn)行的可行性,向主控節(jié)點(diǎn)申請作業(yè)并將作業(yè)打包提交給BSP主控節(jié)點(diǎn),當(dāng)作業(yè)開始運(yùn)行后,負(fù)責(zé)及時(shí)反饋?zhàn)鳂I(yè)運(yùn)行狀態(tài);主控節(jié)點(diǎn)端管理集群工作節(jié)點(diǎn)的注冊、心跳信息和狀態(tài)信息收集等,并作為容錯控制的控制中心,提供各種狀態(tài)查詢接口,并以作業(yè)為單位,負(fù)責(zé)作業(yè)的初始化、調(diào)度和同步控制等;工作節(jié)點(diǎn)端主要負(fù)責(zé)工作節(jié)點(diǎn)本地的任務(wù)管理和局部同步控制以及局部聚集計(jì)算等;任務(wù)模塊端是任務(wù)運(yùn)行的實(shí)體,主要負(fù)責(zé)執(zhí)行用戶的業(yè)務(wù)處理邏輯和數(shù)據(jù)輸入輸出處理等;全局同步負(fù)責(zé)同一作業(yè)的所有任務(wù)在各個(gè)超步之間的全局同步工作,超步路障同步由主節(jié)點(diǎn)端、工作節(jié)點(diǎn)端及任務(wù)模塊端共同完成,在同步過程中,可以完成聚集計(jì)算,系統(tǒng)中的同步主要通過第三方組件Zookeeper實(shí)現(xiàn);消息通信主要在每一個(gè)超步的本地計(jì)算執(zhí)行過程中,負(fù)責(zé)異步地發(fā)送和接收消息,并將接收的消息暫存到本地的接收消息隊(duì)列中,當(dāng)內(nèi)存空間不足時(shí),支持磁盤輔助存儲,這里主要是通過遠(yuǎn)程過程調(diào)用協(xié)議(RPC)機(jī)制實(shí)現(xiàn)消息傳遞;容錯控制模塊負(fù)責(zé)容錯備份、故障檢測和故障恢復(fù)等功能,以寫檢查點(diǎn)機(jī)制作為主要的容錯方案,支持手動備份和自動周期備份功能;管理工具主要通過Web界面或命令行的方式為用戶提供可視化的系統(tǒng)管理和監(jiān)控功能;接口模塊主要為用戶提供本地計(jì)算、消息發(fā)送/接收等的應(yīng)用編程接口,以及為用戶提供啟動和關(guān)閉系統(tǒng)服務(wù)、作業(yè)提交等命令行接口。
3 BC-BSP提供的API
系統(tǒng)給用戶提供了與作業(yè)建立相關(guān)的API,用于編寫針對圖處理或科學(xué)計(jì)算的處理程序。另外,系統(tǒng)還提供了用于系統(tǒng)功能擴(kuò)展的接口。下面我們簡單介紹這些接口。
(1)消息管理接口負(fù)責(zé)消息的發(fā)送/接收功能,在每一個(gè)超步的本地計(jì)算執(zhí)行過程中,并行地發(fā)送和接收消息,并將接收的消息緩存到本地的接收消息隊(duì)列中,在發(fā)送消息隊(duì)列達(dá)到一定規(guī)模的時(shí)候,執(zhí)行Combine操作,然后再將消息發(fā)送給目的節(jié)點(diǎn)。
(2)分區(qū)數(shù)據(jù)管理接口負(fù)責(zé)在進(jìn)行圖數(shù)據(jù)處理之前將待處理的圖數(shù)據(jù)按照一定的原則劃分給各個(gè)任務(wù)。本系統(tǒng)實(shí)現(xiàn)了基于Hash的劃分方法和基于Hash的均衡劃分方法。
(3)圖頂點(diǎn)上下文接口負(fù)責(zé)在任務(wù)處理的一個(gè)超步中,處理每個(gè)圖頂點(diǎn)時(shí)獲取正在處理的圖頂點(diǎn)的相關(guān)屬性信息和方法。
(4)消息合并接口在圖處理過程中,通常以頂點(diǎn)為中心進(jìn)行處理,該接口為了減少在網(wǎng)絡(luò)上傳送的消息數(shù)量,在發(fā)送端對發(fā)給同一個(gè)頂點(diǎn)的消息進(jìn)行合并。
(5)聚集計(jì)算接口許多圖處理/機(jī)器學(xué)習(xí)算法中需要聚集計(jì)算,實(shí)現(xiàn)該接口可進(jìn)行超步間的聚集值計(jì)算。
(6)數(shù)據(jù)輸入輸出接口包括輸入接口和輸出接口,用于實(shí)現(xiàn)將數(shù)據(jù)從指定數(shù)據(jù)存儲系統(tǒng)中讀入和寫出。
4 BC-BSP系統(tǒng)的實(shí)現(xiàn)
本節(jié)介紹BC-BSP系統(tǒng)在實(shí)現(xiàn)上的一些主要策略和細(xì)節(jié),主要包括圖數(shù)據(jù)的表示、主節(jié)點(diǎn)控制器、從節(jié)點(diǎn)管理器、本地計(jì)算與消息通信、圖數(shù)據(jù)劃分以及故障恢復(fù)等的實(shí)現(xiàn)。
4.1 主節(jié)點(diǎn)控制器
主控節(jié)點(diǎn)是整個(gè)BC-BSP集群的控制中心,負(fù)責(zé)管理所有的工作節(jié)點(diǎn),監(jiān)控整個(gè)集群的工作狀態(tài),接收各工作節(jié)點(diǎn)的心跳信息并加以處理,完成整個(gè)作業(yè)的全局同步控制,并提供統(tǒng)一的信息查詢接口和作業(yè)提交接口。當(dāng)集群啟動后,主控節(jié)點(diǎn)接收各工作節(jié)點(diǎn)的注冊信息,形成統(tǒng)一的集群資源信息,在運(yùn)行過程中通過心跳信息不斷更新集群資源信息,例如,可用任務(wù)槽數(shù)量。當(dāng)客戶端請求提交作業(yè)時(shí),將其放入作業(yè)等待隊(duì)列,作業(yè)調(diào)度器按照優(yōu)先級加先入先出隊(duì)列(FIFO)的策略調(diào)度作業(yè);而完成一個(gè)作業(yè)的具體任務(wù)的調(diào)度則是按照負(fù)載均衡和數(shù)據(jù)本地化的原則。因?yàn)楸鞠到y(tǒng)中一個(gè)作業(yè)的所有任務(wù)需要同時(shí)運(yùn)行,所以系統(tǒng)中的任務(wù)調(diào)度是采用由BSP主節(jié)點(diǎn)控制器根據(jù)上述原則將任務(wù)依次不斷下推給各個(gè)節(jié)點(diǎn)。
4.2 從節(jié)點(diǎn)管理器
工作節(jié)點(diǎn)是硬件上的計(jì)算單元,系統(tǒng)啟動后,BC-BSP集群的各個(gè)節(jié)點(diǎn)上啟動一個(gè)從節(jié)點(diǎn)管理器(WM)進(jìn)程,負(fù)責(zé)完成具體的任務(wù)啟動和消息通信。每個(gè)工作節(jié)點(diǎn)啟動后,都首先向主控節(jié)點(diǎn)注冊,使自己成為BC-BSP集群中的一員;之后,工作節(jié)點(diǎn)定期向主控節(jié)點(diǎn)發(fā)送心跳信息,匯報(bào)自己的狀態(tài);當(dāng)有新任務(wù)下達(dá)時(shí),工作節(jié)點(diǎn)根據(jù)新任務(wù)的指令,到HDFS上讀取作業(yè)信息并下載到本地文件系統(tǒng);然后創(chuàng)建任務(wù)控制對象和對應(yīng)的執(zhí)行進(jìn)程,接著運(yùn)行任務(wù)。WM為在本節(jié)點(diǎn)上運(yùn)行的每個(gè)作業(yè)建立一個(gè)WorkerAgent對象,用于收集該作業(yè)在本節(jié)點(diǎn)上的各個(gè)任務(wù)的心跳信息、工作狀態(tài)信息等。這樣全局同步采用兩級同步方式,即一個(gè)工作節(jié)點(diǎn)上的屬于同一個(gè)作業(yè)的各個(gè)任務(wù)在本節(jié)點(diǎn)上實(shí)現(xiàn)局部同步,然后再以節(jié)點(diǎn)為單位向Zookeeper注冊實(shí)現(xiàn)全局同步。工作節(jié)點(diǎn)以作業(yè)為單位維護(hù)在本節(jié)點(diǎn)上運(yùn)行的隸屬于同一個(gè)作業(yè)的所有任務(wù),進(jìn)行統(tǒng)一管理,完成各種局部操作,例如本地聚集計(jì)算。
4.3 磁盤輔助的本地計(jì)算和消息通信
任務(wù)模塊是邏輯上的計(jì)算處理單元,稱為一個(gè)任務(wù)。BSP主節(jié)點(diǎn)控制器中的任務(wù)調(diào)度器根據(jù)負(fù)載均衡和數(shù)據(jù)本地化原則將任務(wù)分配到具體的工作節(jié)點(diǎn)上,由WM創(chuàng)建該任務(wù)模塊進(jìn)程。任務(wù)模塊啟動后,首先完成數(shù)據(jù)加載,將需要處理的數(shù)據(jù)分片從存儲介質(zhì)上按照指定的輸入格式讀入本地,并進(jìn)行數(shù)據(jù)劃分。計(jì)算過程中會定期地向WM的WorkerAgent對象發(fā)送心跳信息,報(bào)告任務(wù)的狀態(tài)等信息。
在Pregel系統(tǒng)以及基于它思想的各種實(shí)現(xiàn)中,都假設(shè)集群的處理節(jié)點(diǎn)足夠,使得待處理的數(shù)據(jù)等夠完全存放在內(nèi)存中。但是實(shí)際情況卻不是這樣的:一方面對于一個(gè)給定的待處理數(shù)據(jù)集,用戶很難確定需要幾個(gè)工作節(jié)點(diǎn)才能使得各個(gè)任務(wù)處理的數(shù)據(jù)能夠存放在內(nèi)存中;另一方面,當(dāng)集群規(guī)模有限時(shí),也希望能夠處理相對較大規(guī)模的數(shù)據(jù)。對于系統(tǒng)中發(fā)送(或接收)的消息也是如此。鑒于以上原因,本系統(tǒng)中使用了磁盤臨時(shí)存儲數(shù)據(jù)和消息(也稱之為磁盤暫存),以便能夠處理較大規(guī)模的數(shù)據(jù)。
對于消息數(shù)據(jù),將消息數(shù)據(jù)的內(nèi)存占用比例按照用戶指定的靜態(tài)劃分參數(shù)確定,系統(tǒng)運(yùn)行時(shí)處理各種類型的消息時(shí)內(nèi)存的使用單獨(dú)分配處理,每種類型的消息內(nèi)存占用都具有一個(gè)獨(dú)立的閾值控制。
對于任務(wù)處理的數(shù)據(jù)而言,在迭代計(jì)算過程中常駐磁盤。對于出邊表不變的計(jì)算情況,即不增加也不刪除邊的情形,將頂點(diǎn)的出邊表與頂點(diǎn)的其他在計(jì)算中變化的部分,例如頂點(diǎn)的值或標(biāo)簽等信息,分開存放,但是同樣使用記錄的ID的Hash映射進(jìn)行劃分,如圖3所示。將圖數(shù)據(jù)分開處理的好處在于:每次迭代結(jié)束只需將本次迭代過程中變化的數(shù)據(jù)寫回本地磁盤文件即可,不變的靜態(tài)部分不需要寫回磁盤,同時(shí)也為容錯控制提供了方便。
4.4 圖的頂點(diǎn)類
一個(gè)圖是由頂點(diǎn)集合和邊集合構(gòu)成,因此有頂點(diǎn)類和邊類。本系統(tǒng)中使用鄰接表的方式組織圖數(shù)據(jù)。這樣一個(gè)頂點(diǎn)類中除了頂點(diǎn)本身的屬性之外,還有與之相連的出邊信息,同時(shí)提供了對頂點(diǎn)和邊進(jìn)行操作的方法(見圖4)。
4.5 數(shù)據(jù)劃分
數(shù)據(jù)劃分是BSP計(jì)算與MapReduce計(jì)算不同的地方。前者需要在迭代計(jì)算中能夠定位消息發(fā)送的目的地在哪里。因此,數(shù)據(jù)劃分是將各個(gè)任務(wù)與之綁定的數(shù)據(jù)分片的數(shù)據(jù)從數(shù)據(jù)源讀入,然后利用一定的數(shù)據(jù)劃分原則,例如Hash劃分,將圖數(shù)據(jù)分配給某個(gè)任務(wù),以便形成超步迭代計(jì)算時(shí)的數(shù)據(jù)分區(qū)。
一個(gè)作業(yè)的各個(gè)數(shù)據(jù)分區(qū)大小是否均勻直接影響系統(tǒng)的負(fù)載均衡,但是Hash函數(shù)很難保證各個(gè)分區(qū)大小的均衡。為此,我們采用了多Hash桶合并的劃分方法,以實(shí)現(xiàn)數(shù)據(jù)的近似均衡劃分。合并的原則可以是各個(gè)桶中的對象數(shù)據(jù)盡可能均衡,還可以考慮數(shù)據(jù)的本地性。本系統(tǒng)目前是按照各個(gè)桶中數(shù)據(jù)對象近似均衡為主兼顧本地性的原則進(jìn)行合并。
4.6 容錯機(jī)制
容錯是本分布式處理系統(tǒng)必須考慮的問題。BC-BSP系統(tǒng)中考慮兩類故障:一類是任務(wù)故障,例如任務(wù)進(jìn)程宕掉;另一類是工作節(jié)點(diǎn)故障,例如一個(gè)Worker出現(xiàn)網(wǎng)絡(luò)斷開故障或者磁盤讀寫故障。系統(tǒng)中各個(gè)任務(wù)通過心跳機(jī)制向所在Worker的WM匯報(bào)自己的工作狀態(tài),而各個(gè)工作節(jié)點(diǎn)也是通過心跳機(jī)制定期向BSP主節(jié)點(diǎn)控制器匯報(bào)工作狀態(tài)。
本模塊包括寫檢查點(diǎn)、故障檢測和故障診斷以及故障恢復(fù)等功能。寫檢查點(diǎn)是定期或者人工控制方式將某個(gè)時(shí)刻的作業(yè)運(yùn)行快照保存到分布式文件系統(tǒng),如HDFS;故障檢測與故障診斷是完成故障信息的收集與故障類型的判斷,不同階段的不同類型的故障,采用不同的恢復(fù)機(jī)制。BC-BSP系統(tǒng)實(shí)現(xiàn)了基本的基于檢查點(diǎn)的故障恢復(fù)策略和面向磁盤駐留的多級容錯處理策略。
所謂的面向磁盤駐留的多級容錯處理策略,是利用了本系統(tǒng)的磁盤輔助機(jī)制的一些措施,即將圖數(shù)據(jù)分成不變的常駐磁盤的靜態(tài)部分(例如圖頂點(diǎn)的出邊表)和每次迭代計(jì)算幾乎都會變化的需要寫回磁盤的動態(tài)部分。因此在進(jìn)行系統(tǒng)快照備份時(shí),實(shí)現(xiàn)增量備份,即對圖數(shù)據(jù)的靜態(tài)部分只需要備份一次即可,而每次迭代計(jì)算時(shí)只需增量地備份動態(tài)變化部分。當(dāng)然每次備份時(shí)需要備份本次收到的所有消息。
5 BC-BSP系統(tǒng)應(yīng)用示例
本節(jié)討論使用本系統(tǒng)進(jìn)行圖數(shù)據(jù)的PageRank計(jì)算和多維數(shù)值型數(shù)據(jù)集的k-means聚類分析的示例。在k-means示例中,可以論證BC-BSP系統(tǒng)也可以有效地處理非圖數(shù)據(jù)的數(shù)據(jù)挖掘算法。
5.1 PageRank
使用BC-BSP系統(tǒng)實(shí)現(xiàn)PageRank計(jì)算中,首先將一個(gè)頂點(diǎn)的PageRank值按照一定的規(guī)則(如各個(gè)出邊頂點(diǎn)平分),通過發(fā)送消息的方式發(fā)送給出邊頂點(diǎn),同時(shí)獲得來自入邊頂點(diǎn)的消息;之后按照PageRank算法的PageRank值計(jì)算公式,將一個(gè)頂點(diǎn)的消息值(即PageRank貢獻(xiàn)值)累加,計(jì)算當(dāng)前頂點(diǎn)新的PageRank值。因此用戶可以提供combine方法實(shí)現(xiàn)消息發(fā)送前的合并,再基于頂點(diǎn)的新PageRank值重復(fù)上面的計(jì)算過程,直到滿足收斂條件結(jié)束計(jì)算,并按預(yù)先的用戶配置輸出計(jì)算結(jié)果。
5.2 多維數(shù)值型數(shù)據(jù)集的k-means
聚類
使用BC-BSP系統(tǒng)對多維數(shù)值型數(shù)據(jù)集進(jìn)行k-means聚類,不需要進(jìn)行頂點(diǎn)間的消息傳遞,但是需要利用聚集器計(jì)算新的聚類中心,可以通過各個(gè)聚簇的所有數(shù)據(jù)點(diǎn)的累計(jì)和與累計(jì)數(shù)據(jù)點(diǎn)計(jì)數(shù)兩種聚集器實(shí)現(xiàn)。因此,用戶可以實(shí)現(xiàn)BC-BSP系統(tǒng)提供的staffStartup接口,完成整個(gè)聚類作業(yè)開始之前的聚類中心初始化工作,例如讀取預(yù)先設(shè)定好的存儲在分布式文件中的初始聚類中心,利用系統(tǒng)提供的聚集器接口實(shí)現(xiàn)聚簇內(nèi)數(shù)據(jù)點(diǎn)累計(jì)和與累計(jì)計(jì)數(shù)計(jì)算新的聚類中心,這樣就需要每個(gè)任務(wù)計(jì)算自己任務(wù)內(nèi)的局部累計(jì)和與累計(jì)計(jì)數(shù),然后在BSP主節(jié)點(diǎn)控制器計(jì)算各個(gè)類的總累計(jì)和以及總類內(nèi)數(shù)據(jù)點(diǎn)數(shù),在新的超步開始時(shí)計(jì)算聚集中心。
當(dāng)k-means聚類的k值較?。ɡ鐜资畟€(gè))時(shí),這種利用聚集器的方法是可行的。然而,實(shí)驗(yàn)中我們發(fā)現(xiàn):當(dāng)k值上百或更大時(shí),就會出現(xiàn)異常。這是因?yàn)樾枰騔ookeeper寫的內(nèi)容太多。因?yàn)橄到y(tǒng)框架中聚集器的實(shí)現(xiàn)利用了Zookeeper,所以在實(shí)現(xiàn)k-means聚類時(shí),使用了分布式文件暫存各個(gè)任務(wù)的局部聚集結(jié)果。在執(zhí)行超步計(jì)算前讀取這些臨時(shí)文件,計(jì)算新的聚類中心,可以解決k值較大時(shí)引起的異常問題。
6 BC-BSP系統(tǒng)的實(shí)驗(yàn)
選擇同樣基于BSP模型的Hama[4]和Giraph[5]作為參照比較系統(tǒng),并且使用它們的API實(shí)現(xiàn)了PageRank算法。實(shí)驗(yàn)軟硬件配置是:30個(gè)工作節(jié)點(diǎn),一個(gè)作為控制節(jié)點(diǎn),29個(gè)用作存儲和計(jì)算的工作節(jié)點(diǎn),Java虛擬機(jī)(JVM)的內(nèi)存設(shè)置為2 GB。每個(gè)節(jié)點(diǎn)的配置如下:Intel Core i3-2100雙核中央處理器(CPU)、8 GB雙倍速率同步動態(tài)隨機(jī)存儲器(DDR)3內(nèi)存、500 G/7200 RPM磁盤,安裝了Red Hat Centos 6.0操作系統(tǒng)、JDK1.6.0-30、Hadoop-0.20.2和Zookeeper-3.3.2。統(tǒng)計(jì)了運(yùn)行PageRank 10次迭代的運(yùn)行時(shí)間開銷。
測試數(shù)據(jù)采用不同規(guī)模的真實(shí)數(shù)據(jù)和人工合成數(shù)據(jù);人工合成數(shù)據(jù)集由數(shù)據(jù)生成器生成。實(shí)驗(yàn)中我們選擇了定點(diǎn)規(guī)模不同的5個(gè)真實(shí)數(shù)據(jù)集[8],它們的統(tǒng)計(jì)信息見表1。
6.1 真實(shí)數(shù)據(jù)集測試結(jié)果
利用表1中描述的5個(gè)真實(shí)數(shù)據(jù)集,在Giraph1.0.0的內(nèi)存版(Giraph 1.0.0_MEM)和磁盤版(Giraph 1.0.0_HDD)、Hama 0.6.4和BC-BSP 2.0系統(tǒng)上分別運(yùn)行了PageRank算法,得到了圖5所示的結(jié)果。
由圖5展示的結(jié)果可得出:BC-BSP2.0的性能優(yōu)于另外3個(gè)對比系統(tǒng),總體上比Giraph1.0.0的內(nèi)存版的性能好。
6.2虛擬數(shù)據(jù)集測試結(jié)果
通過測試虛擬數(shù)據(jù)集進(jìn)行系統(tǒng)可擴(kuò)展性的對比,我們可知:數(shù)據(jù)從1 000萬頂點(diǎn)至11 000萬頂點(diǎn),主要用于測試系統(tǒng)的可擴(kuò)展性和計(jì)算性能,平均出度規(guī)模為11.5。
由圖6展示的結(jié)果可得出:圖數(shù)據(jù)的頂點(diǎn)從1 000萬到11 000萬,BC-BSP 2.0在數(shù)據(jù)吞吐量以及在相同數(shù)據(jù)集的處理效率上都要優(yōu)于HAMA-0.6.4,并優(yōu)于GIRAPH-1.0.0_HDD,效率略低于GIRAPH-1.0.0_MEM,但可擴(kuò)展性更好。
7 結(jié)束語
文章描述了在Java語言環(huán)境下基于BSP模型實(shí)現(xiàn)的用于大規(guī)模圖數(shù)據(jù)迭代處理的系統(tǒng)BC-BSP。該系統(tǒng)在Pregel思想的基礎(chǔ)上,實(shí)現(xiàn)了它的基本功能,同時(shí)增加了若干優(yōu)化策略,包括增加了均衡的數(shù)據(jù)劃分策略,使得每個(gè)任務(wù)處理的節(jié)點(diǎn)數(shù)量盡可能相近,圖數(shù)據(jù)處理和消息通信過程中的磁盤暫存使得在計(jì)算節(jié)點(diǎn)及其內(nèi)存資源有限的情況下可以處理較大的數(shù)據(jù),具有更高的可擴(kuò)展性。
盡管在系統(tǒng)開發(fā)過程中已經(jīng)做了大量的優(yōu)化工作,但是系統(tǒng)還有可優(yōu)化的地方。例如,關(guān)于圖數(shù)據(jù)結(jié)構(gòu)的優(yōu)化與改進(jìn):(1)目前不論是圖頂點(diǎn)對象還是邊對象都采用字符串方式存儲,可以改成支持泛型的實(shí)現(xiàn);(2)系統(tǒng)利用寫檢查點(diǎn)機(jī)制實(shí)現(xiàn)了故障恢復(fù),但是對于故障類型的捕獲和診斷還有待進(jìn)一步加強(qiáng);(3)在系統(tǒng)實(shí)現(xiàn)中發(fā)現(xiàn)Java環(huán)境對內(nèi)存的開銷巨大,因此對數(shù)據(jù)結(jié)構(gòu)的設(shè)計(jì)以及使用需要仔細(xì)地斟酌。
致謝
本研究得到東北大學(xué)于戈教授和谷峪副教授的幫助,以及中國移動(蘇州)研發(fā)中心錢嶺博士的支持,謹(jǐn)致謝意!
本系統(tǒng)開發(fā)工作是由東北大學(xué)計(jì)算機(jī)軟件所王志剛博士研究生以及許多已經(jīng)畢業(yè)的研究生共同完成,對他們謹(jǐn)致謝意!
參考文獻(xiàn)
[1] SERGEY B, LARRY P. The Anatomy of a Large-Scale Hypertextual Web Search Engine [J]. Computer Networks and ISDN Systems, 1998, 30(98): 1-7
[2] GUERON M, LLIA R, MARGULIS G. Pregel: A System for Large-Scale Graph Processing [J]. American Journal of Emergency Medicine, 2009, 18(18):135-146
[3] VALIANT L G. Bulk-Synchrony: A Bridging Model for Parallel Computation [J]. Communications of the ACM, 1990, 33(8):103-111
[4] Welcome to Hama Project [EB/OL].[2011-07-13] . http://incubator.apache.org/hama/
[5] AVERY C, CHRISTAN K. Giraph: Large-Scalegraph Processing Infrastructure on Hadoop [EB/OL]. [2011-06-29]. Hadoop Summit 2011, https://github.com/aching/Giraph
[6] LOW Y, BICKSON D, GONZALEZ J, GUESTRIN C, et al. Distributed GraphLab: A Framework for Machine Learning and Data Mining in the Cloud [J]. Proceedings of the VLDB Endowment, 2012, 5(8): 716-727
[7] MAMOU H. An Experimental Comparison of Pregel-Like Graph Processing Systems [C]// Proceedings of Vldb Endowment. USA: ACM 2014: 7(12):1047-1058
[8] Using the Stanford Large Network Dataset Collection [EB/OL], https://snap.stanford.edu /data/index.html