朱子龍,李玲娟
(南京郵電大學(xué) 計(jì)算機(jī)學(xué)院,江蘇 南京 210023)
聚類是將未知數(shù)據(jù)集合分組成由較高相似度的對(duì)象組成的若干類或者簇的過程[1]。聚類算法大致分為劃分方法、層次方法、基于密度的方法、基于網(wǎng)格的方法和基于模型的方法等五類[2-3]。DBSCAN算法作為典型的基于密度的方法,具有聚類速度快、有效處理“噪聲”點(diǎn)并且能夠發(fā)現(xiàn)任意形狀的簇等優(yōu)點(diǎn)[4],因此研究用該算法高效并行化處理海量數(shù)據(jù)具有重要的現(xiàn)實(shí)意義。
Hadoop作為近年來比較流行的大數(shù)據(jù)處理平臺(tái),利用HDFS分布式海量存儲(chǔ)和MapReduce分布式計(jì)算框架并行處理,其每次map計(jì)算過程中產(chǎn)生的中間結(jié)果需要反復(fù)讀寫本地磁盤,在進(jìn)行大量迭代計(jì)算時(shí),MapReduce計(jì)算模型將會(huì)耗費(fèi)大量讀寫時(shí)間[5]。2009年加州大學(xué)伯克利分校創(chuàng)立基于內(nèi)存的Spark大數(shù)據(jù)處理計(jì)算框架,更好地支持交互式查詢和迭代算法,擴(kuò)展了MapReduce計(jì)算框架,并且支持內(nèi)存式存儲(chǔ)和高效的容錯(cuò)機(jī)制。
文中研究了DBSCAN算法在Spark平臺(tái)的并行化實(shí)現(xiàn)方案,并通過實(shí)驗(yàn)驗(yàn)證了方案的高效性。
DBSCAN算法是一種典型的基于密度的聚類算法,它將簇定義為高密度相連的點(diǎn)的最大集合。該算法能夠?qū)⒏呙芏鹊臄?shù)據(jù)區(qū)域分為不同的簇,能夠在具有“噪聲”的數(shù)據(jù)集中識(shí)別出任意形狀的聚類[3]。對(duì)于數(shù)據(jù)量為n的數(shù)據(jù)集合,按照空間索引方法,DBSCAN的計(jì)算復(fù)雜度是O(nlogn),否則其計(jì)算復(fù)雜度為O(n2)。
(1)DBSCAN算法涉及到的定義。
定義1(Eps鄰域):對(duì)指定數(shù)據(jù)集D中以x為圓心的半徑Eps內(nèi)的球形區(qū)域稱為該點(diǎn)x的Eps鄰域。
定義2(核心對(duì)象):D中任意一點(diǎn)x的Eps鄰域內(nèi)包含大于或等于最小數(shù)目MinPts個(gè)對(duì)象,則稱該點(diǎn)x為核心對(duì)象。
定義3(邊界對(duì)象):D中任意一個(gè)不是核心對(duì)象的點(diǎn),當(dāng)其在其他核心對(duì)象的Eps鄰域中時(shí),稱它為邊界對(duì)象。不同核心對(duì)象的Eps鄰域中可能會(huì)有相同的邊界對(duì)象。
定義4(直接密度可達(dá)):如果D中的點(diǎn)y在點(diǎn)x的Eps鄰域中而且點(diǎn)x是核心對(duì)象,則稱點(diǎn)y是從點(diǎn)x關(guān)于參數(shù)Eps和MinPts直接密度可達(dá)的[6]。
定義5(密度可達(dá)):在給定半徑Eps和MinPts的數(shù)據(jù)集D中,存在對(duì)象鏈m1,m2,…,mn,其中m1=x,mn=y,對(duì)mi∈R(1≤i≤n),如果mi+1是從mi直接密度可達(dá)的,則點(diǎn)y是從點(diǎn)x關(guān)于Eps和MinPts密度可達(dá)的。密度可達(dá)關(guān)系是不對(duì)稱的。
定義6(密度相連):如果D中的點(diǎn)x和點(diǎn)y是從點(diǎn)p關(guān)于Eps和MinPts密度可達(dá)的,則稱點(diǎn)x和點(diǎn)y是密度相連的[6]。
定義7(簇):基于密度的DBSCAN算法的簇是密度相連的數(shù)據(jù)點(diǎn)的最大集合[7]。對(duì)給定數(shù)據(jù)集D的任意一個(gè)非空子集R,如果稱之為簇,必須滿足如下條件:
①最大性:任意對(duì)象點(diǎn)x,y∈D,若x∈R,且點(diǎn)y是從x密度可達(dá)的,則y∈R。
②連通性:任意對(duì)象點(diǎn)x,y∈R,則點(diǎn)x,y是密度相連的。
定義8(噪聲):如果D中的某點(diǎn)不被包含在任意簇中,則稱為噪聲。
綜上所述:當(dāng)基于密度聚類時(shí),數(shù)據(jù)集中的簇看作是被低密度區(qū)域分隔開的高密度數(shù)據(jù)區(qū)域[8]。數(shù)據(jù)集中的核心對(duì)象一定屬于某簇,而且密度相連的點(diǎn)在同一簇中。噪聲點(diǎn)是數(shù)據(jù)集中的干擾數(shù)據(jù),會(huì)被舍棄。
(2)DBSCAN算法的基本思想。
首先從給定數(shù)據(jù)對(duì)象集中隨機(jī)選定一個(gè)點(diǎn)x,在該點(diǎn)給定半徑Eps的區(qū)域?qū)ふ揖垲?。如果點(diǎn)x的Eps鄰域內(nèi)至少包含MinPts個(gè)對(duì)象,那么以點(diǎn)x為核心對(duì)象創(chuàng)建一個(gè)新簇,接著反復(fù)基于這些核心對(duì)象查找直接密度可達(dá)的數(shù)據(jù)對(duì)象,查找過程可能會(huì)涉及密度可達(dá)簇的相關(guān)合并[3]。直到?jīng)]有新的點(diǎn)被合并到其他簇時(shí),算法結(jié)束。
(3)DBSCAN算法的具體步驟。
輸入:包含n個(gè)數(shù)據(jù)對(duì)象的數(shù)據(jù)集D={x1,x2,…,xn},半徑Eps和最小對(duì)象數(shù)目MinPts。
輸出:簇集合{R1,R2,…,Rn}。
①輸入待處理數(shù)據(jù)集D后,任意選擇一個(gè)數(shù)據(jù)點(diǎn)x,檢查該對(duì)象的Eps鄰域;
②如果數(shù)據(jù)點(diǎn)x的Eps區(qū)域內(nèi)至少包含最小對(duì)象數(shù)MinPts,則以點(diǎn)x為核心對(duì)象形成新簇并從點(diǎn)x出發(fā)尋找所有密度可達(dá)的數(shù)據(jù)點(diǎn),隨之更新簇;
③如果數(shù)據(jù)點(diǎn)x不是核心對(duì)象,將點(diǎn)x當(dāng)作噪聲點(diǎn)處理;
④repeat以上步驟;
⑤until無新的數(shù)據(jù)點(diǎn)加入任何簇。
不同于Hadoop的MapReduce計(jì)算模型,Spark能夠使job中間結(jié)果保存在內(nèi)存中,減少對(duì)磁盤的大量讀寫操作,從而可以高效低延遲處理大型數(shù)據(jù)集[9-11]。Spark引入了彈性分布式數(shù)據(jù)集(resilient distributed dataset,RDD)概念,實(shí)現(xiàn)了任務(wù)調(diào)度、分發(fā)和處理等,同時(shí)提供了更多計(jì)算模式組件,如SparkSQL、Spark Streaming、MLib和GraghX等,可以適用于多種分布式平臺(tái)場(chǎng)景。
Spark的RDD核心概念,讓其能夠以基本一致的操作方式去處理不同的應(yīng)用場(chǎng)景。RDD本質(zhì)上是一個(gè)不可變只讀的分布式元素集合,每個(gè)RDD包含不同的分區(qū),這些分區(qū)就是多個(gè)dataset片段,它們分別運(yùn)行在不同的集群節(jié)點(diǎn)上可被同時(shí)并行處理。實(shí)際上,Spark并行框架計(jì)算流程就是通過待處理數(shù)據(jù)創(chuàng)建RDD、轉(zhuǎn)化成新的RDD和調(diào)用RDD行動(dòng)操作求值得到結(jié)果[12]。RDD支持兩種操作類型:轉(zhuǎn)化(transformation)和行動(dòng)(action)。其中轉(zhuǎn)化操作是將現(xiàn)有的RDD轉(zhuǎn)化成一個(gè)全新的RDD,Spark中轉(zhuǎn)化操作都是惰性求值,只有在行動(dòng)操作實(shí)際用到這些RDD時(shí)才會(huì)被計(jì)算。而行動(dòng)操作會(huì)觸發(fā)實(shí)際計(jì)算,并且向驅(qū)動(dòng)器程序返回結(jié)果或者將結(jié)果存入外部存儲(chǔ)系統(tǒng)中。通常情況下,每個(gè)轉(zhuǎn)化過的RDD會(huì)在對(duì)其執(zhí)行行動(dòng)操作時(shí)被重新計(jì)算,但是大數(shù)據(jù)迭代算法中經(jīng)常會(huì)多次使用同一RDD,為避免多次調(diào)用行動(dòng)操作對(duì)同一RDD計(jì)算而帶來的開銷,Spark支持對(duì)RDD進(jìn)行持久化,計(jì)算出RDD的節(jié)點(diǎn)會(huì)分別保存其所求得的分區(qū)數(shù)據(jù)。
Spark在分布式環(huán)境中采用了主從結(jié)構(gòu)計(jì)算模型,其中包含驅(qū)動(dòng)器(driver)節(jié)點(diǎn)和執(zhí)行節(jié)點(diǎn)(見圖1)。
圖1 Spark計(jì)算模型
驅(qū)動(dòng)器節(jié)點(diǎn)運(yùn)行Application中的main()方法,創(chuàng)建SparkContext、創(chuàng)建RDD和對(duì)RDD轉(zhuǎn)化、執(zhí)行行動(dòng)操作,它作為應(yīng)用邏輯執(zhí)行起點(diǎn)負(fù)責(zé)將用戶程序轉(zhuǎn)化為多個(gè)物理執(zhí)行單元,然后將其分發(fā)到不同執(zhí)行器節(jié)點(diǎn)執(zhí)行處理,并且會(huì)根據(jù)執(zhí)行器節(jié)點(diǎn)任務(wù)處理情況,負(fù)責(zé)執(zhí)行器節(jié)點(diǎn)的任務(wù)調(diào)度以盡可能地提高效率。執(zhí)行器節(jié)點(diǎn)接收到驅(qū)動(dòng)器節(jié)點(diǎn)指令后,主要負(fù)責(zé)分區(qū)中任務(wù)執(zhí)行并將任務(wù)執(zhí)行結(jié)果反饋給驅(qū)動(dòng)器節(jié)點(diǎn),執(zhí)行器節(jié)點(diǎn)可以為需要緩存的RDD進(jìn)行內(nèi)存式存儲(chǔ),從而可以提高運(yùn)行效率。
DBSCAN算法定義類簇是密度相連點(diǎn)的最大集合,需要反復(fù)迭代尋找核心對(duì)象的密度可達(dá)數(shù)據(jù)點(diǎn)[13],所以可以使用Spark大數(shù)據(jù)計(jì)算框架RDD實(shí)現(xiàn)。在Spark應(yīng)用中一個(gè)驅(qū)動(dòng)器(Driver)程序定義了集群中執(zhí)行器(executor)節(jié)點(diǎn)上的分布數(shù)據(jù)集,并實(shí)現(xiàn)任務(wù)分發(fā)、調(diào)度、執(zhí)行和聚合結(jié)果等操作。文中設(shè)計(jì)的DBSCAN算法并行化方案如下:
(1)配置Spark。
首先,驅(qū)動(dòng)器程序創(chuàng)建SparkConf對(duì)象,配置Spark如何連接到相關(guān)集群中,然后創(chuàng)建SparkContext對(duì)象來連接訪問Spark。如前文所說,Spark并行框架計(jì)算流程實(shí)際上是通過待處理數(shù)據(jù)創(chuàng)建RDD,轉(zhuǎn)化成新的RDD,并調(diào)用RDD行動(dòng)操作求值得到結(jié)果。一般可以通過兩種方式創(chuàng)建RDD:讀取外部文件系統(tǒng)中的數(shù)據(jù)集或者在驅(qū)動(dòng)器程序中對(duì)數(shù)據(jù)集進(jìn)行并行化,同時(shí)Spark支持常用文件格式和文件存儲(chǔ)系統(tǒng),比如HDFS、Amazon S3、HBase等。讀取待處理數(shù)據(jù)集創(chuàng)建RDD后,分發(fā)到集群中各個(gè)執(zhí)行器節(jié)點(diǎn)中,轉(zhuǎn)化為Spark中數(shù)據(jù)塊保存在內(nèi)存或者磁盤中,并通過BlockManager進(jìn)行管理。RDD中partition是邏輯數(shù)據(jù)塊,分別對(duì)應(yīng)BlockManager管理的物理分區(qū)中相應(yīng)的Block。由于Spark采用惰性求值的方式節(jié)省集群中的內(nèi)存使用,只有RDD行動(dòng)操作才能觸發(fā)Job的提交計(jì)算。RDD的Action算子觸發(fā)Job提交,Spark收到Job后生成具有邏輯性的有向循環(huán)圖(RDD DAG),隨后DAGScheduler會(huì)對(duì)DAG進(jìn)行Stage劃分。對(duì)應(yīng)的每個(gè)Stage都會(huì)生成一組Task集合并提交到TaskScheduler中,會(huì)由TaskScheduler將Task調(diào)度分發(fā)到各個(gè)執(zhí)行器節(jié)點(diǎn)的線程池中執(zhí)行。
(2)多執(zhí)行器節(jié)點(diǎn)并行執(zhí)行DBSCAN算法。
DBSCAN算法基于Spark的并行化流程見圖2。
圖2 DBSCAN算法并行化流程
每個(gè)執(zhí)行器節(jié)點(diǎn)通過多線程方式對(duì)其RDD分區(qū)內(nèi)容中的數(shù)據(jù)使用DBSCAN算法進(jìn)行計(jì)算,首先讀取數(shù)據(jù)集形成RDD_1,RDD_1啟動(dòng)Sample算子,逐一隨機(jī)選取某數(shù)據(jù)點(diǎn)x作為起始點(diǎn)并轉(zhuǎn)化為RDD_2。設(shè)計(jì)map函數(shù),計(jì)算點(diǎn)x的Eps鄰域內(nèi)是否包含大于或等于MinPts個(gè)數(shù)據(jù)對(duì)象,以判斷其是否為核心對(duì)象,如果是則形成新簇,生成RDD_3。RDD_3啟動(dòng)collectAsMap算子,將已處理的相同類簇匯總到RDD_4同一數(shù)據(jù)分片中,RDD_4啟動(dòng)reduceByKey算子,尋找從核心對(duì)象出發(fā)直接密度可達(dá)的數(shù)據(jù)點(diǎn),此過程中可能存在密度可達(dá)的類簇合并,重復(fù)迭代,直到無新的數(shù)據(jù)點(diǎn)加入任何簇時(shí)輸出聚類結(jié)果。
執(zhí)行過程中,執(zhí)行器節(jié)點(diǎn)會(huì)將需要緩存的RDD緩存在內(nèi)存中,并將各自處理數(shù)據(jù)匯總到驅(qū)動(dòng)器節(jié)點(diǎn),并再次迭代計(jì)算后終止。此時(shí)驅(qū)動(dòng)器節(jié)點(diǎn)調(diào)用saveAsTextFile算子,將結(jié)果存儲(chǔ)到分布式存儲(chǔ)系統(tǒng)HDFS中。最后,通過調(diào)用SparkContext的stop方法退出Spark應(yīng)用。由于Spark大數(shù)據(jù)計(jì)算框架的核心數(shù)據(jù)模型RDD是分發(fā)到不同executer節(jié)點(diǎn)上并行計(jì)算的,并將中間結(jié)果緩存到內(nèi)存中,因此對(duì)于需要大量迭代計(jì)算的DBSCAN算法可以節(jié)省大量時(shí)間。
為了測(cè)試和分析基于Spark的并行化DBSCAN算法的性能,分別用單機(jī)DBSCAN算法和基于Spark的并行DBSCAN算法對(duì)實(shí)驗(yàn)數(shù)據(jù)集進(jìn)行聚類操作,在不同運(yùn)行模式下,對(duì)聚類效果準(zhǔn)確度和時(shí)間效率進(jìn)行對(duì)比。
實(shí)驗(yàn)搭建的Spark集群包含1臺(tái)驅(qū)動(dòng)器節(jié)點(diǎn),2臺(tái)執(zhí)行器節(jié)點(diǎn)。每個(gè)節(jié)點(diǎn)的CPU為Intel CORE i5-4210H,每個(gè)節(jié)點(diǎn)配有2個(gè)處理器,硬盤數(shù)據(jù)讀寫速度為600.00MB/s,其中驅(qū)動(dòng)器節(jié)點(diǎn)擁有6G運(yùn)行內(nèi)存,執(zhí)行器節(jié)點(diǎn)擁有2G運(yùn)行內(nèi)存。操作系統(tǒng)為centos6.5;Java版本為JDK1.7.0_13;Spark版本為1.6.0;Scala版本為2.10.4。
具體節(jié)點(diǎn)配置見表1。
表1 節(jié)點(diǎn)配置
實(shí)驗(yàn)采用了UCI實(shí)驗(yàn)室提供的Wine數(shù)據(jù)集、Car Evaluation數(shù)據(jù)集和Adult數(shù)據(jù)集[14-15],詳情見表2。
表2 數(shù)據(jù)集
以正確聚類的樣本數(shù)占總樣本數(shù)的百分比作為準(zhǔn)確率,對(duì)DBSCAN算法的聚類結(jié)果進(jìn)行評(píng)估,得到的結(jié)果如表3所示。
表3 準(zhǔn)確度對(duì)比
可以看出,對(duì)3個(gè)數(shù)據(jù)集,基于Spark的并行DBSCAN算法比傳統(tǒng)單機(jī)運(yùn)行模式的DBSCAN算法的聚類準(zhǔn)確率都有所提高。算法準(zhǔn)確度不會(huì)因?yàn)閿?shù)據(jù)分布式處理而影響聚類結(jié)果,說明分布式數(shù)據(jù)處理具有良好的穩(wěn)定性和準(zhǔn)確度。
圖3展示了傳統(tǒng)單節(jié)點(diǎn)DBSCAN算法和基于Spark的并行DBSCAN算法處理不同數(shù)據(jù)集時(shí)的耗時(shí)情況。
圖3 算法運(yùn)行效率對(duì)比
可以看出,在數(shù)據(jù)量比較少的情況下,傳統(tǒng)單節(jié)點(diǎn)DBSCAN算法運(yùn)行時(shí)間比基于Spark的DBSCAN算法運(yùn)行時(shí)間要少,因?yàn)镾park平臺(tái)啟動(dòng)時(shí)的初始化需要消耗一定時(shí)間。伴隨著數(shù)據(jù)量逐漸增大,單節(jié)點(diǎn)DBSCAN算法執(zhí)行時(shí)間漲幅明顯,因?yàn)閿?shù)據(jù)逐步增多所消耗的處理器和內(nèi)存等資源也在逐步增多,單個(gè)工作節(jié)點(diǎn)由于資源限制,運(yùn)行速度會(huì)逐步降低導(dǎo)致消耗更多時(shí)間;相反,基于Spark的DBSCAN算法運(yùn)行時(shí)間隨著數(shù)據(jù)量增加漲幅明顯低于單節(jié)點(diǎn)運(yùn)行模式,此時(shí)分布式集群優(yōu)勢(shì)逐漸顯示出來??梢缘贸鲞@樣的結(jié)論:在處理規(guī)模較大的數(shù)據(jù)時(shí),基于Spark平臺(tái)的DBSCAN算法聚類時(shí)效性更好。
對(duì)DBSCAN算法如何在Spark平臺(tái)進(jìn)行并行化實(shí)現(xiàn)進(jìn)行了研究。通過設(shè)計(jì)該算法在Spark集群中的并行化實(shí)現(xiàn)方案和對(duì)3個(gè)數(shù)據(jù)集進(jìn)行聚類處理,驗(yàn)證了在大規(guī)模的數(shù)據(jù)處理中,基于Spark平臺(tái)的DBSCAN算法并行化能夠有效完成聚類工作,并且具有更高的準(zhǔn)確度和更好的執(zhí)行效率。
參考文獻(xiàn):
[1] HAN Jiawei,KAMBER M.數(shù)據(jù)挖掘概念與技術(shù)[M].范明,譯.北京:機(jī)械工業(yè)出版社,2001:232-236.
[2] 張 麗.無參數(shù)網(wǎng)格聚類算法的研究[D].鄭州:鄭州大學(xué),2009.
[3] 林建仁.聚類算法的研究與應(yīng)用[D].上海:復(fù)旦大學(xué),2007.
[4] CHEN Yixin,TU Li.Density-based clustering for real-time stream data[C]//Proceedings of the13th ACM SIGKDD international conference on knowledge discovery and data mining.San Jose,California,USA:ACM,2007:133-142.
[5] 李 帥,吳 斌,杜修明,等.基于Spark的BIRCH算法并行化的設(shè)計(jì)與實(shí)現(xiàn)[J].計(jì)算機(jī)工程與科學(xué),2017,39(1):35-41.
[6] 張世安.分布式環(huán)境下的數(shù)據(jù)挖掘算法的研究與實(shí)現(xiàn)[D].上海:復(fù)旦大學(xué),2004.
[7] 范 敏,李澤明,石 欣.一種基于區(qū)域中心點(diǎn)的聚類算法[J].計(jì)算機(jī)工程與科學(xué),2014,36(9):1817-1822.
[8] 高 兵.基于密度的數(shù)據(jù)流聚類方法研究[D].哈爾濱:哈爾濱工程大學(xué),2014.
[9] 張 泉.面向云計(jì)算數(shù)據(jù)中心的存儲(chǔ)服務(wù)質(zhì)量技術(shù)研究[D].武漢:華中科技大學(xué),2014.
[10] JAIN A K.Data clustering:50years beyond K-Means[J].Pattern Recognition Letters,2010,31(8):651-666.
[11] PAN Donghua,ZHAO Lilei.Uncertain data cluster based on DBSCAN[C]//Proceedings of IEEE international conference on multimedia technology.Hangzhou,China:IEEE,2011:3781-3784.
[12] KELLNER D,KLAPPSTEIN J,DIETMAYER K.Grid-based DBSCAN for clustering extended objects in radar data[C]//Intelligent vehicles symposium.Alcala de Henares,Spain:IEEE,2012:365-370.
[13] 唐振坤.基于Spark的機(jī)器學(xué)習(xí)平臺(tái)設(shè)計(jì)與實(shí)現(xiàn)[D].廈門:廈門大學(xué),2014.
[14] 羅啟福.基于云計(jì)算的DBSCAN算法研究[D].武漢:武漢理工大學(xué),2013.
[15] ZAHARIA M,CHOWDHURY M,FRANKLIN M J,et al.Spark:cluster computing with working sets[C]//Proceedings of the2nd USENIX conference on hot topics in cloud computing.Boston,MA:[s.n.],2010:1765-1773.