趙麗梅 黃小菊 宮學慶
(華東師范大學軟件工程學院 上海 200062)
Apache Spark[1]是一個通用的并行查詢引擎,能夠支持對鍵值數(shù)據(jù)源的數(shù)據(jù)分析處理,擴展鍵值數(shù)據(jù)庫[2-3]大規(guī)模復雜查詢分析的能力,例如對鍵值數(shù)據(jù)源的Join查詢。實際應用場景中常搭建的Spark-over-HBase架構[4],利用Spark查詢引擎支持HBase數(shù)據(jù)源[5-6]的復雜查詢,其存儲層利用HBase集群對海量數(shù)據(jù)進行持久化存儲,計算層采用Spark查詢引擎來執(zhí)行大規(guī)模查詢分析。該架構實現(xiàn)存儲層與計算層分離,解決了HBase數(shù)據(jù)庫僅能通過鍵來進行簡單查詢的不足,擴展了其支持復雜查詢的能力。如圖1所示,該架構底層采用HBase集群來進行數(shù)據(jù)持久化存儲,HBase集群包括一個HMaster主節(jié)點和若干個HRegionServer節(jié)點,利用Zookeeper集群來進行分布式協(xié)調(diào)。數(shù)據(jù)分布存儲在多個HRegionServer節(jié)點,通過主節(jié)點來協(xié)調(diào)各HRegionServer的負載均衡并維護集群的狀態(tài);上層利用Spark集群來實現(xiàn)對于數(shù)據(jù)的并行計算和查詢響應,Spark集群包括一個Master主節(jié)點和若干個Worker節(jié)點,通過主節(jié)點來管理Worker節(jié)點,用戶提交應用程序啟動Driver進程來觸發(fā)集群工作。當接收到一個SQL請求后,啟動Driver進程,Spark查詢引擎生成對應的任務調(diào)度,通過數(shù)據(jù)訪問接口提取HBase中的數(shù)據(jù)到各Worker節(jié)點進行數(shù)據(jù)并行處理,響應查詢請求。
圖1 Spark-over-HBase架構
Spark-over-HBase架構擴展了HBase數(shù)據(jù)庫的大規(guī)模查詢分析能力,但是對于復雜查詢分析中較常用、開銷較大的Join操作,Spark查詢引擎的處理過程仍然有以下兩方面不足:
(1) 網(wǎng)絡傳輸開銷大。根據(jù)文獻[7]可知,因為存儲模式簡單,所以大多數(shù)的鍵值數(shù)據(jù)庫不具備在存儲層進行謂詞下推和投影來過濾數(shù)據(jù),掃描數(shù)據(jù)效率低下。所以,大多情況下Spark查詢引擎需要提取全表數(shù)據(jù)進行處理。對于Join操作,Spark底層實現(xiàn)了Broadcast Hash Join算法、Repartition Join算法[8]。針對兩個大表進行Join的Repartition Join算法,是將兩表的全表數(shù)據(jù)進行Shuffle操作[9]。Spark當前采用的Shuffle策略是Sort Based Shuffle,這涉及大量的磁盤I/O開銷和計算開銷,尤其是網(wǎng)絡傳輸開銷。對于寬表或者網(wǎng)絡通信效率低下的情況,該部分開銷占比更大。
(2) 并行度設置不合理。在硬件資源滿足條件的情況下,并行處理大數(shù)據(jù)量往往能夠極大地提高工作性能。在Spark執(zhí)行Join查詢時,Join執(zhí)行的并行度與進行Shuffle操作時重分區(qū)的個數(shù)有關,即(1)中Reduce任務的個數(shù)。Spark提供spark.sql.shuffle.partitions參數(shù)來設定Shuffle時分區(qū)的個數(shù)。一般情況下,各Join實現(xiàn)算法取該參數(shù)的默認值。同時,用戶也可根據(jù)生產(chǎn)環(huán)境配置手動調(diào)整該參數(shù),但是手動調(diào)整人工代價大且難以做出正確估計,很難給出最佳參數(shù)配置。
如何減少網(wǎng)絡傳輸代價、合理設置Join的并行度,提升現(xiàn)有的Spark-over-HBase平臺下的大表Join性能對于大規(guī)模數(shù)據(jù)查詢分析意義重大。
基于此,本文重點從優(yōu)化算法流程以及動態(tài)設定并行度兩方面來提升Join操作性能,具體的貢獻如下:
(1) 借鑒Semi Join算法[11]的思想,首先提取左表Join列數(shù)據(jù)構建HashMap,利用該HashMap對右表進行過濾,過濾掉右表Join列數(shù)據(jù)不在HashMap中的元組,即不符合連接條件的元組。通過該預處理流程,可以減少右表參與Shuffle操作的數(shù)據(jù)量和進行Join匹配的數(shù)據(jù)量,降低網(wǎng)絡傳輸開銷和相關的CPU開銷。
(2) 考慮集群的CPU核數(shù)配置來動態(tài)設置Join操作的并行度,盡可能充分利用集群資源,提升Join操作效率。
基于以上的優(yōu)化方法,本文進行了理論分析和對比實驗驗證。實驗結果顯示,對于兩張表Join列數(shù)據(jù)不完全匹配的情況,右表與左表匹配數(shù)據(jù)量越少,本文所提方案優(yōu)化效果越明顯。
Join操作是大規(guī)模查詢分析中最常見且開銷最大的操作之一,在Spark查詢引擎中利用Broadcast Hash Join算法、Shuffle Hash Join算法和Sort Merge Join算法來實現(xiàn)。Broadcast Hash Join算法局限性大,主要適用于兩表數(shù)據(jù)量相差極大,且小表數(shù)據(jù)量小于規(guī)定閾值的場景,本文不予討論。后兩種算法均屬于Repartition Join,主要是在進行Join之前對兩表數(shù)據(jù)進行Shuffle操作。Repartition Join主要處理參與Join的兩個表數(shù)據(jù)量都很大的場景,通過Shuffle操作實現(xiàn)兩表分區(qū)數(shù)據(jù)有效匹配,但是Shuffle操作的磁盤I/O開銷、網(wǎng)絡通信開銷和內(nèi)存處理開銷很大。
目前,針對各類查詢引擎進行大表Join操作的查詢優(yōu)化已經(jīng)有了很多的研究。
文獻[12]提出了基于Hadoop框架的大數(shù)據(jù)集的Join優(yōu)化算法,算法主要利用Hadoop的分布式緩沖機制來優(yōu)化MapReduce框架的Reduce Side Join。對于參與Join操作的兩個表,算法先提取出其中一個表的連接屬性,然后利用Bit-map數(shù)據(jù)結構壓縮成小數(shù)據(jù)文件存入磁盤中,通過Hadoop的分布式緩存機制將小數(shù)據(jù)文件傳輸?shù)礁鱾€分布式節(jié)點。然后,在Map階段,利用讀取到的小數(shù)據(jù)文件對另一個表的數(shù)據(jù)進行過濾,過濾掉不在該小數(shù)據(jù)文件中的元組,即不滿足Join連接條件的元組。最后,在Reduce階段將兩表連接屬性值相同的元組執(zhí)行Join操作。該優(yōu)化算法利用其中一個表的連接屬性對另一個表的數(shù)據(jù)進行預過濾,可以減少Shuffle階段的數(shù)據(jù)量,降低網(wǎng)絡傳輸開銷。但是該算法利用Bit-map數(shù)據(jù)結構進行壓縮,對另一個表數(shù)據(jù)過濾時存在一定的誤判率,對數(shù)據(jù)的過濾性不好。而且,該算法需要利用Hadoop的分布式緩存機制將小數(shù)據(jù)文件存入磁盤,額外增加了I/O開銷,影響了最終的Join優(yōu)化效果。
文獻[13]提出了一種基于Bloom Filter數(shù)據(jù)結構的Spark大表等值連接的優(yōu)化算法。該算法首先對兩張數(shù)據(jù)表抽取連接屬性并進行去重,然后利用Bloom Filter數(shù)據(jù)結構對去重后的連接屬性分別進行壓縮得到兩個位數(shù)組,對兩個位數(shù)組進行“與”運算,生成BF位數(shù)組。利用這個BF位數(shù)組再分別對兩張表進行過濾,即過濾掉不符合連接條件的記錄。最后,對過濾后的兩張表執(zhí)行Hash Join算法,得到連接結果。該優(yōu)化算法利用Bloom Filter數(shù)據(jù)結構,同樣是過濾掉兩個表中不符合連接條件的元組,減少Shuffle操作的數(shù)據(jù)量。但是對兩張表的連接屬性進行去重時涉及Shuffle操作,且隨著連接屬性值的增多,該部分開銷隨之增加。而且Bloom Filter的數(shù)據(jù)結構壓縮效率沒有Bit-map數(shù)據(jù)結構好,且為了降低誤判率,位數(shù)組的長度還需適當增加,如何選定合適的誤判率以及對應的位數(shù)組大小仍需進行優(yōu)化。
文獻[14]在Spark平臺上針對大維表的等值連接提出了優(yōu)化算法。算法主要包括以下幾步:(1) 對事實表Fact的連接屬性值Key進行去重,得到無重集FactUK,F(xiàn)actUK中元組不僅包括Key鍵,也包括其在Fact表中的存儲位置。(2) 將FactUK與維表Dim進行預連接,執(zhí)行Partition Join。其中,重分區(qū)的個數(shù)按照Fact和Dim的大小進行動態(tài)設定,并利用一致性哈希算法來進行重分區(qū),避免了因數(shù)據(jù)傾斜產(chǎn)生的連接負載不均的問題,然后在各個分區(qū)上對FactUK和Dim按照Key進行cogroup分組并過濾掉不能匹配上的Key。(3) 將預連接結果按照Fact的分區(qū)號進行重分區(qū),在各個分區(qū)將預連接結果與Fact表通過zipPartition操作進行組裝,返回完整的連接結果。該優(yōu)化算法主要在于結合了Partition Join與Semi Join的優(yōu)勢,對兩表數(shù)據(jù)進行重分區(qū)和預連接,減少了對于事實表全表數(shù)據(jù)進行重分區(qū)的Shuffle開銷,同時也優(yōu)化了連接執(zhí)行的并行度、采用一致性哈希來進行數(shù)據(jù)分區(qū),以此獲得更好的連接性能。但是該算法也增加了對事實表連接屬性值去重的Shuffle開銷,并且該算法假設事實表和維度表的數(shù)據(jù)可以完全緩存到內(nèi)存中,而在實際生產(chǎn)環(huán)境中,很難將所有數(shù)據(jù)完全緩存到內(nèi)存中。
本方案中各分區(qū)的連接算法仍采用Sort Merge Join,主要利用Semi Join思想對參與Join的兩表數(shù)據(jù)進行預處理,并且動態(tài)設定Join操作的并行度,以獲得更好的優(yōu)化效果。假設參與Join的兩個表分別為R表、S表,連接條件為R.A=S.B,其中A為R表key鍵,B為S表列族中的對應列。定義該優(yōu)化方案為Semi Sort Merge Join,其對應的Join執(zhí)行流程如下所示。
基于Semi Join算法進行數(shù)據(jù)預處理的流程如下:
(1) 提取R表Join列。對R表中的元組進行投影,只保留Join列的信息,結果定義為joinSet數(shù)據(jù)集。因為HBase數(shù)據(jù)庫中Key鍵的唯一性,所以該Join列數(shù)據(jù)沒有重復值。并且只提取單列數(shù)據(jù),數(shù)據(jù)量少。
(2) 構建HashMap。對(1)中的joinSet數(shù)據(jù)集構建HashMap。為了盡可能減少HashMap的內(nèi)存開銷,設定HashMap的Key鍵為Join列值,value值統(tǒng)一設定為null。
(3) 過濾S表數(shù)據(jù)。利用(2)中HashMap的Key鍵匹配S表的Join列,對S表數(shù)據(jù)進行過濾,剔除掉S表中B列數(shù)值不包含在HashMap中的元組。
利用Semi Join算法的思想對數(shù)據(jù)進行預處理,在執(zhí)行Join操作前,在內(nèi)存中構建R表Join列數(shù)據(jù)的HashMap,利用該HashMap對S表數(shù)據(jù)進行精確過濾,過濾掉S表數(shù)據(jù)的Join列在R表中沒有相關匹配值的元組,可以減少后續(xù)操作中對S表的無用數(shù)據(jù)進行Shuffle操作的磁盤I/O開銷、網(wǎng)絡開銷和相關的CPU開銷,也減少了后續(xù)參與Sort Merge Join的數(shù)據(jù)量。
對于Repartition Join的實現(xiàn),Shuffle操作就是按照設定的重分區(qū)的個數(shù)對R表和S表數(shù)據(jù)按照Join列數(shù)據(jù)的Hash值進行重分區(qū)。在Spark SQL中,Shuffle操作重分區(qū)的個數(shù)主要由參數(shù)spark.sql.shuffle.partitions決定,默認值是200。因為重分區(qū)的個數(shù)直接關系到Join操作執(zhí)行的并行度,所以合理設置重分區(qū)的個數(shù)尤為重要。如果該數(shù)值設定過小,會導致集群處理性能低且資源利用不合理,未發(fā)揮集群優(yōu)勢;如果設定過大,則網(wǎng)絡連接超負荷、任務調(diào)度開銷大,也不利于提升集群的處理性能。
所以在本文方案中,考慮利用動態(tài)優(yōu)化的思想,在執(zhí)行過程中根據(jù)集群資源來動態(tài)設定重分區(qū)的個數(shù),以此來優(yōu)化Join操作的并行度。定義重分區(qū)個數(shù)為Partitionnum,其計算公式如下:
Partitionnum=w×corenum
(1)
式中:corenum表示集群中executor實例總的CPU核數(shù)。因為集群的計算能力受制于集群中CPU核數(shù)的個數(shù)[15],因此用w×corenum表示集群資源的限制。如果分區(qū)個數(shù)等于corenum,即Join操作的并行度等于corenum,w=1,則可能某些運行較快的任務較早運行完,空閑出相應的CPU核;如果設定w過大,則可能任務調(diào)度過于頻繁,開銷過大。本文設置多組實驗,設定不同的重分區(qū)個數(shù),測試得出w的最優(yōu)值為2。
綜上所述,優(yōu)化后該步驟進行Shuffle操作的具體流程如下所示:(1) 通過配置—num-executors、—executor-cores分別獲取Spark集群中每個節(jié)點上executor的實例數(shù)和每個executor所分配的CPU核數(shù),則集群中executor實例總的CPU核數(shù)為兩者的乘積;(2) 按照式(1)設定spark.sql.shuffle.partitions參數(shù);(3) 利用Spark的Sort Based Shuffle策略對兩表數(shù)據(jù)進行重分區(qū)。
通過Shuffle操作,將兩表的數(shù)據(jù)分為具有相同個數(shù)的多個分區(qū),然后對兩表具有相同分區(qū)號的分區(qū)數(shù)據(jù)進行合并,執(zhí)行Sort Merge Join操作,主要流程如下:(1) 在各分區(qū)上,對兩表數(shù)據(jù)按照相同排序規(guī)則進行排序;(2) 分別順序遍歷兩表數(shù)據(jù),按照Join連接條件進行匹配。
綜上所述,該優(yōu)化方案的整體執(zhí)行計劃如圖2所示。(1) 利用TableScan獲取R表數(shù)據(jù),通過Map映射Join列數(shù)據(jù),并利用CollectAsMap算子構建Join列的HashMap數(shù)據(jù)結構;(2) 利用TableScan獲取S表數(shù)據(jù),通過Map映射好各屬性后,利用R表Join列數(shù)據(jù)的HashMap對S表數(shù)據(jù)進行過濾,過濾掉S表Join列數(shù)據(jù)不在HashMap中的元組;(3) 通過Map映射R表的各屬性;(4) 按照應用配置,計算集群中executor實例總的CPU核數(shù),并根據(jù)Join列的Hash值以及2×corenum對R表和S表數(shù)據(jù)進行重分區(qū);(5) 在對應分區(qū)上分別對兩表數(shù)據(jù)進行Sort Merge Join。
圖2 Semi Sort Merge Join算法執(zhí)行計劃
Spark并行處理框架是基于分布式共享內(nèi)存進行計算處理的,即在任務執(zhí)行過程中,數(shù)據(jù)是緩存在內(nèi)存中進行計算處理的,必要情況下需要將中間結果存入磁盤,例如Shuffle操作。為了簡化分析思路,本節(jié)方案分析不考慮將中間結果存入磁盤的情況,同時假設內(nèi)存充足,所有中間結果可以有效緩存在內(nèi)存中并進行內(nèi)存計算。
本節(jié)的方案分析利用代價模型[16-17]進行代價估算。代價估算基于第2節(jié)中的例子,定義代價估計中各參數(shù)及參數(shù)意義如表1所示。
表1 代價模型中各參數(shù)及意義
為了方便分析,定義Filterability(過濾性)表示通過R表的A列對S表進行過濾后,S表過濾掉的元組數(shù)占原表元組數(shù)的比例,其計算公式如下:
(2)
對于分布式數(shù)據(jù)處理,主要考慮I/O代價、網(wǎng)絡傳輸代價和CPU的計算代價這三方面,總的代價估計如式(3)所示。
異丙托溴銨聯(lián)合布地奈德混懸液霧化吸入治療上呼吸道感染后慢性咳嗽的效果……………………………… 陳衍秋 陳英?。?)335
Costtotal=CostI/O+CostComm+CostCPU
(3)
I/O代價主要考慮讀取參與Join的表所耗費的時間,因為優(yōu)化前后均是對數(shù)據(jù)進行全表掃描,所以該部分代價不進行對比分析。在分布式環(huán)境下,網(wǎng)絡傳輸開銷占比較大,相對而言CPU的計算開銷占比很小,所以本節(jié)重點分析優(yōu)化方案的網(wǎng)絡傳輸代價。優(yōu)化方案的網(wǎng)絡傳輸代價主要包括對R表Join列構建的HashMap的網(wǎng)絡傳輸時間和Shuffle過程中進行數(shù)據(jù)重分區(qū)所耗費的時間,該部分的代價主要與網(wǎng)絡傳輸?shù)臄?shù)據(jù)量有關,數(shù)據(jù)量越大,網(wǎng)絡傳輸代價越大。具體如式(4)所示。
Costcomm=(sizeof(H)×|H|+
max(sizenR,sizenfilterS))×comm
(4)
本文方案主要是在Spark實現(xiàn)的Sort Merge Join算法基礎上進行改進的,在各分區(qū)上仍舊采用Sort Merge Join算法執(zhí)行Join操作,所以本節(jié)主要對比Semi Sort Merge Join算法與Sort Merge Join算法的網(wǎng)絡通信代價,對比情況如下:
Sort Merge Join算法未對數(shù)據(jù)進行過濾,通過TableScan后直接將兩表數(shù)據(jù)進行Shuffle操作,涉及的網(wǎng)絡傳輸代價如式(5)所示。
Costcomm=max(sizenR,sizenS)×comm
(5)
在Semi Sort Merge Join算法中,如果對R表Join列投影后行數(shù)和寬度都相對較小,而右表S表是寬表且行數(shù)較多,則sizeof(H)×|H|的網(wǎng)絡傳輸代價可以忽略不計。此時,過濾性對網(wǎng)絡傳輸代價影響較大,F(xiàn)T越大,其過濾數(shù)據(jù)量越多,網(wǎng)絡傳輸數(shù)據(jù)量越少,對應網(wǎng)絡傳輸代價越小。相較于文獻[12-13],如果誤判率較高,對S表數(shù)據(jù)的過濾性不好,不符合連接條件的數(shù)據(jù)產(chǎn)生的網(wǎng)絡傳輸量就比較大。但是,如果對R表Join列投影后行數(shù)和寬度都相對較大的情況下,sizeof(H)×|H|的網(wǎng)絡傳輸代價不容忽視。例如R表和S表全表匹配的情況下,生成HashMap以及將其進行網(wǎng)絡傳輸?shù)拈_銷會降低Semi Sort Merge Join的執(zhí)行效率。
綜上所述,對于R表和S表數(shù)據(jù)不完全匹配的情況下,Semi Sort Merge Join算法的性能提升主要在于通過R表的Join列構建HashMap后,過濾掉S表不符合連接條件的元組,從而減少Shuffle操作的數(shù)據(jù)量,減少Shuffle write和Shuffle read的開銷。
本實驗使用三臺服務器搭建HBase集群和Spark集群,服務器的硬件配置以及相關的軟件版本如表2所示。
表2 相關配置及軟件版本
本文的實驗均使用TPC-H Benchmark數(shù)據(jù)集[18]。TPC基準是被全球數(shù)據(jù)庫廠商公認的性能評價標準,其中的TPC-H測試基準是一組決策支持基準,可測試系統(tǒng)執(zhí)行復雜、高并發(fā)查詢的能力。
TPC-H數(shù)據(jù)集總共包括8張表,根據(jù)表之間的關聯(lián)性,選擇orders表(訂單表)和lineitem表(訂單明細表)來進行Join查詢性能的實驗。實驗數(shù)據(jù)表使用TPC-H提供的數(shù)據(jù)生成器生成數(shù)據(jù),數(shù)據(jù)大小由比例系數(shù)SF決定的,根據(jù)SF的不同大小生成不同數(shù)量的測試數(shù)據(jù)集。
本節(jié)實驗主要通過兩表Join查詢來測試Spark查詢引擎提供的Sort Merge Join算法的查詢處理性能以及本文方案的性能。實驗主要利用orders表和lineitem表進行等值連接,連接條件為orders.O_ORDERKEY=lineitem.L_ORDERKEY。
本文設置的每組實驗均執(zhí)行5次,實驗結果取平均值。
本節(jié)主要測試優(yōu)化方案中利用Semi Join算法進行數(shù)據(jù)預處理對Sort Merge Join算法的提升效果。實驗對應的lineitem表的行數(shù)為1 200萬,為了測試兩表之間匹配程度對算法的影響,設置orders表的數(shù)據(jù)量分別為1 000、10萬、100萬、150萬、300萬,查詢對應lineitem表中有效匹配元組數(shù)分別為4 000、40萬、400萬、600萬、1 200萬。
實驗的結果如圖3所示。其中橫坐標表示orders表的行數(shù),縱坐標表示查詢執(zhí)行時間,單位為秒。對比兩個算法的執(zhí)行時間,當orders表行數(shù)小于100萬時,可以明顯看到經(jīng)過Semi Join預處理后Join執(zhí)行時間更短。而且,隨著orders表行數(shù)的減少,對應linetime表中有效匹配元組數(shù)逐漸減少,過濾掉的數(shù)據(jù)量逐漸增多,通過Semi Join算法預處理后對于Spark的Sort Merge Join算法的提升效果更加明顯。但是,隨著orders表數(shù)據(jù)量逐漸增加,對應linetime表中有效匹配元組數(shù)也在增加,過濾掉的數(shù)據(jù)量也逐漸減少,利用Semi Join進行預處理反而不利于Sort Merge Join的執(zhí)行,主要是受提取orders表的Join列數(shù)據(jù)構建HashMap的內(nèi)存和網(wǎng)絡傳輸開銷的影響。
圖3 Sort Merge Join算法經(jīng)過Semi Join預處理與未經(jīng)Semi Join算法預處理響應時間對比
所以,Join之前利用Semi Join算法進行數(shù)據(jù)預處理主要適用于左表與右表Join列值不完全匹配的場景。右表與左表Join列值匹配數(shù)越少,過濾掉的右表數(shù)據(jù)量越多,則參與Shuffle操作以及Sort Merge Join的數(shù)據(jù)量越少,對Join操作的性能提升越有幫助,提升效果越顯著。
由4.4節(jié)可知,對于左表與右表Join列值不完全匹配的場景,經(jīng)過Semi Join預處理后,Sort Merge Join算法的提升效率顯著。在此基礎上,進一步測試Join操作的并行度對Sort Merge Join算法的提升效果,即對比本文所定義的Semi Sort Merge Join算法的完整優(yōu)化方法與Sort Merge Join算法的性能。
在本節(jié)實驗中,選用100萬行的orders表和1 200萬行的lineitem表進行實驗。在搭建的Spark集群中,集群Worker節(jié)點數(shù)為2,每個節(jié)點上分配2個executor,對每個executor分配2個核,則集群中的總核數(shù)為8。在實驗設計中,調(diào)整參數(shù)spark.sql.shuffle.partitions來設定shuffle時不同的重分區(qū)個數(shù),從而影響到Join操作的并行度,以此來測試Join操作的并行度對執(zhí)行時間的影響,結果如圖4所示。
圖4 Join操作并行度對查詢時間的影響
可以看出,當設定重分區(qū)個數(shù)為16,即Join操作的并行度為16時,Semi Sort Merge Join算法執(zhí)行時間最短。所以,當設定重分區(qū)個數(shù)為Spark集群的CPU核數(shù)的2倍時,Join執(zhí)行性能最優(yōu)。因此,對于2.2節(jié)中的式(1),設定w系數(shù)為2。
4.4節(jié)中僅測試經(jīng)過Semi Join預處理后對于Sort Merge Join的提升效率,未測試Join操作的并行度優(yōu)化對算法的影響。在100萬行的orders表與1 200萬行的lineitem表進行Join操作的場景中,未經(jīng)過Semi Join預處理的執(zhí)行時間為266 s,經(jīng)過Semi Join預處理的執(zhí)行時間為249 s,優(yōu)化后算法性能提升了6.39%。在本節(jié)中進一步設置好最佳并行度后,Semi Sort Merge Join算法的執(zhí)行時間為222 s,相較于未經(jīng)過Semi Join預處理的Sort Merge Join算法性能提升了16.54%??梢姡ㄟ^動態(tài)設定連接并行度對Join操作的查詢也有很大的幫助。
Spark支持大規(guī)模數(shù)據(jù)處理,對任務進行分布式并行執(zhí)行。但其涉及開銷較大的Join操作,一直是大數(shù)據(jù)查詢分析的瓶頸。本文對Spark現(xiàn)有的大表Join實現(xiàn)算法進行了研究,發(fā)現(xiàn)其未考慮兩表Join列數(shù)據(jù)匹配關系對Shuffle操作的影響。因此,本文基于Semi Join,根據(jù)兩表Join列之間的匹配關系,提出了一種改進的Join實現(xiàn)算法。該算法利用左表Join列數(shù)據(jù)所構建的HashMap對右表數(shù)據(jù)進行過濾,主要適用于兩表Join列數(shù)據(jù)不完全匹配的情況,且右表與左表匹配數(shù)據(jù)量越少,該算法優(yōu)化效果越明顯。