馬晟 劉雅倫 陳曉男 沈漪
摘? 要:城市的發(fā)展使得運(yùn)營(yíng)車(chē)輛日益增長(zhǎng),車(chē)輛調(diào)度愈發(fā)困難,傳統(tǒng)系統(tǒng)無(wú)法滿(mǎn)足現(xiàn)有眾多車(chē)輛的監(jiān)控調(diào)度與運(yùn)營(yíng)。該程序基于大數(shù)據(jù)流處理系統(tǒng),實(shí)現(xiàn)了大批量的車(chē)輛信息監(jiān)測(cè)和實(shí)時(shí)處理以及車(chē)輛的精細(xì)監(jiān)控與軌跡回放。可用于網(wǎng)約車(chē)、公交車(chē)以及貨運(yùn)集團(tuán)的調(diào)度中心進(jìn)行實(shí)時(shí)監(jiān)控和訂單把控,以提高車(chē)輛調(diào)度的靈活性,達(dá)到最優(yōu)調(diào)度、減少成本的效果。
關(guān)鍵詞:SparkStreaming? 大數(shù)據(jù)? 軌跡回放? 交通
中圖分類(lèi)號(hào):TP31? ? ? ?文獻(xiàn)標(biāo)識(shí)碼:A
Abstract: With the development of the city, the number of operating vehicles is increasing, and the vehicle scheduling is becoming more and more difficult. The traditional system can not meet the monitoring, scheduling and operation of many existing vehicles. Based on the large data stream processing system, the program realizes a large number of vehicle information monitoring and real-time processing, as well as vehicle fine monitoring and track playback. It can be used for real-time monitoring and order control in the dispatching center of online car hailing, buses and freight groups, so as to improve the flexibility of vehicle scheduling, achieve optimal scheduling and reduce costs.
Key Words:SparkStreaming;Bigdata;Track playback;Traffic
隨著城市的發(fā)展,運(yùn)營(yíng)車(chē)輛日益增長(zhǎng),車(chē)輛調(diào)度愈發(fā)困難,傳統(tǒng)系統(tǒng)無(wú)法滿(mǎn)足現(xiàn)有眾多車(chē)輛的監(jiān)控調(diào)度與運(yùn)營(yíng)?;诖髷?shù)據(jù)系統(tǒng)的車(chē)輛實(shí)時(shí)監(jiān)控與調(diào)度需求隨著大數(shù)據(jù)技術(shù)的日趨發(fā)展有了實(shí)現(xiàn)的可能。
1? 數(shù)據(jù)處理系統(tǒng)的設(shè)計(jì)
該系統(tǒng)實(shí)現(xiàn)對(duì)海量車(chē)輛軌跡數(shù)據(jù)的采集、存儲(chǔ)、實(shí)時(shí)處理、軌跡回放功能。軌跡數(shù)據(jù)在蓋亞數(shù)據(jù)平臺(tái)申請(qǐng)達(dá)到,編程模擬產(chǎn)生實(shí)時(shí)數(shù)據(jù)流,經(jīng)大數(shù)據(jù)平臺(tái)采集處理存入數(shù)據(jù)庫(kù),然后在前臺(tái)顯示實(shí)時(shí)的車(chē)輛軌跡[1]。
1.1系統(tǒng)整體架構(gòu)
基于系功能需求,該系統(tǒng)的總體設(shè)計(jì)為:先由車(chē)輛端上傳坐標(biāo)數(shù)據(jù)(編程模擬產(chǎn)生),flume多源采集,然后寫(xiě)入kafka的topic,接著通過(guò)SparkStreming實(shí)時(shí)消費(fèi)kafka,再根據(jù)訂單存入redis,最后實(shí)現(xiàn)訂單數(shù)據(jù)列表生成以及訂單車(chē)輛軌跡回放[2]。它異于傳統(tǒng)數(shù)據(jù)系統(tǒng)的地方是:采用大數(shù)據(jù)流處理框架,具有高吞吐率、高負(fù)載、高可用性、實(shí)時(shí)性高的優(yōu)點(diǎn)[3]。整個(gè)系統(tǒng)的邏輯實(shí)現(xiàn)如圖1所示。
1.2數(shù)據(jù)回放模塊設(shè)計(jì)
為了模擬真實(shí)業(yè)務(wù)場(chǎng)景,該程序基于蓋亞平臺(tái)坐標(biāo)數(shù)據(jù)通過(guò)數(shù)據(jù)回放模塊模擬數(shù)據(jù)流產(chǎn)生[4]。使用python讀出坐標(biāo)數(shù)據(jù),用多線程并行輸出,從而模擬實(shí)際場(chǎng)景中車(chē)輛移動(dòng)匯報(bào)的坐標(biāo)打點(diǎn)數(shù)據(jù),達(dá)到采集流數(shù)據(jù)的需求[5]。
核心代碼邏輯如下所示。
#坐標(biāo)數(shù)據(jù)文件寫(xiě)入
def consumer(queue, writer, csv_file):
while True:
line = queue.get()
deal_line(line, writer, csv_file)
queue.task_done()
#流數(shù)據(jù)文件生成
def producer(queue):
with open(‘test.txt’, ‘r’) as f:
for line in f:
queue.put(line)
queue = JoinableQueue(8)
pc = Process(target=producer, args=(queue,))
for _ in range(cpu_count()):
c1 = Process(target=consumer, args=(queue, writer, csv_file))
#等待生產(chǎn)者進(jìn)程全部生成完畢
pc.join()
#等待所有數(shù)據(jù)全部處理完畢
queue.join()
1.3數(shù)據(jù)采集消費(fèi)模塊設(shè)計(jì)
該模塊實(shí)現(xiàn)了通過(guò)flume采集車(chē)輛軌跡流數(shù)據(jù),進(jìn)而推送到消息隊(duì)列kafka中。
首先進(jìn)行flume數(shù)據(jù)采集,在采集過(guò)程中通過(guò)集群形式達(dá)到大數(shù)據(jù)量及多源數(shù)據(jù)采集情況下的負(fù)載均衡及并行采集。設(shè)置flume靜態(tài)攔截器實(shí)現(xiàn)在采集到的數(shù)據(jù)的頭數(shù)據(jù)中插入自定義的key-value鍵值對(duì)以區(qū)分不同數(shù)據(jù)源,主要配置如下:
a1.sources.r1.interceptors.i1.type = static? ? ? ? ? #設(shè)置靜態(tài)攔截器
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = test_gps_topic#不同的數(shù)據(jù)源取不同的名稱(chēng)
接著通過(guò)kafka集群接收f(shuō)lume采集的大量數(shù)據(jù),以實(shí)現(xiàn)數(shù)據(jù)高吞吐率、高可用數(shù)據(jù)傳遞以及數(shù)據(jù)的實(shí)時(shí)處理,同時(shí)通過(guò)不同的topic保證不同數(shù)據(jù)流的分區(qū)。
flume監(jiān)聽(tīng)的文件數(shù)據(jù)發(fā)送到此kafka的主題當(dāng)中,主要配置如下:
a1.sinks.k1.topic = test_gps_topic? ? ? ? ? ?#與前面的靜態(tài)攔截器value值配置相一致
1.4數(shù)據(jù)實(shí)時(shí)處理模塊設(shè)計(jì)
該模塊通過(guò)sparkStreaming程序?qū)崿F(xiàn)消費(fèi)kafka中的數(shù)據(jù)存到HBase中,其中的GPS位置經(jīng)緯度信息保存到redis中,存為后續(xù)實(shí)時(shí)監(jiān)控以及軌跡回放的數(shù)據(jù)源[6]。核心邏輯的Scala代碼如下:
//從kafka里消費(fèi)數(shù)據(jù),把經(jīng)緯度信息存到redis
val result: InputDStream[ConsumerRecord[String, String]] = Tools.getStreamingContextFromHBase
(streamingContext,kafkaParams,topics,group,"(.*)gps_topic")
result.foreachRDD(eachRdd =>{
eachRdd.foreachPartition(eachPartition =>{
val connection: Connection = HBaseUtil.getConnection
val jedis: Jedis = JedisUtil.getJedis
eachPartition.foreach(record =>{
Tools.saveToRedis(connection,jedis,record)
})
1.5軌跡回放模塊設(shè)計(jì)
得益于redis內(nèi)存數(shù)據(jù)庫(kù)高性能以及可持久化的穩(wěn)定性,該模塊實(shí)現(xiàn)回放每個(gè)訂單車(chē)輛軌跡同時(shí)并發(fā)實(shí)時(shí)讀取到前端,通過(guò)高德地圖提供的地圖api接口,訂單號(hào)為同一個(gè)key的value坐標(biāo)數(shù)據(jù)軌跡點(diǎn)按時(shí)間順序呈現(xiàn)在地圖上,從而監(jiān)控每條車(chē)輛訂單的車(chē)輛軌跡情況[7]。
2? 實(shí)驗(yàn)驗(yàn)證
2.1 實(shí)驗(yàn)環(huán)境
該次實(shí)驗(yàn)采用了一主二從的CDH集群,機(jī)器配置如圖2所示,集群角色配置如圖3所示。
2.2數(shù)據(jù)集
此次實(shí)驗(yàn)數(shù)據(jù)集來(lái)自滴滴蓋亞數(shù)據(jù)平臺(tái)的開(kāi)放數(shù)據(jù),形如表1所示。
首先是數(shù)據(jù)回放模塊的驗(yàn)證,通過(guò)多線程輸出,flume采集源目錄,數(shù)據(jù)如期以多訂單并發(fā)每秒三條的流數(shù)據(jù)形式生成。其次是數(shù)據(jù)實(shí)時(shí)處理模塊,經(jīng)檢查redis數(shù)據(jù)庫(kù),回放的流數(shù)據(jù)以秒級(jí)單位處理寫(xiě)入到數(shù)據(jù)庫(kù)。最后是數(shù)據(jù)回放模塊的驗(yàn)證,經(jīng)前端程序的讀取,車(chē)輛軌跡坐標(biāo)成功呈現(xiàn)在了高德地圖上。
3 結(jié)語(yǔ)
該系統(tǒng)實(shí)現(xiàn)了大規(guī)模軌跡數(shù)據(jù)的處理,數(shù)據(jù)的吞吐量、延遲性、精準(zhǔn)度已達(dá)到預(yù)期。程序通過(guò)Python模擬車(chē)輛軌跡數(shù)據(jù)流的產(chǎn)生,然后通過(guò)flume和kafka采集消費(fèi)數(shù)據(jù),sparkStreaming處理數(shù)據(jù)流,完成了模擬現(xiàn)實(shí)生活多車(chē)輛多數(shù)據(jù)流場(chǎng)景的數(shù)據(jù)產(chǎn)生、處理與軌跡回放。目前程序還停留在雛形階段,未來(lái)將在耦合度、靈活度上做出提高。
參考文獻(xiàn)
[1]? 楊小潤(rùn).基于深度學(xué)習(xí)的車(chē)輛軌跡特征識(shí)別與分析[D].南京:南京郵電大學(xué),2020.
[2]? 陸鍵,王可,蔣愚明.基于車(chē)輛行駛軌跡的道路不良駕駛行為實(shí)時(shí)辨識(shí)方法[J].交通運(yùn)輸工程學(xué)報(bào),2020,20(6):227-235.
[3] 潘偉博,汪海濤,姜瑛,等.Hadoop集群異常節(jié)點(diǎn)實(shí)時(shí)檢測(cè)與診斷算法[J].陜西理工大學(xué)學(xué)報(bào):自然科學(xué)版,2021,37(4):24-31.
[4]? 鮑裕麟.深度學(xué)習(xí)應(yīng)用場(chǎng)景下的HDFS性能優(yōu)化[D].合肥:中國(guó)科學(xué)技術(shù)大學(xué),2021.
[5] 謝楓,婁靜濤,趙凱,等.基于行為識(shí)別和曲率約束的車(chē)輛軌跡預(yù)測(cè)方法研究[J].汽車(chē)工程,2019,41(9):1036-1042.
[6] 柯杰.基于SparkStreaming日志實(shí)時(shí)監(jiān)測(cè)系統(tǒng)的設(shè)計(jì)與實(shí)現(xiàn)[D].南京:東南大學(xué),2017.
[7] 苗莉.大數(shù)據(jù)云計(jì)算環(huán)境下的數(shù)據(jù)安全[J].科技資訊,2021,19(2):31-33.