2014年的大數(shù)據(jù)領(lǐng)域,Apache Spark(以下簡(jiǎn)稱(chēng)Spark)無(wú)疑最受矚目。Spark,出自名門(mén)伯克利AMPLab之手,目前由商業(yè)公司Databricks保駕護(hù)航。自2014年3月份躋身Apache頂級(jí)項(xiàng)目(TLP),Spark已然成為ASF最活躍的項(xiàng)目之一,得到了業(yè)內(nèi)廣泛的支持——2014年12月發(fā)布的Spark 1.2版本包含了來(lái)自172位Contributor貢獻(xiàn)的1000多個(gè)commits。而在2014一整年中,Spark共發(fā)布了大小9個(gè)版本(包含5月底發(fā)布具有里程碑意義的1.0版本),其社區(qū)活躍度可見(jiàn)一斑。值得一提的是,2014年11月,Databricks基于AWS完成了一個(gè)Daytona Gray類(lèi)別的Sort Benchmark,并創(chuàng)造了該測(cè)試的新紀(jì)錄。本文將概括性地總結(jié)Spark在2014年的發(fā)展。
Spark 2014,星星之火已成燎原之勢(shì)
首先,Spark會(huì)議及相關(guān)交流。目前,世界范圍內(nèi)最權(quán)威的Spark領(lǐng)域會(huì)議無(wú)疑是Spark Summit,已于2013年與2014年連續(xù)成功舉辦兩屆,來(lái)自全球各地的工程師們與會(huì)分享了各自的Spark使用案例。鑒于目前Spark的火爆態(tài)勢(shì),Spark Summit將在2015年分Spark Summit East與Spark Summit West兩次舉行。著眼國(guó)內(nèi),首屆中國(guó)Spark技術(shù)峰會(huì)(Spark Summit China)于2014年4月在北京舉辦,據(jù)統(tǒng)計(jì),全國(guó)各大互聯(lián)網(wǎng)公司幾乎都出席了會(huì)議。因此,大家可以期待下今年的Spark Summit China又會(huì)帶來(lái)怎樣的驚喜。除去這樣比較大型的會(huì)議,Spark Meetup也不定期地在全球各地舉行,截止本文寫(xiě)作時(shí),已有來(lái)自13個(gè)不同國(guó)家的33個(gè)城市舉辦過(guò)Spark Meetup,國(guó)內(nèi)目前已經(jīng)舉辦Spark Meetup的城市有四個(gè),分別是北京、杭州、上海和深圳。除了線(xiàn)下交流,線(xiàn)上也會(huì)組織一些公開(kāi)課,供那些不方便到線(xiàn)下交流的朋友參加。由此可以看出,2014年關(guān)于Spark的交流活動(dòng)非常頻繁,這對(duì)推動(dòng)Spark發(fā)展是大有裨益的。
其次,在2014年,各大廠商相繼宣布與Databricks進(jìn)行合作。其中,Cloudera早在2013年底即宣布將在其發(fā)行版中添加Spark,而后又有更多的企業(yè)加入進(jìn)來(lái),如Datastax、MapR、Pivotal及Hortonworks等。由此可見(jiàn),Spark已得到了眾多大數(shù)據(jù)企業(yè)的認(rèn)可,而這些企業(yè)也確實(shí)將自己的產(chǎn)品與Spark進(jìn)行了緊密的集成。譬如Datastax將Cassandra與Spark進(jìn)行了集成,使得Spark可以操作Cassandra內(nèi)的數(shù)據(jù),又譬如ElasticSearch也和Spark進(jìn)行了集成,更多這方面的動(dòng)作可參考Spark Summit 2014中提到的相關(guān)內(nèi)容。
此外,Spark在2014年也吸引了更多企業(yè)的落地使用。國(guó)外比較知名的有Yahoo! 、eBay、Twitter、Amazon、SAP、Tableau及MicroStrategy等;同時(shí),值得高興的是,在Spark落地實(shí)踐上,國(guó)內(nèi)企業(yè)也不遑多讓?zhuān)詫?、騰訊、百度、小米、京東、唯品會(huì)、愛(ài)奇藝、搜狐、七牛、華為及亞信等知名企業(yè)都進(jìn)行了生產(chǎn)環(huán)境使用,從而也促成了越來(lái)越多的華人工程師為Spark提交代碼,特別是Spark SQL這個(gè)組件,甚至有一半左右的Contributor都是華人工程師。各大知名企業(yè)的使用,大幅度提升了整個(gè)業(yè)界使用Spark的興趣和信心,我們有理由相信,在2015年,使用Spark的企業(yè)數(shù)量必會(huì)是井噴式的爆發(fā)。與此同時(shí),已經(jīng)出現(xiàn)了一批基于Spark做應(yīng)用的創(chuàng)業(yè)公司,而其中有不少發(fā)展得相當(dāng)不錯(cuò),如Adatao和TupleJump。
隨著市場(chǎng)上對(duì)Spark工程師需求的日益加強(qiáng),Databricks也適時(shí)地推出了Spark開(kāi)發(fā)者認(rèn)證計(jì)劃,第一次線(xiàn)下測(cè)試已經(jīng)于2014年11月在西班牙巴塞羅那舉行。截止到本文寫(xiě)作時(shí)(2015年1月),Spark開(kāi)發(fā)者認(rèn)證還不支持線(xiàn)上測(cè)試,但線(xiàn)上測(cè)試平臺(tái)不久后就會(huì)上線(xiàn)。
基于Spark持續(xù)健康發(fā)展的生態(tài)系統(tǒng),越來(lái)越多的企業(yè)和機(jī)構(gòu)在Spark上面開(kāi)發(fā)應(yīng)用和擴(kuò)展庫(kù)。隨著這些庫(kù)的增長(zhǎng),Databricks在2014年圣誕節(jié)前夕上線(xiàn)了一個(gè)類(lèi)似pip的功能來(lái)跟蹤這些庫(kù)的網(wǎng)站:http://spark-packages.org,目前已經(jīng)有一些庫(kù)入駐Spark Packages,其中有幾個(gè)相當(dāng)不錯(cuò),比如:dibbhatt/kafka-spark-consumer、spark-jobserver/spark-jobserver和mengxr/spark-als。
Spark 2014,解析眾人拾柴下的技術(shù)演進(jìn)
如圖1所示,可以看出Spark包含了批處理、流處理、圖處理、機(jī)器學(xué)習(xí)、即席查詢(xún)與關(guān)系查詢(xún)等功能,這就意味著我們只需要一個(gè)框架就可以滿(mǎn)足各種使用場(chǎng)景的需求。如果放在以前,我們可能需要為每個(gè)功能都準(zhǔn)備一套框架,譬如采用Hadoop MapReduce來(lái)做批處理和采用Storm來(lái)做流式處理,這樣做帶來(lái)的結(jié)果是我們必須分別針對(duì)兩套計(jì)算框架編寫(xiě)不同的業(yè)務(wù)代碼,而編寫(xiě)出的業(yè)務(wù)代碼也幾乎無(wú)法重用;另一方面,為了使系統(tǒng)穩(wěn)定,我們還得額外投入人力去深入理解Hadoop MapReduce及Storm的原理,這將造成很大的人力開(kāi)銷(xiāo)。當(dāng)采用Spark后,我們只需要去理解Spark即可,另一個(gè)吸引人的地方在于Spark批處理與流計(jì)算的業(yè)務(wù)代碼幾乎可以完全重用,這也就意味著我們只需要編寫(xiě)一份邏輯代碼就可以分別運(yùn)行批處理與流計(jì)算。最后,Spark可以無(wú)縫使用存儲(chǔ)在HDFS上的數(shù)據(jù),無(wú)需任何數(shù)據(jù)遷移動(dòng)作。
圖1 Spark Stack
同時(shí),由于現(xiàn)存系統(tǒng)必須要與以HDFS為代表的分布式文件系統(tǒng)進(jìn)行數(shù)據(jù)共享和交換,由此造成的IO開(kāi)銷(xiāo)大幅度地降低了計(jì)算效率;除此之外,反復(fù)的序列化與反序列化也是不可忽略的開(kāi)銷(xiāo)。鑒于此,Spark中抽象出了RDD的概念,并基于RDD定義了一系列豐富的算子,MapReduce只是其中一個(gè)非常小的子集,與此同時(shí),RDD也可以被緩存在內(nèi)存中,從而迭代計(jì)算可以充分地享受內(nèi)存計(jì)算所帶來(lái)的加速效果。與MapReduce基于進(jìn)程的計(jì)算模型不一樣,Spark基于的是多線(xiàn)程模型,這也意味著Spark的任務(wù)調(diào)度延遲可以控制在亞秒級(jí),當(dāng)任務(wù)特別多的時(shí)候,這么做可以大幅度降低整體調(diào)度時(shí)間,并且為基于macro batch的流式計(jì)算打下基礎(chǔ)。Spark的另一個(gè)特色是基于DAG的任務(wù)調(diào)度與優(yōu)化,Spark不需要像MapReduce一樣為每一步操作都去調(diào)度一個(gè)作業(yè),相反,Spark豐富的算子可以更自然地以DAG形式表達(dá)運(yùn)算。同時(shí),在Spark中,每個(gè)stage內(nèi)部是有pipeline優(yōu)化的,所以即使我們不使用內(nèi)存緩存數(shù)據(jù),Spark的執(zhí)行效率也要比Hadoop高。最后Spark基于RDD的lineage信息來(lái)容錯(cuò),由于RDD是不可變的,Spark并不需要記錄中間狀態(tài),當(dāng)RDD的某些partition丟失時(shí),Spark可以利用RDD的lineage信息來(lái)進(jìn)行并行的恢復(fù),不過(guò)當(dāng)lineage較長(zhǎng)時(shí),還是推薦用戶(hù)適時(shí)checkpoint,從而減少恢復(fù)時(shí)間。
以下我們沿著2014年各主要版本的發(fā)布軌跡簡(jiǎn)單總結(jié)下Spark及各個(gè)組件(Spark Streaming、MLlib、GraphX及Spark SQL)在新功能及穩(wěn)定性上做出的努力。
Spark 0.9.x
2014年2月初,Databricks發(fā)布了Spark的第一個(gè)版本0.9.0,這一版本帶來(lái)的最直接的變化是將Scala從2.9.x升級(jí)到了2.10。由于Scala在那時(shí)并沒(méi)有做到二進(jìn)制向下兼容,所以大家不得不使用Scala2.10重新編譯業(yè)務(wù)代碼,這也算是個(gè)插曲吧。
這個(gè)版本最大的貢獻(xiàn)應(yīng)該是加入了配置系統(tǒng),即SparkConf。在這之前,各種屬性參數(shù)都直接作為Master的參數(shù)傳進(jìn)去,而有了SparkConf后,Master就不需要管這些了,各種參數(shù)在SparkConf中配置完成后,將SparkConf傳給Master即可,這在測(cè)試中是非常有用的。另外在提交任務(wù)時(shí),允許把Driver程序放到集群中的某臺(tái)服務(wù)器上運(yùn)行,以前只能放在集群外的服務(wù)器上運(yùn)行。
Spark Streaming終于在這個(gè)版本“自信”地結(jié)束了alpha版本,并且加入了HA模式,現(xiàn)在大家知道,其實(shí)那時(shí)的HA并不能保證數(shù)據(jù)不丟失,這一點(diǎn)到1.2的時(shí)候我們?cè)僬劇T赟park Streaming跳出alpha的同時(shí),新增加了alpha組件GraphX,GraphX是一個(gè)分布式圖計(jì)算框架,在這個(gè)版本中提供了一些標(biāo)準(zhǔn)算法,如PageRank、connected components、 strongly connected components與triangle counting等等,但穩(wěn)定性還有待加強(qiáng)。MLlib在這個(gè)版本中增加了常用的樸素貝葉斯算法,不過(guò)更引人注意的是,MLlib終于也開(kāi)始支持Python API了(需要NumPy的支持)。
社區(qū)分別于4月份與7月份發(fā)布了兩個(gè)maintena-nce版本:0.9.1與0.9.2,修復(fù)了一些Bug,無(wú)新的feature加入,不過(guò)0.9.1倒是Spark成為Apache頂級(jí)項(xiàng)目后的第一個(gè)發(fā)布。
Spark 1.0.x
用“千呼萬(wàn)喚始出來(lái)”形容Spark1.0一點(diǎn)都不為過(guò),作為一個(gè)里程碑式的發(fā)布,Spark社區(qū)也是非常謹(jǐn)慎,在發(fā)布了多個(gè)RC版本后,終于在5月底正式發(fā)布了1.0版本。這個(gè)版本有110多位Contributor,歷經(jīng)4個(gè)月的共同努力,而1.0版本也毫無(wú)懸念地成為了Spark誕生以來(lái)最大的一次發(fā)布。作為1.x的開(kāi)端版本,Spark社區(qū)也對(duì)API在以后所有1.x版本上的兼容性做了保證。另一方面,Spark 1.0的Java API開(kāi)始支持Java 8的lambda表達(dá)式,這多少讓一些必須用Java來(lái)寫(xiě)Spark程序的用戶(hù)得到了不小的便利。
萬(wàn)眾矚目的Spark SQL終于在這個(gè)版本中亮相,盡管只是alpha版本,但全球各地的Spark用戶(hù)們已經(jīng)迫不及待開(kāi)始嘗試,這一勢(shì)頭至今仍在延續(xù),Spark SQL現(xiàn)在是Spark中最活躍的組件,沒(méi)有之一。提到Spark SQL,不得不提Shark,Databricks在Spark Summit 2014上宣布Shark已經(jīng)完成了其學(xué)術(shù)使命,且Shark的整體設(shè)計(jì)架構(gòu)對(duì)Hive的依賴(lài)性太強(qiáng),難以支持其長(zhǎng)遠(yuǎn)發(fā)展,所以決定終止Shark開(kāi)發(fā),全面轉(zhuǎn)向Spark SQL。Spark SQL支持以SQL的形式來(lái)操作結(jié)構(gòu)化數(shù)據(jù),并且也支持使用HiveContext來(lái)操作Hive中的數(shù)據(jù)。在這個(gè)方面,業(yè)內(nèi)對(duì)SQL on Hadoop的超強(qiáng)需求決定了Spark SQL必將長(zhǎng)期處于快速發(fā)展的態(tài)勢(shì)。值得一提的是,Hive社區(qū)也推出了一個(gè)Hive on Spark的項(xiàng)目——將Hive的執(zhí)行引擎換成Spark。不過(guò)從目標(biāo)上看,Hive on Spark更注重于針對(duì)Hive徹底地向下兼容性,而Spark SQL更注重于Spark與其他組件的互操作和多元化數(shù)據(jù)處理。
MLlib方面也有一個(gè)較大的進(jìn)步,1.0開(kāi)始終于支持稀疏矩陣了,這對(duì)MLlib的使用者來(lái)說(shuō)絕對(duì)是一個(gè)讓人歡欣鼓舞的特性。在算法方面,MLlib也增加了決策樹(shù)、SVD及PCA等。Spark Streaming與GraphX的性能在這個(gè)版本中都得到了增強(qiáng)。
此外,Spark提供了一個(gè)新的提交任務(wù)的工具,稱(chēng)為spark-submit,無(wú)論是運(yùn)行在Standalone模式,還是運(yùn)行在YARN上,都可以使用這個(gè)工具提交任務(wù)。從這一點(diǎn)上說(shuō),Spark統(tǒng)一了提交任務(wù)的入口。
最后,社區(qū)在7月和8月份分別發(fā)布了1.0.1與1.0.2兩個(gè)maintenance版本。
Spark 1.1.x
Spark 1.1.0在9月如期而至。此版本加入了sort-based的shuffle實(shí)現(xiàn),之前hash-based的shuffle需要為每個(gè)reducer都打開(kāi)一個(gè)文件,導(dǎo)致的結(jié)果是大量的buffer開(kāi)銷(xiāo)與低效的I/O,而最新sort-based的shuffle實(shí)現(xiàn)能很好地解決上述問(wèn)題,當(dāng)shuffle數(shù)據(jù)量特別大的時(shí)候,sort-based的shuffle優(yōu)勢(shì)尤其明顯。需要指出的是,和MapReduce針對(duì)KV排序不一樣,sort-based是按照partition序號(hào)進(jìn)行排序的,在partition內(nèi)部并不排序。但是1.1中默認(rèn)的shuffle方式還是基于hash的,到1.2中才會(huì)把sort-based作為默認(rèn)的shuffle方式。
Spark SQL在這個(gè)版本里加入了不少新特性。最值得關(guān)注的是加入了JDBC Server的功能,這意味著用戶(hù)可以只寫(xiě)JDBC代碼就可以享受Spark SQL的各種功能。
MLlib引入了一個(gè)用于完成抽樣、相關(guān)性、估計(jì)、測(cè)試等任務(wù)的統(tǒng)計(jì)庫(kù)。之前呼聲很高的特征抽取工具Word2Vec和TF-IDF也被加進(jìn)了此版本。除了增加一些新的算法之外,MLlib性能在這一版本中得也到了較大的提升。比起MLlib,GraphX在這一版并無(wú)特別大的改變。
Spark Streaming在這一版本的數(shù)據(jù)源中加入了對(duì)Amazon Kinesis的支持,只不過(guò)國(guó)內(nèi)用戶(hù)對(duì)這個(gè)數(shù)據(jù)源支持的興趣不是很大,對(duì)于國(guó)外用戶(hù)的意義更多一些。不過(guò)在這個(gè)版本中,Spark Streaming改變了從Flume取得數(shù)據(jù)的方式,之前是Flume push數(shù)據(jù)到executor/worker中,但在這種模式下,當(dāng)executor/worker掛掉后,F(xiàn)lume便無(wú)法再正常地push數(shù)據(jù)。所以現(xiàn)在把push改成了pull,這意味著即使某個(gè)receiver掛掉后,也能保證在其他worker上新啟動(dòng)的receiver也能繼續(xù)正常地接收數(shù)據(jù)。另一個(gè)重要的改進(jìn)是加入了限流的功能,譬如之前Spark Streaming在讀取Kafka中topic數(shù)據(jù)時(shí)經(jīng)常會(huì)發(fā)生OOM,而加入限流后,OOM基本不再發(fā)生。Spark Streaming與MLlib的結(jié)合是另一個(gè)不得不提的全新特性,利用Streaming的實(shí)時(shí)性在線(xiàn)訓(xùn)練模型,但當(dāng)下只是一個(gè)比較初級(jí)的實(shí)現(xiàn)。
在11月底發(fā)布的maintenance版本1.1.1中修復(fù)了一個(gè)較大的問(wèn)題,之前在使用外部數(shù)據(jù)結(jié)構(gòu)時(shí)(ExternalAppendOnlyMap與ExternalSorter)會(huì)產(chǎn)生大量非常小的中間文件,這不但會(huì)造成“too many open files”的異常,也會(huì)極大地影響性能,1.1.1版本對(duì)其進(jìn)行了修復(fù)。
Spark 1.2.0
12月中旬發(fā)布了1.2,不得不說(shuō)Spark社區(qū)在控制發(fā)布進(jìn)度工作上做得很贊。在此版本中,首當(dāng)其沖的就是把sort-based shuffle設(shè)置成了默認(rèn)的shuffle策略。另一方面,在數(shù)據(jù)傳輸量非常大的情況下,connection manager終于換成Netty-based的實(shí)現(xiàn)了,以前的實(shí)現(xiàn)非常慢的原因是每次都要從磁盤(pán)讀到內(nèi)核態(tài),再到用戶(hù)態(tài),再回到內(nèi)核態(tài)進(jìn)入網(wǎng)卡,現(xiàn)在用zero-copy來(lái)實(shí)現(xiàn),效率高了很多。
對(duì)于Spark Streaming說(shuō),終于也算是個(gè)小小的里程碑,開(kāi)始支持fully H/A模式。以前當(dāng)driver掛掉的時(shí)候,可能會(huì)丟失掉一小部分?jǐn)?shù)據(jù)?,F(xiàn)在加上了一層WAL(Write Ahead Log),每次receiver收到數(shù)據(jù)后都會(huì)存在HDFS上,這樣即使driver掛掉,當(dāng)它重啟起來(lái)后,還是可以接著處理。同時(shí)大家也需要注意 unreliable receivers和reliable receivers的區(qū)別,只有用戶(hù)使用reliable receivers才能保證數(shù)據(jù)零丟失。
MLlib最大變動(dòng)是引入了新的pipeline API,可以更加便捷地搭建機(jī)器學(xué)習(xí)相關(guān)的全套流水線(xiàn),其中還包括了以Spark SQL SchemaRDD為基礎(chǔ)的dataset API。
GraphX結(jié)束alpha正式發(fā)布,同時(shí)提供了stable API,這意味著用戶(hù)不需要擔(dān)心現(xiàn)有代碼以后會(huì)因API的變化而改動(dòng)了。此外,新的核心API aggregateMessages也替代掉了mapReduceTriplet,大家要注意這個(gè)變動(dòng)。
Spark SQL最重要的特性毫無(wú)疑問(wèn)應(yīng)該屬于external data source,此API讓開(kāi)發(fā)者可以更容易地開(kāi)發(fā)出對(duì)接外部數(shù)據(jù)源的spark connector,統(tǒng)一用SQL操作所有數(shù)據(jù)源,同時(shí)也可以push predicates to data source,譬如你要從HBase取數(shù)據(jù)后做一些篩選,一般我們需要把數(shù)據(jù)從HBase全取出來(lái)后在Spark引擎中篩選,現(xiàn)在可以把這個(gè)步驟推到data source端,讓用戶(hù)在取數(shù)據(jù)的時(shí)候就可以篩選。另一個(gè)值得一提的是現(xiàn)在cacheTable和原生的cache已經(jīng)統(tǒng)一了語(yǔ)義,并且性能和穩(wěn)定性也有顯著提升,不但內(nèi)存表支持predicates pushdown,可以基于統(tǒng)計(jì)信息跳過(guò)批量數(shù)據(jù),而且建內(nèi)存buffer時(shí)分段建立,因此在cache較大的表時(shí)也不再會(huì)OOM。
由于篇幅原因,以上我們簡(jiǎn)單總結(jié)了Spark在2014年的各個(gè)版本中比較重要的特性,但有一個(gè)功能的增強(qiáng)始終貫穿其中——YARN,由于目前很多公司都把不同的計(jì)算框架跑在YARN上,所以Spark對(duì)YARN的支持肯定會(huì)越來(lái)越好,事實(shí)上Spark確實(shí)在這方面做了很多工作。
結(jié)語(yǔ)
2014年對(duì)Spark是非常重要的一年,不僅因?yàn)榘l(fā)布了里程碑式的1.0版本,更重要的是通過(guò)整個(gè)社區(qū)的努力,Spark變得越來(lái)越穩(wěn)定與高效,也正在被越來(lái)越多的企業(yè)采用。在2015年,隨著社區(qū)不斷的努力,相信Spark一定會(huì)達(dá)到一個(gè)新的高度,在更多的企業(yè)中扮演更重要的角色。
感謝來(lái)自Databricks公司的Reynold Xin和連城給本文review,并提供寶貴建議。