侯敬儒 吳 晟 李英娜
(昆明理工大學(xué)信息工程與自動(dòng)化學(xué)院 昆明 650500)
聚類分析是在沒(méi)有給定具體分類或者clusters(群組)的情況下,通過(guò)探索性計(jì)算數(shù)據(jù)之間指定屬性上的相識(shí)性的一種無(wú)監(jiān)督統(tǒng)計(jì)分析學(xué)習(xí)的過(guò)程,然后將數(shù)據(jù)劃分為相交或不相交簇[1]。聚類作為數(shù)據(jù)挖掘領(lǐng)域中一種分析工具,已經(jīng)在包括經(jīng)濟(jì)學(xué)、生物學(xué)、信息檢索等許多領(lǐng)域廣泛應(yīng)用[2]。大數(shù)據(jù)時(shí)代背景下,互聯(lián)網(wǎng)信息爆炸式增長(zhǎng),數(shù)據(jù)量、數(shù)據(jù)類型等產(chǎn)生速度越來(lái)越迅猛[3],越來(lái)越大的數(shù)據(jù)規(guī)模等待著聚類分析計(jì)算,而作為一種有效且常用的KMeans均值聚類算法,它單機(jī)執(zhí)行的時(shí)間復(fù)雜度比較高[4],所以其處理能力有一定的局限性。
Hadoop是目前廣泛使用的并行計(jì)算平臺(tái)[5],但Hadoop的MapReduce并不適合迭代式計(jì)算。在Hadoop的MapReduce計(jì)算模型中,一個(gè)任務(wù)只有map映射和reduce化簡(jiǎn)兩個(gè)計(jì)算階段[6],復(fù)雜的計(jì)算任務(wù)需要設(shè)計(jì)為多個(gè)Job共同完成,各個(gè)Job之間的依賴關(guān)系是由MapReduce設(shè)計(jì)開(kāi)發(fā)者自己進(jìn)行管理的;而且map映射階段的中間結(jié)果需要寫到本地磁盤當(dāng)中去,這對(duì)需要多次迭代才能完成的計(jì)算是不適合的。而迭代式計(jì)算在數(shù)據(jù)處理科學(xué)中是經(jīng)常見(jiàn)到的,特別是在Data Mining、Information Retrieval、Machine Learning等領(lǐng)域[7],大部分算法都是通過(guò)多次迭代來(lái)計(jì)算完成的。
新一代分布式大數(shù)據(jù)并行處理框架Spark,彌補(bǔ)了Hadoop在迭代式計(jì)算方面的不足。它是一個(gè)基于內(nèi)存計(jì)算的開(kāi)源集群計(jì)算系統(tǒng),是由加州大學(xué)伯克利分校AMP實(shí)驗(yàn)室使用Scala語(yǔ)言開(kāi)發(fā)的,目前已經(jīng)是Apache的頂級(jí)開(kāi)源項(xiàng)目,是Apache社區(qū)最火熱的項(xiàng)目之一,并且已經(jīng)成為發(fā)展最快的大數(shù)據(jù)處理平臺(tái)之一[8]。文章論述了使用KMeans算法在Spark分布式計(jì)算平臺(tái)的并行實(shí)現(xiàn)原理并且給出了其實(shí)現(xiàn)過(guò)程。
在大數(shù)據(jù)分布式計(jì)算主流技術(shù)中,Hadoop依靠廉價(jià)PC[9]研究設(shè)計(jì)了分布式并行存儲(chǔ)、計(jì)算框架,實(shí)現(xiàn)了分布式集群的高可靠、高吞吐、易擴(kuò)展性,很快發(fā)展成為大數(shù)據(jù)分布式科學(xué)計(jì)算中的重要技術(shù),它主要以分布式存儲(chǔ)框架HDFS和分布式計(jì)算框架MapReduce為核心組件,匯集和派生出許多周圍的數(shù)據(jù)采集、集群管理、數(shù)據(jù)分析應(yīng)用等工具。但是,分布式計(jì)算框架MapReduce在程序runtime時(shí)需要在集群的各個(gè)Worker節(jié)點(diǎn)進(jìn)行data-copy,從而消耗大量時(shí)間在網(wǎng)絡(luò)和磁盤I/O讀寫上,而不是在任務(wù)計(jì)算上。所以,Hadoop的MapReduce架構(gòu)在Stream Computing(流式計(jì)算)和Ad Hoc(即席查詢)等應(yīng)用場(chǎng)景中不太適用[10]。
Apache Spark是一個(gè)分布式計(jì)算框架,旨在簡(jiǎn)化運(yùn)行于計(jì)算機(jī)集群上的并行程序的編寫。該框架對(duì)資源調(diào)度,任務(wù)的提交、執(zhí)行和跟蹤,節(jié)點(diǎn)間的通信以及數(shù)據(jù)并行處理的內(nèi)在底層操作都進(jìn)行了抽象。它提供了一個(gè)更高級(jí)別的API用于處理分布式數(shù)據(jù)。從這方面說(shuō),它與Apache Hadoop的MapReduce分布式計(jì)算框架類似。但在底層架構(gòu)上,Spark與它有所不同。
Spark起源于加利福尼亞大學(xué)伯克利分校AMP實(shí)驗(yàn)室的一個(gè)研究項(xiàng)目[11]。AMP實(shí)驗(yàn)室當(dāng)時(shí)關(guān)注分布式機(jī)器學(xué)習(xí)算法的應(yīng)用情況。因此,Spark從一開(kāi)始便為應(yīng)對(duì)迭代式計(jì)算應(yīng)用的高性能需求而設(shè)計(jì)。在這類應(yīng)用中,相同的數(shù)據(jù)會(huì)被多次訪問(wèn)。該設(shè)計(jì)主要靠利用數(shù)據(jù)集內(nèi)存緩存以及啟動(dòng)任務(wù)時(shí)的低延遲和低系統(tǒng)開(kāi)銷來(lái)實(shí)現(xiàn)高性能。再加上其容錯(cuò)性、靈活的分布式數(shù)據(jù)結(jié)構(gòu)和強(qiáng)大的函數(shù)式編程接口,Spark在各類基于機(jī)器學(xué)習(xí)和迭代式分析的大規(guī)模數(shù)據(jù)處理任務(wù)上有廣泛的應(yīng)用,這也表明了其實(shí)用性。
傳統(tǒng)串行的KMeans均值算法的思想一般是先初始化隨機(jī)給定K個(gè)clusters(簇)中心,將待分類的樣本數(shù)據(jù)點(diǎn)依照最近鄰原則分配到各個(gè)cluster中。之后以一定法則重新計(jì)算各個(gè)cluster的質(zhì)心,以確定新的cluster。不停的循環(huán)計(jì)算,直到cluster的質(zhì)心移動(dòng)距離小于某一個(gè)實(shí)際給定的具體值。
傳統(tǒng)KMeans均值聚類算法“三步走”如下簡(jiǎn)介:
1)尋找待聚類的樣本數(shù)據(jù)點(diǎn)的聚類中心。
2)計(jì)算每個(gè)樣本數(shù)據(jù)點(diǎn)到之前尋找到的聚類中心的距離,將每個(gè)樣本數(shù)據(jù)點(diǎn)聚類到離該樣本數(shù)據(jù)點(diǎn)最近的聚類中去。
3)計(jì)算每個(gè)聚類中所有樣本數(shù)據(jù)點(diǎn)坐標(biāo)的平均值,然后將這個(gè)計(jì)算的平均值作為新的聚類中心。
循環(huán)執(zhí)行第2)、3)步,一直計(jì)算到新的聚類中心移動(dòng)距離小于某個(gè)給定的具體值或者聚類次數(shù)達(dá)到了設(shè)定值為止[12]。
根據(jù)以上對(duì)KMeans算法的描述,同時(shí)結(jié)合Spark平臺(tái)的并行編程模型,設(shè)計(jì)了基于Spark的KMeans并行聚類模型方案,如圖1所示。在程序運(yùn)行之前,將模型訓(xùn)練數(shù)據(jù)集上傳到HDFS中去。Spark集群的任務(wù)調(diào)度系統(tǒng)DAGScheduler的Task-Scheduler則會(huì)為分割后每個(gè)數(shù)據(jù)子集在Executor里創(chuàng)建新job,并利用分布式集群的資源調(diào)度器給相應(yīng)的job分配計(jì)算資源。然后模型在Spark集群上進(jìn)行并行KMeans訓(xùn)練,訓(xùn)練完成后,輸出全局最優(yōu)模型到HDFS分布式文件存儲(chǔ)系統(tǒng)。
圖1 基于Spark的并行KMeans聚類模型
3.2.1生成初始聚類中心點(diǎn)
對(duì)于初始化聚類中心點(diǎn),我們可以在輸入的數(shù)據(jù)集中隨機(jī)地選取k個(gè)點(diǎn)作為中心點(diǎn),但是隨機(jī)選擇初始中心點(diǎn)可能會(huì)造成聚類的結(jié)果和數(shù)據(jù)的實(shí)際分布相差很大。文章使用KMeans++算法選擇聚類初始中心點(diǎn)。
KMeans++算法選擇初始聚類中心點(diǎn)的基本思想為:初始的聚類中心之間的坐標(biāo)距離要盡可能遠(yuǎn),過(guò)程如下。
1)從讀取到的樣本數(shù)據(jù)點(diǎn)中隨機(jī)選擇一個(gè)樣本點(diǎn)作為第一個(gè)聚類中心;
2)對(duì)于讀取到的樣本數(shù)據(jù)集中的每一個(gè)樣本數(shù)據(jù)點(diǎn)x,計(jì)算它與最近聚類中心的坐標(biāo)距離D(x);
3)選擇新的樣本數(shù)據(jù)點(diǎn)作為新的聚類中心,規(guī)則是選擇D(x)較大的樣本數(shù)據(jù)點(diǎn)作為聚類中心;
迭代執(zhí)行第2)、3)步驟,一直等到K個(gè)聚類中心通過(guò)距離計(jì)算被選擇出來(lái);然后用這K個(gè)被選出來(lái)的初始聚類中心開(kāi)始并行運(yùn)行KMeans算法。
從上面的算法描述可以看到,算法的關(guān)鍵是第3)步,如何將D(x)反映到點(diǎn)被選擇的概率上。文章采用算法如下。
1)隨機(jī)從點(diǎn)集D中選擇一個(gè)點(diǎn)作為初始的中心點(diǎn);
2)計(jì)算每一個(gè)點(diǎn)到最近中心點(diǎn)的距離Si,對(duì)所有Si求和得到sum;
3)然后再取一個(gè)隨機(jī)值,用權(quán)重的方式計(jì)算下一個(gè)“種子點(diǎn)”。取隨機(jī)值random(0<random<sum),對(duì)點(diǎn)集 D循環(huán),做random-=Si運(yùn)算,直到random<0,那么點(diǎn)i就是下一個(gè)中心點(diǎn)。
重復(fù)2)和3),直到 K個(gè)聚類中心被選出來(lái)。利用這K個(gè)初始的聚類中心來(lái)運(yùn)行KMeans算法。
3.2.2迭代計(jì)算樣本的中心點(diǎn)
迭代計(jì)算中心點(diǎn)的并行實(shí)現(xiàn):首先計(jì)算每個(gè)樣本屬于哪個(gè)中心點(diǎn),之后采用聚合函數(shù)統(tǒng)計(jì)屬于每個(gè)中心點(diǎn)的樣本值之和以及樣本數(shù)量,然后求得最新中心點(diǎn),并且判斷中心點(diǎn)是否發(fā)生改變。其中對(duì)于計(jì)算樣本屬于哪個(gè)中心點(diǎn),采用一種快速查找、計(jì)算距離的方法,其方法如下。
首先定義lowerBoundOfSqDist距離公式,假設(shè)中心點(diǎn) center是(a1,b1),需要計(jì)算的點(diǎn) point是(a2,b2),那么 lowerBoundOfSqDist是:
可輕易證明lowerBoundOfSqDist將會(huì)小于或等于EuclideanDist,因此在進(jìn)行距離比較的時(shí)候,先計(jì)算很容易計(jì)算的lowerBoundOfSqDist(只需要計(jì)算center、point的L2范數(shù))。如果lowerBoundOfSqDist都不小于之前計(jì)算得到的最小距離bestDistance,那真正的歐氏距離也不可能小于bestDistance了。因此在這種情況下就不需要去計(jì)算歐氏距離了,省去了很多計(jì)算工作。
如果lowerBoundOfSqDist小于bestDistance,則進(jìn)行距離的計(jì)算,調(diào)用 fastSquaredDistance,該方法是一種快速計(jì)算距離的方法。fastSquaredDistance方法會(huì)先計(jì)算一個(gè)精度,有關(guān)精度的計(jì)算:
precisionBound1=2.0*EPSILON*
sumSquaredNorm/(normDiff*normDiff+EPSILON)
如果精度滿足條件,則歐氏距離為EuclideanDist=sumSquaredNorm-2.0*v1.dot(v2),其中sumSquaredNorm為+++,2.0*v1.dot(v2)為2(a1a2+b1b2)。這里可以直接利用之前計(jì)算的L2范數(shù)。如果精度不滿足要求,則進(jìn)行原始的距離計(jì)算,公式為(a1-a2)2+(b1-b2)2。
文章實(shí)驗(yàn)基于QJM(Quorum Journal Manager)下的HA(High Available)大數(shù)據(jù)平臺(tái),其中,以Hadoop的HDFS為基礎(chǔ)存儲(chǔ)框架,主要以Spark為計(jì)算框架,使用Zookeeper統(tǒng)籌HA下的大數(shù)據(jù)平臺(tái),管理整個(gè)集群配置。平臺(tái)包括2個(gè)Master節(jié)點(diǎn)和4個(gè)Worker節(jié)點(diǎn),節(jié)點(diǎn)之間局域網(wǎng)連接;平臺(tái)的資源管理和任務(wù)調(diào)度采用Yarn模式。軟件配置:平臺(tái)搭建使用spark-1.6.3-bin-hadoop2.6和hadoop-2.6.5,Scala選用 2.10.5版本,Java選 用JDK1.8.0_101(Linux版),操作系統(tǒng)為Centos6.5。
文章使用MovieLens數(shù)據(jù)集:Ml-100k(含10萬(wàn)條評(píng)分集)、Ml-1m(含100萬(wàn)條評(píng)分集)、ml-10m100k(含1000萬(wàn)條評(píng)分集)。
4.2.1模型訓(xùn)練過(guò)程分析
文章基于KMeans并行聚類模型編寫的Spark Application主要包含兩個(gè)過(guò)程:1)加載數(shù)據(jù)集;2)模型訓(xùn)練。整個(gè)程序被拆分為33個(gè)job,54個(gè)stage,432個(gè)task。圖2是執(zhí)行過(guò)程中所完成的jobs:包含每一個(gè)由Spark的Action操作觸發(fā)的jo-bid、觸發(fā)job的action操作詳細(xì)信息、job提交時(shí)間、jobs間隔時(shí)間、該job被分為幾個(gè)stage、該job的所有stages中包含的task數(shù)量。
圖2 Spark Application jobs執(zhí)行過(guò)程
基于Spark的KMeans模型訓(xùn)練過(guò)程中的幾個(gè)具有代表性的stage的DAG如圖3所示。從左至右依次為數(shù)據(jù)采樣stage、各個(gè)分區(qū)數(shù)據(jù)聚合stage、collect操作stage、collectAsMap操作stage。
圖3 KMeans模型的Stage-DAG拓?fù)浣Y(jié)構(gòu)
4.2.2模型訓(xùn)練結(jié)果與數(shù)據(jù)partition的關(guān)系
為了評(píng)估模型在不同分區(qū)下訓(xùn)練時(shí)間的變化情況,選用了MovieLens數(shù)據(jù)集的Ml-10m100k(1000萬(wàn))進(jìn)行實(shí)驗(yàn),每個(gè)分區(qū)在數(shù)據(jù)集上的訓(xùn)練時(shí)間取3次訓(xùn)練的平均時(shí)間,最終結(jié)果如圖4所示。
圖4 分區(qū)數(shù)對(duì)運(yùn)行時(shí)間的影響
從圖4可以看到,隨著分區(qū)數(shù)目的增加,模型的訓(xùn)練時(shí)間先是急劇減少隨后緩慢增加。這是由于分區(qū)數(shù)目變大導(dǎo)致算法的并行度提高,雖然降低了分區(qū)的計(jì)算時(shí)間,但同時(shí)也增加了分區(qū)數(shù)據(jù)傳輸?shù)木W(wǎng)絡(luò)通信負(fù)擔(dān)。開(kāi)始時(shí)partition分區(qū)數(shù)量的增加會(huì)明顯降低每個(gè)partition分區(qū)的jobs計(jì)算時(shí)間,進(jìn)而減少整個(gè)程序的運(yùn)行時(shí)間;但是partition分區(qū)數(shù)達(dá)到一定的量以后,單個(gè)partition分區(qū)的jobs計(jì)算時(shí)間趨于穩(wěn)定,而網(wǎng)絡(luò)通信負(fù)擔(dān)則會(huì)隨著partition分區(qū)數(shù)量的增大而迅速增加,所以訓(xùn)練時(shí)間則會(huì)呈現(xiàn)出小幅度增加的態(tài)勢(shì)。
4.2.3算法運(yùn)行時(shí)間對(duì)比
為了評(píng)估算法在不同節(jié)點(diǎn)下訓(xùn)練時(shí)間的變化情況,選用了MovieLens數(shù)據(jù)集的Ml-100k(10萬(wàn))、ml-1m(100萬(wàn))、Ml-10m100k(1000萬(wàn))進(jìn)行實(shí)驗(yàn),最終結(jié)果如圖5所示。
圖5 算法運(yùn)行時(shí)間對(duì)比
圖5記錄對(duì)比了模型在不同規(guī)模數(shù)據(jù)集、不同節(jié)點(diǎn)數(shù)量上的運(yùn)行時(shí)間。當(dāng)數(shù)據(jù)集較小時(shí)單機(jī)(圖5橫軸為0的是單機(jī)版運(yùn)行時(shí)間,橫軸為1的是含有一個(gè)節(jié)點(diǎn)的集群運(yùn)行時(shí)間,以此類推)的KMeans要比在基于Spark集群(一個(gè)worker節(jié)點(diǎn))環(huán)境下運(yùn)行KMeans算法訓(xùn)練時(shí)間少,這是因?yàn)榧河?jì)算需要初始化相關(guān)任務(wù),包括job的新建、資源的分配和調(diào)度,而且不同的任務(wù)之間也需要通信,這都是需要消費(fèi)時(shí)間成本的。隨著數(shù)據(jù)規(guī)模的增大,節(jié)點(diǎn)數(shù)量的增加,基于Spark的并行KMeans模型在集群上運(yùn)行的優(yōu)勢(shì)也就越明顯,產(chǎn)生這種結(jié)果是因?yàn)樵诖笠?guī)模數(shù)據(jù)下訓(xùn)練KMeans聚類模型的任務(wù)初始化和通信時(shí)間占整個(gè)作業(yè)運(yùn)行時(shí)間的比重較小,同時(shí)該并行KMeans算法會(huì)將樣本訓(xùn)練數(shù)據(jù)在加載到集群的時(shí)候就分割為固定大小的Data Block數(shù)據(jù)塊在不同Worker、不同處理器上并行計(jì)算,從而節(jié)省大量訓(xùn)練時(shí)間。
文章基于Spark分布式計(jì)算框架,在基于QJM的HA大數(shù)據(jù)平臺(tái)下,設(shè)計(jì)并實(shí)現(xiàn)了并行KMeans聚類模型,通過(guò)模型訓(xùn)練實(shí)驗(yàn)表明,該分布式聚類模型適合運(yùn)行在大數(shù)據(jù)分布式平臺(tái)下;并且通過(guò)repartition算子分片加載數(shù)據(jù)集優(yōu)化了并行方案。
在接下來(lái)的學(xué)習(xí)中,將對(duì)基于Spark的在線KMeans聚類模型展開(kāi)研究,實(shí)現(xiàn)流數(shù)據(jù)的實(shí)時(shí)KMeans聚類。
[1]劉澤燊,潘志松.基于Spark的并行SVM算法研究[J].計(jì)算機(jī)科學(xué),2016,43(5):238-242.LIU Zeshen,PAN Zhisong.Research on parallel algorithm for the SVM based on Spark[J].Computer science,2016,43(5):238-242.
[2]Alsheikh MA,Niyato D,Lin S等.基于深度學(xué)習(xí)和Spark的移動(dòng)大數(shù)據(jù)分析[J].IEEE 網(wǎng)絡(luò),2016,30(3):22-29.Alsheikh MA,Niyato D,Lin S,etal.Mobile big data analytics using deep learning and apache spark[J].IEEE Network,2016,30(3):22-29.
[3]孟建良,劉德超.一種基于Spark和聚類分析的辨識(shí)電力系統(tǒng)不良數(shù)據(jù)新方法[J].電力系統(tǒng)保護(hù)與控制,2016,44(3):85-91.MENG Jianliang,LIU Dechao.A new method for identifying bad data of power system based on Spark and clustering analysis[J].Power System Protection and Control,2016,44(3):85-91.
[4]Bosagh Zadeh R,Meng X,Ulanov A等.基于Spark的矩陣計(jì)算與優(yōu)化[J].2016:31-38.Bosagh Zadeh R,Meng X,Ulanov A,et al.Matrix Computations and Optimization in Apache Spark[J].2016:31-38.
[5]Marcu O C,Costan A,Antoniu G,等.Spark vs Flink:理解大數(shù)據(jù)分析框架性能[C]//IEEE國(guó)際集群計(jì)算會(huì)議.IEEE計(jì)算機(jī)協(xié)會(huì),2016:433-442.Marcu O C,Costan A,Antoniu G,et al.Spark Versus Flink:Understanding Performance in Big Data Analytics Frameworks[C]//IEEE International Conference on CLUSTER Computing.IEEEComputer Society,2016:433-442.
[6]孫科.基于Spark的機(jī)器學(xué)習(xí)應(yīng)用框架研究與實(shí)現(xiàn)[D].上海:上海交通大學(xué),2015.SUN Ke.Research and implementation ofmachine learning application framework on spark[D].Shanghai:Shanghai Jiao Tong University,2015.
[7]Shkapsky A,Yang M,Interlandi M,等.基于 Spark的Datalog查 詢 大 數(shù) 據(jù) 分 析[C]//國(guó) 際 會(huì) 議 ,2016:1135-1149.Shkapsky A,Yang M,InterlandiM,etal.Big Data Analytics with Datalog Queries on Spark[C]//International Conference,2016:1135-1149.
[8]Liu J,Liang Y,AnsariN.基于Spark的大數(shù)據(jù)處理之大規(guī)模矩陣求逆[J].IEEEAccess,2016,4:1-1.Liu J,Liang Y,Ansari N.Spark-Based Large-Scale Matrix Inversion for Big Data Processing[J].IEEE Access,2016,4:1-1.
[9]寧永恒.基于Spark的若干數(shù)據(jù)挖掘技術(shù)研究[D].杭州:中國(guó)計(jì)量學(xué)院,2015.NING Yongheng.A number of datamining technology research based on the Spark[D].Hangzhou:Journal of China Jiliang University,2015.
[10]黎文陽(yáng).大數(shù)據(jù)處理模型Apache Spark研究[J].現(xiàn)代計(jì)算機(jī):普及版,2015(3):55-60.LIWenyang.Research on Apache Spark for Big Data Processing[J].Modern Computer,2015(3):55-60.
[11]薩初日拉,周國(guó)亮,時(shí)磊,等.Spark環(huán)境下并行立方體計(jì)算方法[J].計(jì)算機(jī)應(yīng)用,2016,36(2):348-352.SACHURILA,ZHOU Guoliang,SHI Lei,等.Parallel cube computing in Spark[J].Journalof Computer Applications,2016,36(2):348-352.
[12]陳夢(mèng)杰,陳勇旭,賈益斌,等.基于Hadoop的大數(shù)據(jù)查詢系統(tǒng)簡(jiǎn)述[J].計(jì)算機(jī)與數(shù)字工程,2013,41(12):1939-1942.CHENMengjie,CHEN Yongxu,JIA Yibin,etal.A Brief Introduction Hadoop—based Big Data Query System[J].Computer&Digital Engineering,2013,41(12):1939-1942.