了解hadoop,首先就需要先了解hadoop的數(shù)據(jù)流,就像了解servlet的生命周期似的。hadoop是一個(gè)分布式存儲(chǔ)(hdfs)和分布式計(jì)算框架(mapreduce),但是hadoop也有一個(gè)很重要的特性:hadoop會(huì)將mapreduce計(jì)算移動(dòng)到存儲(chǔ)有部分?jǐn)?shù)據(jù)的各臺(tái)機(jī)器上。
術(shù)語(yǔ)
MapReduce 作業(yè)(job)是客戶端需要執(zhí)行的一個(gè)工作單元:它包括輸入數(shù)據(jù)、mapreduce程序和配置信息。hadoop將作業(yè)分成若干個(gè)小任務(wù)(task)來(lái)執(zhí)行,其中包括兩類任務(wù):map任務(wù)和reduce任務(wù)。
有兩類節(jié)點(diǎn)控制著作業(yè)執(zhí)行過(guò)程:一個(gè)jobtracker及一系列tasktracker。 jobtracker通過(guò)調(diào)度tasktracker上運(yùn)行的任務(wù),來(lái)協(xié)調(diào)所有運(yùn)行在系統(tǒng)上的作業(yè)。tasktracker在運(yùn)行任務(wù)的同時(shí)將運(yùn)行進(jìn)度報(bào) 告發(fā)送給jobtracker,jobtracker由此記錄每項(xiàng)作業(yè)任務(wù)的整體進(jìn)度情況。如果其中一個(gè)任務(wù)失敗,jobtracker可以在另外一個(gè) tasktracker節(jié)點(diǎn)上重新調(diào)度該任務(wù)。
輸入
hadoop將mapreduce的輸入數(shù)據(jù)劃分成等長(zhǎng)的小數(shù)據(jù)塊,稱為輸入分片(input split)或簡(jiǎn)稱分片。hadoop為每個(gè)分片構(gòu)建一個(gè)map任務(wù),并由該任務(wù)來(lái)運(yùn)行用戶自定義的map函數(shù)從而處理分片中的每條記錄。 對(duì)于大多數(shù)作業(yè)來(lái)說(shuō),一個(gè)合理的分片大小趨向于HDFS的一個(gè)塊的大小,默認(rèn)是64M,不過(guò)可以針對(duì)集群調(diào)整這個(gè)默認(rèn)值。分片的大小一定要根據(jù)運(yùn)行的任務(wù)來(lái)定,如果分片過(guò)小,那么管理分片的總時(shí)間和構(gòu)建map任務(wù)的總時(shí)間將決定著作業(yè)的整個(gè)執(zhí)行時(shí)間。
hadoop在存儲(chǔ)有輸入數(shù)據(jù)的節(jié)點(diǎn)上運(yùn)行map任務(wù),可以獲得最佳性能,這就是所謂的數(shù)據(jù)本地化優(yōu)化。 因?yàn)閴K是hdfs存儲(chǔ)數(shù)據(jù)的最小單元,每個(gè)塊可以在多個(gè)節(jié)點(diǎn)上同時(shí)存在(備份),一個(gè)文件被分成的各個(gè)塊被隨機(jī)分部在多個(gè)節(jié)點(diǎn)上,因此如果一個(gè)map任務(wù) 的輸入分片跨越多個(gè)數(shù)據(jù)塊,那么基本上沒(méi)有一個(gè)節(jié)點(diǎn)能夠恰好同時(shí)存在這幾個(gè)連續(xù)的數(shù)據(jù)塊,那么map任務(wù)就需要首先通過(guò)網(wǎng)絡(luò)將不存在于此節(jié)點(diǎn)上的數(shù)據(jù)塊遠(yuǎn) 程復(fù)制到本節(jié)點(diǎn)上再運(yùn)行map函數(shù),那么這種任務(wù)顯然效率非常低。
輸出
map任務(wù)將其輸出寫(xiě)入到本地磁盤(pán),而非HDFS。這是因?yàn)閙ap的輸出是中間結(jié)果:該中間結(jié)果有reduce任務(wù)處理后才產(chǎn)生最終結(jié)果(保存在hdfs中)。而一旦作業(yè)完成,map的輸出結(jié)果可以被刪除。
reduce任務(wù)并不具備數(shù)據(jù)本地化優(yōu)勢(shì):?jiǎn)蝹€(gè)reduce任務(wù)的輸入通常來(lái)自于所有的mapper任務(wù)的輸出。reduce任務(wù)的輸出通常存儲(chǔ)于HDFS中以實(shí)現(xiàn)可靠存儲(chǔ)。
數(shù)據(jù)流
作業(yè)根據(jù)設(shè)置的reduce任務(wù)的個(gè)數(shù)不同,數(shù)據(jù)流也不同,但大同小異。reduce任務(wù)的數(shù)量并非由輸入數(shù)據(jù)的大小決定的,而是可以通過(guò)手動(dòng)配置指定的。
單個(gè)reduce任務(wù)
多個(gè)reduce任務(wù)
如果是多個(gè)reduce任務(wù)的話,則每個(gè)map任務(wù)都會(huì)對(duì)其輸出進(jìn)行分區(qū)(partition),即為每個(gè)reduce任務(wù)創(chuàng)建一個(gè)分區(qū)。分區(qū)有用戶定義的分區(qū)函數(shù)控制,默認(rèn)的分區(qū)器(partitioner) 通過(guò)哈希函數(shù)來(lái)分區(qū)。
map任務(wù)和reduce任務(wù)之間的數(shù)據(jù)流稱為shuffle(混洗)。
沒(méi)有reduce任務(wù)
當(dāng)然也可能出現(xiàn)不需要執(zhí)行reduce任務(wù)的情況,即數(shù)據(jù)可以完全的并行。
combiner(合并函數(shù))
順便在這說(shuō)下combiner吧,hadoop運(yùn)行用戶針對(duì)map任務(wù)的輸出指定一個(gè)合并函數(shù),合并函數(shù)的輸出作為reduce函數(shù)的輸入。其實(shí)合并函數(shù) 就是一個(gè)優(yōu)化方案,說(shuō)白了就是在map任務(wù)執(zhí)行后在本機(jī)先執(zhí)行合并函數(shù)(通常就是reduce函數(shù)的拷貝),減少網(wǎng)絡(luò)傳輸量。