李俊麗
(晉中學院信息技術與工程學院 晉中 030619)
在過去的幾十年中,數(shù)據(jù)挖掘任務中使用的數(shù)據(jù)集的維度顯著增加。對于這些領域的研究人員來說,這是一個前所未有的挑戰(zhàn),因為現(xiàn)有的算法在處理這些新的非常高的維度時,并不總是有足夠的響應時間。在20世紀90年代,數(shù)據(jù)的最大維度大約是6000000;在21世紀頭十年,這個數(shù)字增加到超過1600萬;在2010年,它又進一步發(fā)展。隨著上面提到的高維數(shù)據(jù)集的出現(xiàn),現(xiàn)有的離群挖掘方法[1~6]在處理大數(shù)據(jù)時,其效率可能顯著下降甚至不適用,因此預計不會很好地擴展。在過去的十年中,可擴展的分布式編程框架[7~8]已經(jīng)出現(xiàn),用以管理大數(shù)據(jù)。第一個編程模型是 MapReduce[9~10]和它的開源實現(xiàn) Apache Hadoop[11~12]。近幾年來出現(xiàn)了一個新的分布式框架,Apache Spark[13~14],這是一個快速和通用地進行大規(guī)模數(shù)據(jù)處理的平臺。
隨著大數(shù)據(jù)技術的發(fā)展,離群數(shù)據(jù)挖掘也面臨著很大的挑戰(zhàn),傳統(tǒng)分布式文件系統(tǒng)由于自身的限制,實現(xiàn)分布式數(shù)據(jù)挖掘算法非常耗時和消耗磁盤容量,很難滿足大數(shù)據(jù)處理和分析的需要?;趦?nèi)存計算的Spark平臺,天然地適應于大數(shù)據(jù)處理和分析。本文的主要目的是在Spark大數(shù)據(jù)平臺中設計實現(xiàn)一個離群挖掘算法STCS[15]的并行化,從而提高算法性能。
Apache Hadoop是MapReduce的開源實現(xiàn),用于可靠、可伸縮、分布式計算。盡管Hadoop是Ma?pReduce最受歡迎的開源實現(xiàn),但它在很多情況下并不適用,比如在線和迭代計算、高進程間通信模式或內(nèi)存計算等。
近年來,Apache Spark在Hadoop生態(tài)系統(tǒng)中被引入。這個框架的目的是通過使用內(nèi)存原語來實現(xiàn)對大數(shù)據(jù)的更快的分布式計算,這使得它能夠比Hadoop在某些應用程序上運行速度快100倍。這個平臺允許用戶程序?qū)?shù)據(jù)加載到內(nèi)存中并反復計算,使其成為在線和迭代處理(特別是機器學習算法)的一個非常合適的工具,從而簡化了編程任務。
Spark對應用非常廣泛的MapReduce計算模型在速度方面進行了擴展,并有效地對更多計算模式包括交互式查詢和流處理進行支持。Spark是基于一種稱為彈性分布式數(shù)據(jù)集(RDDs)的分布式數(shù)據(jù)結(jié)構(gòu)。RDD是Spark的核心概念,Spark中的RDD是分布式對象的不可變集合。每個RDD被劃分為在集群中的不同節(jié)點上運行的分區(qū)。通過使用RDDs,我們可以實現(xiàn)像Pregel或MapReduce這樣的分布式編程模型,這要歸功于它們的通用性。這些并行數(shù)據(jù)結(jié)構(gòu)還允許程序員在內(nèi)存中保存中間結(jié)果,并管理分區(qū)以優(yōu)化數(shù)據(jù)放置。
在分布式環(huán)境中,Spark集群使用主/從結(jié)構(gòu)。在Spark集群中,有一個節(jié)點負責分布式工作節(jié)點的中央?yún)f(xié)調(diào)和調(diào)度。這個中心協(xié)調(diào)節(jié)點稱為驅(qū)動節(jié)點,相應的工作節(jié)點稱為executor節(jié)點。驅(qū)動節(jié)點可以與大量的executor節(jié)點通信,這些節(jié)點也作為獨立的Java進程運行。驅(qū)動節(jié)點與所有執(zhí)行器節(jié)點一起被稱為Spark應用程序。Spark應用程序通過一個名為Cluster Manager的外部服務在集群中的機器上啟動。Spark附帶的集群管理器稱為獨立集群管理器。Spark還可以運行在兩個大型開源集群管理器YARN和Mesos上。
Spark集群基本工作流程如圖1所示。
圖1 Spark基本工作流程圖
用戶通過客戶端提交作業(yè)給集群,驅(qū)動器節(jié)點將開始初始化操作執(zhí)行環(huán)境(包括任務調(diào)度,作業(yè)階段調(diào)度,等等),作業(yè)被分為多個任務,然后主節(jié)點向集群管理器Cluster Manager申請資源,集群管理器根據(jù)報告的資源使用情況分配資源,Executor負責執(zhí)行具體的任務,最后釋放集群資源直到任務執(zhí)行完成。
Hadoop平臺上實現(xiàn)的算法計算任務需要考慮具體的功能劃分和合并運算結(jié)果。算法會根據(jù)需要分成小任務,每個任務的實現(xiàn)都要使用Map函數(shù)或Reduce函數(shù),算法實現(xiàn)的時候,可能會有多個Map任務和Reduce任務,該任務從磁盤讀取數(shù)據(jù)或中間結(jié)果,然后在磁盤中保存處理結(jié)果,多個Map-Reduce的形式有可能就會形成。以內(nèi)存計算為核心的Spark平臺,其數(shù)據(jù)處理對象為RDD彈性分布式數(shù)據(jù)集,處理數(shù)據(jù)的時候會根據(jù)算法設計動作操作和轉(zhuǎn)換操作來完成任務。
本文除了基于Spark平臺實現(xiàn)算法,還重新設計了算法的框架,為經(jīng)典方法的性能添加了一些新的重要改進,但同時也保留了一些特性。
RDD存儲:當調(diào)用RDD的persist()或cache()方法時,這個RDD的分區(qū)會被存儲到緩存區(qū)中。Spark會根據(jù)spark.storage.memoryFraction限制用來緩存的內(nèi)存占整個JVM堆空間的比例大小。如果超出限制,舊的分區(qū)數(shù)據(jù)會被移出內(nèi)存。
Broadcast:此操作允許在每個節(jié)點上保留給定變量的只讀副本,而不是將副本發(fā)送到每個任務。這通常用于大型永久變量(例如大哈希表)。
我們還從Spark API中使用了一些復雜的操作,如下所示。Spark源碼擴展了MapReduce的思想,以提供更復雜的操作來簡化代碼的并行化。在這里,我們概述那些與方法更相關的內(nèi)容:
mapPartitions:類似于Map,它在每個分區(qū)上獨立運行一個函數(shù)。對于每個分區(qū),將獲取一個元組的迭代器,并生成另一個相同類型的迭代器。
groupByKey:該操作將這些元組在一個值向量(使用shuffle操作)中使用相同的鍵。
sortByKey:合并排序的分布式版本。
我們采取sc.textFile從外部存儲中讀取數(shù)據(jù)集來創(chuàng)建RDD。讀取數(shù)據(jù)源采用的是map轉(zhuǎn)換操作,并且使用split實現(xiàn)了數(shù)據(jù)的分割,然后使用persist將RDD持久化。具體實現(xiàn)代碼:map(lines=>lines.split(‘,’)).persist(StorageLevel.MEMORY_AND_DISK)。讀取數(shù)據(jù)集時,map操作將按行進行,然后使用split方法將每行的數(shù)據(jù)用特定分隔符‘,’并且用persist操作進行數(shù)據(jù)集的緩存,最終通過幾種操作結(jié)合起來就把原來的數(shù)據(jù)集轉(zhuǎn)變?yōu)镽DD數(shù)據(jù)集。
基于Spark平臺的S-STCS算法的實現(xiàn)步驟主要為
步驟一:彈性分布式數(shù)據(jù)集RDD的轉(zhuǎn)換?;赗DD的Spark的并行計算首先要將原始數(shù)據(jù)集轉(zhuǎn)換成RDD,但是數(shù)據(jù)集原來是保存在磁盤里的。所以,第一步首先應該通過sc.textFile將原始數(shù)據(jù)集轉(zhuǎn)換為RDD,并命名為dataRDD。
步驟二:通過map,split和persist對dataRDD進行轉(zhuǎn)換并持久化,命名為datapreRDD。
步驟三:并行計算屬性的信息熵和互信息。信息熵可以反映屬性的冗余程度,互信息反映屬性之間的相關關系。計算屬性的信息熵和互信息是在datapreRDD基礎上計算的,主要使用了map操作。這個操作在算法開始時執(zhí)行一次,然后緩存。這也有助于通過在所有節(jié)點中廣播避免冗余計算。
步驟四:并行計算數(shù)據(jù)對象的離群程度。計算數(shù)據(jù)對象的離群程度是在datapreRDD基礎上計算的,通過map和countBykey操作計算,將結(jié)果RDD此命名為scoreRDD。
步驟五:通過sortBy排序最終確定得分最高的離群點。
實驗所使用集群配備了24個計算節(jié)點,每個節(jié)點配置為Intel E5系列3.7GHz的4核處理器、16G主存,2T硬盤。集群中的所有計算節(jié)點使用SSH協(xié)議通信。集群的軟件配置環(huán)境如表1所示。
表1 集群軟件配置
實驗數(shù)據(jù)集采用UCI機器學習庫中的connect4數(shù)據(jù)集作為測試數(shù)據(jù)集。我們使用這個真實的數(shù)據(jù)集來驗證S-STCS算法。Connect-4數(shù)據(jù)集包含了67557個數(shù)據(jù)對象和42個屬性。下面的實驗將不同的數(shù)據(jù)大小以及不同的節(jié)點數(shù)來設計實驗對比和測試算法性能。
1)可擴展性
本實驗通過在一定數(shù)量的節(jié)點情況下(該實驗節(jié)點數(shù)設定為4個)增加數(shù)據(jù)集的規(guī)模來驗證串行算法STCS和并行算法S-STCS兩種算法的可擴展性。實驗在Connect-4數(shù)據(jù)集的基礎上又構(gòu)建了3組較大的數(shù)據(jù)集 Connect-4(1)、Connect-4(2)、Connect-4(3),分別為 Connect-4數(shù)據(jù)集的 2、4、6倍,實驗結(jié)果如圖2所示。
圖2 不同數(shù)據(jù)集運行時間
由圖2可知,隨著數(shù)據(jù)對象的增加,S-STCS與STCS算法的時間消耗均有所增加,主要原因是數(shù)據(jù)集數(shù)據(jù)對象的增加使得掃描數(shù)據(jù)的時間更長。但與STCS算法相比,基于Spark平臺的S-STCS算法增加的時間并不明顯。隨著數(shù)據(jù)對象的增加,集群上每個計算節(jié)點所分配的數(shù)據(jù)塊也會增加,導致執(zhí)行時間有所增加。但S-STCS算法比STCS算法表現(xiàn)出了更好的可擴展性。
2)加速比
加速比指的是當處理的數(shù)據(jù)集規(guī)模大小相同的時候,通過增加計算節(jié)點的數(shù)量,對并行算法性能的影響。這組實驗數(shù)據(jù)集選擇Connect-4數(shù)據(jù)集,通過將Spark集群中的節(jié)點數(shù)量從4個增加到24個評價并行算法S-STCS的執(zhí)行性能。
圖3 節(jié)點個數(shù)對挖掘效率的影響
由圖3可以看出,當數(shù)據(jù)集固定不變的時候,S-STCS離群挖掘算法的效率會隨著計算節(jié)點數(shù)量的增加有所提高。這就說明Spark集群的計算節(jié)點影響了S-STCS算法的挖掘效率。這主要是因為增加了計算節(jié)點的個數(shù),更多的節(jié)點可以參與到并行挖掘計算中,即集群中用于存儲RDD數(shù)據(jù)集的內(nèi)存空間不斷增大,從而提高了算法的運行效率。但是,由于節(jié)點數(shù)量的增加會增加集群I/O的傳輸消耗,因此挖掘時間并沒有成比例縮短。
本文充分利用Spark平臺對內(nèi)存計算的支持,提出了一種基于Spark平臺的并行離群數(shù)據(jù)挖掘算法S-STCS,用于高維屬性數(shù)據(jù)集的離群挖掘。與大多數(shù)現(xiàn)有的算法不同,該算法基于Spark平臺,其最大的優(yōu)勢在于其將中間結(jié)果保存至內(nèi)存,而不再需要讀寫HDFS分布式文件系統(tǒng),這樣有效地降低I/O成本,并能提高數(shù)據(jù)分析和處理的效率。為了進行性能評價,通過UCI數(shù)據(jù)集對S-STCS算法進行了實驗驗證。結(jié)果表明,本文算法在高維屬性數(shù)據(jù)集上發(fā)現(xiàn)離群數(shù)據(jù)的能力和效率都有所提高。