本文針對(duì)海量數(shù)據(jù)處理過程中的處理速度、存儲(chǔ)空間、容錯(cuò)性、訪問時(shí)間等方面存在的問題,通過對(duì)Google MapReduce編程模型的原理、執(zhí)行流程等進(jìn)行分析研究,介紹5種主要的MapReduce實(shí)現(xiàn)框架:Hadoop MapReduce、Apache Spark、Phoenix、Disco、Mars,以期對(duì)MapReduce編程模型在行業(yè)內(nèi)的使用前景有一個(gè)較全面的認(rèn)識(shí)。
MapReduce介紹
海量數(shù)據(jù)的處理對(duì)服務(wù)器CPU、I/O的吞吐都是嚴(yán)峻的考驗(yàn),傳統(tǒng)的技術(shù)架構(gòu)和僅靠單臺(tái)計(jì)算機(jī)基于串行的方式越來(lái)越不能適應(yīng)當(dāng)前海量數(shù)據(jù)處理的要求。只有將這些計(jì)算進(jìn)行并行化處理,通過提取出處理過程中存在的可并行工作的分量,用分布式模型來(lái)實(shí)現(xiàn)這些并行分量的并行執(zhí)行過程。
MapReduce是由谷歌推出的一個(gè)編程模型,是一個(gè)能處理和生成超大規(guī)模數(shù)據(jù)集的算法模型,該架構(gòu)能夠在大量普通配置的計(jì)算機(jī)上實(shí)現(xiàn)并行化處理。
MapReduce編程模型結(jié)合用戶實(shí)現(xiàn)的Map和Reduce函數(shù)。用戶自定義的Map函數(shù)處理一個(gè)輸入的基于key/value pair的集合,輸出中間基于key/value pair的集合,MapReduce庫(kù)把中間所有具有相同key值的value值集合在一起后傳遞給Reduce函數(shù),用戶自定義的Reduce函數(shù)合并所有具有相同key值的value值,形成一個(gè)較小value值的集合。一般地,一個(gè)典型的MapReduce程序的執(zhí)行流程如圖1所示。
MapReduce執(zhí)行過程主要包括:
將輸入的海量數(shù)據(jù)切片分給不同的機(jī)器處理;
執(zhí)行Map任務(wù)的Worker將輸入數(shù)據(jù)解析成key/value pair,用戶定義的Map函數(shù)把輸入的key/value pair轉(zhuǎn)成中間形式的key/value pair;
按照key值對(duì)中間形式的key/value進(jìn)行排序、聚合;
把不同的key值和相應(yīng)的value集分配給不同的機(jī)器,完成Reduce運(yùn)算;
任務(wù)成功完成后,MapReduce的輸出存放在R個(gè)輸出文件中,一般情況下,這R個(gè)輸出文件不需要合并成一個(gè)文件,而是作為另外一個(gè)MapReduce的輸入,或者在另一個(gè)可處理多個(gè)分割文件的分布式應(yīng)用中使用。
MapReduce主要框架介紹
Hadoop MapReduce是一個(gè)在計(jì)算機(jī)集群上分布式處理海量數(shù)據(jù)集的軟件框架,包括一個(gè)JobTracker和一定數(shù)量的TaskTracker。用戶將MapReduce作業(yè)發(fā)送給Jobtracker所在集群的其他機(jī)器上分割工作,集群中其他機(jī)器執(zhí)行Tasktracker的Map或Reduce任務(wù)。
Spark是一個(gè)基于內(nèi)存計(jì)算的開源的集群計(jì)算系統(tǒng),目的是讓數(shù)據(jù)分析更加快速。Spark非常小巧玲瓏,由加州伯克利大學(xué)AMP實(shí)驗(yàn)室的Matei為主的小團(tuán)隊(duì)所開發(fā)。使用的語(yǔ)言是Scala,項(xiàng)目的核心core部分的代碼只有63個(gè)Scala文件,非常短小精悍。Spark 啟用了內(nèi)存分布數(shù)據(jù)集,除了能夠提供交互式查詢外,它還可以優(yōu)化迭代工作負(fù)載。
Phoenix作為斯坦福大學(xué)EE382a課程的一類項(xiàng)目,由斯坦福大學(xué)計(jì)算機(jī)系統(tǒng)實(shí)驗(yàn)室開發(fā)。Phoenix對(duì)MapReduce的實(shí)現(xiàn)原則和最初由Google實(shí)現(xiàn)的MapReduce基本相同。不同的是,它在集群中以實(shí)現(xiàn)共享內(nèi)存系統(tǒng)為目的,共享內(nèi)存能最小化由任務(wù)派生和數(shù)據(jù)間的通信所造成的間接成本。Phoenix可編程多核芯片或共享內(nèi)存多核處理器(SMPs和ccNUMAs),用于數(shù)據(jù)密集型任務(wù)處理。
Disco是由Nokia研究中心開發(fā)的,基于MapReduce的分布式數(shù)據(jù)處理框架,核心部分由Erlang語(yǔ)言開發(fā),外部編程接口為Python語(yǔ)言。Disco是一個(gè)開放源代碼的大規(guī)模數(shù)據(jù)分析平臺(tái),支持大數(shù)據(jù)集的并行計(jì)算,能運(yùn)行在不可靠的集群計(jì)算機(jī)上。Disco可部署在集群和多核計(jì)算機(jī)上,還可部署在Amazon EC2 上。
Mars是香港科技大學(xué)與微軟、新浪合作開發(fā)的基于GPU的MapReduce框架。目前已經(jīng)包含字符串匹配、矩陣乘法、倒排索引、字詞統(tǒng)計(jì)、網(wǎng)頁(yè)訪問排名、網(wǎng)頁(yè)訪問計(jì)數(shù)、相似性評(píng)估和K均值等8項(xiàng)應(yīng)用,能夠在32位與64位的Linux平臺(tái)上運(yùn)行。
針對(duì)5種框架的特點(diǎn)筆者進(jìn)行了如下分類。
編程語(yǔ)言
Hadoop MapReduce: Hadoop采用Java開發(fā),所以能很好地支持Java語(yǔ)言編寫的MapReduce作業(yè),如果采用C/C++或其他語(yǔ)言編寫MapReduce作業(yè),需要用到Hadoop Streaming或Hadoop Pipes工具;
Spark:Spark 是在 Scala 語(yǔ)言中實(shí)現(xiàn)的,它將 Scala 用作其應(yīng)用程序框架。與 Hadoop 不同,Spark 和 Scala 能夠緊密集成,其中的 Scala 可以像操作本地集合對(duì)象一樣輕松地操作分布式數(shù)據(jù)集;
Phoenix:采用全C++編寫,總代碼量不超過1萬(wàn)行,提供C和C++的應(yīng)用程序接口;
Disco:核心部分采用并發(fā)性能很高的Erlang語(yǔ)言開發(fā),其外部編程接口為易于編程的Python語(yǔ)言;
Mars:采用C++編寫,提供C、C++的應(yīng)用程序編程接口,支持最新的CUDA SDK。
構(gòu)建平臺(tái)
Hadoop MapReduce:需要首先架構(gòu)基于Hadoop的集群系統(tǒng),通過HDFS完成運(yùn)算的數(shù)據(jù)存儲(chǔ)工作;
Spark:可以的單獨(dú)運(yùn)行,也可以與Hadoop框架完整結(jié)合;
Phoenix:獨(dú)立運(yùn)行,不需要提前部署集群,運(yùn)行時(shí)系統(tǒng)的實(shí)現(xiàn)是建立在PThread之上的,也可方便地移植到其他共享內(nèi)存線程庫(kù)上;
Disco:整個(gè)Disco平臺(tái)由分布式存儲(chǔ)系統(tǒng)DDFS和MapReduce框架組成,DDFS與計(jì)算框架高度耦合,通過監(jiān)控各個(gè)節(jié)點(diǎn)上的磁盤使用情況進(jìn)行負(fù)載均衡;
Mars:運(yùn)行時(shí)為Map或Reduce任務(wù)初始化大量的GPU線程,并為每個(gè)線程自動(dòng)分配少量的key/value對(duì)來(lái)運(yùn)行任務(wù)。
功能特點(diǎn)
Hadoop MapReduce:計(jì)算能力非常強(qiáng),適合超大數(shù)據(jù)集的應(yīng)用程序,但是由于系統(tǒng)開銷等原因,處理小規(guī)模數(shù)據(jù)的速度不一定比串行程序快,并且本身集群的穩(wěn)定性不高;
Spark:在保證容錯(cuò)的前提下,用內(nèi)存來(lái)承載工作集,內(nèi)存的存取速度快于磁盤多個(gè)數(shù)量級(jí),從而可以極大提升性能;
Phoenix:利用共享內(nèi)存緩沖區(qū)實(shí)現(xiàn)通信,從而避免了因數(shù)據(jù)復(fù)制產(chǎn)生的開銷,但Phoenix也存在不能自動(dòng)執(zhí)行迭代計(jì)算、沒有高效的錯(cuò)誤發(fā)現(xiàn)機(jī)制等不足;
Disco:由一個(gè)Master服務(wù)器和一系列Worker節(jié)點(diǎn)組成,Master和Worker之間采用基于輪詢的通信機(jī)制,通過HTTP的方式傳輸數(shù)據(jù)。輪詢的時(shí)間間隔不好確定,若時(shí)間間隔設(shè)置不當(dāng),會(huì)顯著降低程序的執(zhí)行性能;
Mars:由于GPU線程不支持運(yùn)行時(shí)動(dòng)態(tài)調(diào)度,所以給每個(gè)GPU線程分配的任務(wù)是固定的,若輸入數(shù)據(jù)劃分布均勻,將導(dǎo)致Map或Reduce階段的負(fù)載不均衡,使得整個(gè)系統(tǒng)性能急劇降低。同時(shí)由于GPU不支持運(yùn)行時(shí)在設(shè)備內(nèi)存中分配空間,需要預(yù)先在設(shè)備內(nèi)存中分配好輸入數(shù)據(jù)和輸出數(shù)據(jù)的存放空間,但是Map和Reduce階段輸出數(shù)據(jù)大小是未知的,并且當(dāng)多個(gè)GPU線程同時(shí)向共享輸出區(qū)域中寫數(shù)據(jù)時(shí),易造成寫沖突。
五類實(shí)現(xiàn)框架對(duì)海量文本數(shù)據(jù)的統(tǒng)計(jì)實(shí)驗(yàn)
單詞計(jì)數(shù)(WordCount)是最簡(jiǎn)單也是最能體現(xiàn)MapReduce思想的程序之一,可以稱為MapReduce版“Hello World”。單詞計(jì)數(shù)主要完成功能是:統(tǒng)計(jì)一系列文本文件中每個(gè)單詞出現(xiàn)的次數(shù)。
WordCount的實(shí)現(xiàn)步驟:
1、將文件拆分成splits,由于測(cè)試用的文件較小,所以每個(gè)文件為一個(gè)split,并將文件按行分割形成對(duì),如圖2-1所示。這一步由MapReduce框架自動(dòng)完成,其中偏移量(即key值)包括了回車所占的字符數(shù)(Windows和Linux環(huán)境會(huì)不同)。
2、將分割好的對(duì)交給用戶定義的map方法進(jìn)行處理,生成新的對(duì)。
3、得到map方法輸出的對(duì)后,Mapper會(huì)將它們按照key值進(jìn)行排序,并執(zhí)行Combine過程,將key至相同value值累加,得到Mapper的最終輸出結(jié)果。
4、Reducer先對(duì)從Mapper接收的數(shù)據(jù)進(jìn)行排序,再交由用戶自定義的reduce方法進(jìn)行處理,得到新的對(duì),并作為WordCount的輸出結(jié)果。
本次實(shí)驗(yàn)的硬件資源基于x86服務(wù)器1臺(tái),配置內(nèi)存為32GB DDR3,E5 CPU/12核,GPU。實(shí)驗(yàn)數(shù)據(jù)樣本為10M/50M/100M/500M/1000M的文本文件五個(gè),我們使用Hadoop MapReduce、Spark、Phoenix、Disco、Mars等MapReduce框架分別運(yùn)行文本分析程序,基于結(jié)果一致的前提下統(tǒng)計(jì)出運(yùn)行時(shí)間、運(yùn)行時(shí)CPU占有率、運(yùn)行時(shí)內(nèi)存占有率等數(shù)據(jù),并采用這些數(shù)據(jù)繪制成柱狀圖。
Hadoop MapReduce的運(yùn)行時(shí)間最長(zhǎng),原因是Hadoop生態(tài)環(huán)境包含內(nèi)容過多,所以每次任務(wù)啟動(dòng)時(shí)首先需要加載所需資源包,然后緩慢地發(fā)起任務(wù),并且由于本身是用性能較差的Java語(yǔ)言編寫的,所以導(dǎo)致整體計(jì)算時(shí)間長(zhǎng)、性能差。Phoenix由于采用匯編和C語(yǔ)言編寫,內(nèi)核很小,運(yùn)行時(shí)所用資源很少,所以整個(gè)測(cè)試過程耗時(shí)也較少。Mars由于必須在GPU上運(yùn)行,本身GPU由于價(jià)格因素,導(dǎo)致不太可能在實(shí)際應(yīng)用場(chǎng)景里推廣,所以Phoenix的性價(jià)比是最高的。需要時(shí)長(zhǎng)從高到低分別是Hadoop MapReduce、Disco、Spark、Phoenix、Mars。
Hadoop MapReduce、Disco這兩個(gè)框架需要占用的CPU資源在1000M文本處理時(shí)基本到達(dá)最大飽和度(大于90%),Apache Spark的CPU使用率沒有完全伴隨著文本文件增大而大幅上漲,Phoenix和Mars基本控制在性價(jià)比較高的范圍內(nèi)。
Mars和Phoenix使用的內(nèi)存在數(shù)據(jù)量較小時(shí)是最少的,Apache Spark為隨著數(shù)據(jù)量增大而大幅增加,在數(shù)據(jù)量最大時(shí)它對(duì)內(nèi)存的消耗是最小的。Hadoop MapReduce和Disco都需要占用較多的內(nèi)存。
從上面的測(cè)試結(jié)果我們得出,如果用戶只需要處理海量的文本文件,不需要考慮存儲(chǔ)、二次數(shù)據(jù)挖掘等,采用Phoenix是最大性價(jià)比的選擇。如果應(yīng)用程序需要處理的數(shù)據(jù)量非常大,并且客戶希望計(jì)算出的數(shù)據(jù)可以被存儲(chǔ)和二次計(jì)算或數(shù)據(jù)挖掘,那Hadoop MapReduce較好,因?yàn)檎麄€(gè)Hadoop生態(tài)圈龐大,支持很好。Apache Spark由于架構(gòu)層面設(shè)計(jì)不同,所以對(duì)于CPU、內(nèi)存的使用率一直保持較低狀態(tài),它未來(lái)可以用于海量視頻分析用途。
五類實(shí)現(xiàn)框架結(jié)合視頻人臉分析的實(shí)驗(yàn)
安防行業(yè)的并行測(cè)試實(shí)驗(yàn)大多是基于智能視頻分析技術(shù)基礎(chǔ)之上的。智能視頻分析技術(shù)是一種基于人工智能的識(shí)別模式。它綜合了各種高科技研究成果,主要借助智能視頻分析技術(shù)的處理方法,在結(jié)合一些硬件設(shè)施,對(duì)某些對(duì)象(比如人員、車輛等)進(jìn)行研究和處理,形成一種核心算法。
在本次測(cè)試中,我們針對(duì)的是人臉特征抓取實(shí)驗(yàn),即通過對(duì)一段指定錄像分析,提取出錄像中所有出現(xiàn)的人臉圖片的過程。對(duì)錄像中出現(xiàn)的人臉圖片分析過程大致上可以分為三個(gè)階段:取流、解碼及分析、提取物發(fā)送。碼流分析提取服務(wù)即我們本次實(shí)驗(yàn)所需要的三個(gè)階段。
主計(jì)算節(jié)點(diǎn)把錄像文件讀入到內(nèi)存中,將碼流分割為若干個(gè)子塊分發(fā)給從計(jì)算節(jié)點(diǎn)。由于我們采用的是MapReduce框架,所以程序會(huì)自動(dòng)分為若干個(gè)線程執(zhí)行,每個(gè)線程對(duì)應(yīng)一個(gè)Map,每個(gè)Map都會(huì)執(zhí)行解碼、分析、結(jié)果輸出三個(gè)步驟。
我們?cè)趚86機(jī)器上進(jìn)行了本次實(shí)驗(yàn),實(shí)驗(yàn)數(shù)據(jù)是一個(gè)2.66GB大小的包含1092個(gè)人臉的錄像文件,錄像分辨率為1080P。我們通過分別采用不同的MapReduce框架來(lái)運(yùn)行程序,對(duì)程序運(yùn)行結(jié)果進(jìn)行匹配,5個(gè)框架的運(yùn)行結(jié)果完全一致,即抓取出1092個(gè)人臉圖片。我們對(duì)程序運(yùn)行時(shí)間、運(yùn)行過程中CPU使用率、運(yùn)行過程中內(nèi)存使用率做了統(tǒng)計(jì)并生成柱狀圖供參考。本次實(shí)驗(yàn)過程中所使用的人臉檢測(cè)算法是筆者公司圖像處理與智能分析部門自主研發(fā)的算法。
Mars和Phoenix框架處理錄像所需時(shí)間最短,運(yùn)行智能分析程序時(shí)CPU使用率對(duì)于所有框架基本上都達(dá)到最大飽和度(90%以上)。由于所做的實(shí)驗(yàn)是對(duì)碼流進(jìn)行分析,碼流本身需要占用較大的內(nèi)存空間,解碼、分析等處理過程也許要占用內(nèi)存用于存放中間結(jié)果,所以內(nèi)存基本上也達(dá)到最大使用飽和度(90%以上)。綜上所述,CPU和內(nèi)存的使用率在本類實(shí)驗(yàn)過程中不需要過多考慮,最主要的對(duì)比點(diǎn)是運(yùn)行時(shí)間??紤]到Mars必須基于GPU運(yùn)行,并且GPU的價(jià)格較高,所以Phoenix的性價(jià)比更高。Hadoop MapReduce雖然處理時(shí)間最長(zhǎng),但是它具有強(qiáng)大的生態(tài)環(huán)境,利于對(duì)處理結(jié)果數(shù)據(jù)進(jìn)行保存和數(shù)據(jù)挖掘,所以對(duì)于大型公司來(lái)說(shuō)它依然是很好的選擇。Apache Spark雖然在本次實(shí)驗(yàn)中沒有太多亮點(diǎn),但是從各類大數(shù)據(jù)學(xué)術(shù)會(huì)議上得到的反饋較好,它基于內(nèi)存方式的運(yùn)算模式可以幫助處理海量數(shù)據(jù),未來(lái)一定可以在智能分析領(lǐng)域有很大的作為。
結(jié)語(yǔ)
現(xiàn)實(shí)世界很多實(shí)例都可用MapReduce編程模型來(lái)表示,MapReduce作為一個(gè)通用可擴(kuò)展的、高容錯(cuò)性的并行處理模型,可有效地處理海量數(shù)據(jù),不斷地從中分析挖掘出有價(jià)值的信息。MapReduce封裝了并行處理、負(fù)載均衡、容錯(cuò)、數(shù)據(jù)本化等技術(shù)難點(diǎn)細(xì)節(jié)。通過本文的兩例測(cè)試用例可以證明MapReduce 適用于海量文本分析、海量視頻智能分析等安防行業(yè)密切相關(guān)的應(yīng)用場(chǎng)景,諸如行為分析、車牌識(shí)別、人臉抓拍、客流統(tǒng)計(jì)等智能化技術(shù)的應(yīng)用,尤其是對(duì)海量視頻執(zhí)行高并發(fā)處理,可以很好地在平安城市、智慧城市等大型安防項(xiàng)目中落地,為公安機(jī)關(guān)治安管理、案件偵破等提供有力的技術(shù)支持。