在上一篇文章中,我們講了Spark大數(shù)據(jù)處理的可擴(kuò)展性和負(fù)載均衡,今天要講的是更為重點(diǎn)的容錯(cuò)處理,這涉及到Spark的應(yīng)用場(chǎng)景和RDD的設(shè)計(jì)來(lái)源。
Spark的應(yīng)用場(chǎng)景
Spark主要針對(duì)兩種場(chǎng)景:
機(jī)器學(xué)習(xí),數(shù)據(jù)挖掘,圖應(yīng)用中常用的迭代算法(每一次迭代對(duì)數(shù)據(jù)執(zhí)行相似的函數(shù))
交互式數(shù)據(jù)挖掘工具(用戶反復(fù)查詢一個(gè)數(shù)據(jù)子集)
Spark在spark-submit外,還提供了spark-shell,它就是專門用來(lái)做交互數(shù)據(jù)挖掘的工具
MapReduce等框架并不明確支持迭代中間結(jié)果/數(shù)據(jù)子集的共享,所以需要將數(shù)據(jù)輸出到磁盤,然后在每次查詢時(shí)重新加載,這帶來(lái)較大的開(kāi)銷。
既然反復(fù)寫磁盤和從磁盤加載數(shù)據(jù)使得性能下降,那就把數(shù)據(jù)放到內(nèi)存中,這就是Spark基于內(nèi)存的彈性分布式數(shù)據(jù)集(RDD)的出發(fā)點(diǎn)。
自動(dòng)容錯(cuò)
MapReduce是容錯(cuò)性非常好的系統(tǒng)。處理一步就放到磁盤,再處理一步又放到磁盤,一旦哪一步有問(wèn)題,重做就好了,真可謂是一步一個(gè)腳印。Spark為了上述場(chǎng)景下的性能,把數(shù)據(jù)放在內(nèi)存中,那整個(gè)系統(tǒng)的容錯(cuò)就成了最困難的地方。
一般來(lái)說(shuō),分布式數(shù)據(jù)集的容錯(cuò)性有兩種方式:即數(shù)據(jù)檢查點(diǎn)和記錄數(shù)據(jù)的更新。由于面向的是大規(guī)模數(shù)據(jù)分析,數(shù)據(jù)檢查點(diǎn)操作成本很高:需要通過(guò)數(shù)據(jù)中心的網(wǎng)絡(luò)連接在機(jī)器之間復(fù)制龐大的數(shù)據(jù)集,而網(wǎng)絡(luò)帶寬往往比內(nèi)存帶寬低得多,同時(shí)還需要消耗更多的存儲(chǔ)資源(在內(nèi)存中復(fù)制數(shù)據(jù)可以減少需要緩存的數(shù)據(jù)量,而存儲(chǔ)到磁盤則會(huì)拖慢應(yīng)用程序)。所以選擇記錄更新的方式。但是,如果更新太多,那么記錄更新成本也不低。因此,RDD只支持讀操作,并且只支持粗粒度轉(zhuǎn)換,即在大量記錄上執(zhí)行的單個(gè)操作。將創(chuàng)建RDD的一系列轉(zhuǎn)換記錄下來(lái)(即Lineage),以便恢復(fù)丟失的分區(qū)。
雖然只支持粗粒度轉(zhuǎn)換限制了編程模型,但是RDD仍然可以很好地適用于很多應(yīng)用,特別是支持?jǐn)?shù)據(jù)并行的批量分析應(yīng)用,包括數(shù)據(jù)挖掘、機(jī)器學(xué)習(xí)、圖算法等,因?yàn)檫@些程序通常都會(huì)在很多記錄上執(zhí)行相同的操作。
RDD抽象
RDD是只讀的、分區(qū)記錄的集合。RDD只能基于在穩(wěn)定物理存儲(chǔ)中的數(shù)據(jù)集和其他已有的RDD上執(zhí)行確定性操作來(lái)創(chuàng)建。這些確定性操作稱之為轉(zhuǎn)換,如map、filter、groupBy、join(轉(zhuǎn)換不是程開(kāi)發(fā)人員在RDD上執(zhí)行的操作)。
RDD含有如何從其他RDD計(jì)算出本RDD的相關(guān)信息(即Lineage),據(jù)此可以從物理存儲(chǔ)的數(shù)據(jù)計(jì)算出相應(yīng)的RDD分區(qū)。
在需要反復(fù)使用的某個(gè)數(shù)據(jù)集時(shí),使用RDD的持久化,即persist,這個(gè)持久化優(yōu)先是放在內(nèi)存中的。
再來(lái)看看WordCount
說(shuō)了這么多,我們依然拿WordCount來(lái)說(shuō)說(shuō),幫忙小伙伴們理解,還沒(méi)有看本系列前兩篇文章的童鞋抓緊去看看哈。
val file = "hdfs://127.0.0.1:9000/file.txt"
val lines = sc.textFile(file)
val words = lines.flatMap(line => line.split("\s+"))
val partialCountMap = words
.mapPartitions(convertWordsInPartitionToWordCountMap)
val wordCount = distCountMap.reduce(mergeMaps)
WordCount一共涉及到三個(gè)RDD,用于承載文本行的lines,用于承載單詞的words,用于承載每個(gè)文件塊上部分單詞計(jì)數(shù)的 partialCountMap。Lineage關(guān)系:partialCountMap的父RDD為words,words的父RDD為lines,如下圖:
有了Lineage和RDD的只讀特性,就可以輕松完成容錯(cuò)了。
如果words在slave1上的一個(gè)分區(qū)出問(wèn)題了,那么我們只需要加載slave1上對(duì)應(yīng)的文件塊,并重新計(jì)算其lines對(duì)應(yīng)的分區(qū),進(jìn)而計(jì)算得到words的這個(gè)分區(qū)。
圖中每個(gè)slave中只畫了一個(gè)文件塊,實(shí)際上可能有多個(gè)文件塊。一定要注意的是哪個(gè)分區(qū)出問(wèn)題了,只會(huì)重算這一個(gè)分區(qū),也就只會(huì)重新加載這個(gè)分區(qū)關(guān)聯(lián)的文件塊。
上面討論的是窄依賴的情況,如果像groupBy這種轉(zhuǎn)換,一個(gè)RDD分區(qū)需要依賴父RDD的多個(gè)分區(qū),那么一個(gè)分區(qū)掛了,就需要計(jì)算父RDD中的多個(gè)分區(qū)。
分布式系統(tǒng)的三個(gè)問(wèn)題:可擴(kuò)展性,負(fù)載均衡,容錯(cuò)處理,都解決了吧。
不知道看到這里的小伙伴,心里是否有個(gè)疑問(wèn),既然RDD的API只支持粗粒度的轉(zhuǎn)換,它真的能夠支持這么多千奇百怪的應(yīng)用場(chǎng)景嗎?下一篇,我們一起看RDD的API,以及它對(duì)其它大數(shù)據(jù)處理框架能夠處理的應(yīng)用場(chǎng)景的等效解決方案。