這篇文章將為大家詳細講解有關Spark driver端得到executor返回值的方法,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
創(chuàng)新互聯(lián)從2013年開始,是專業(yè)互聯(lián)網(wǎng)技術服務公司,擁有項目成都做網(wǎng)站、成都網(wǎng)站設計、成都外貿(mào)網(wǎng)站建設網(wǎng)站策劃,項目實施與項目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元靜樂做網(wǎng)站,已為上家服務,為靜樂各地企業(yè)和個人服務,聯(lián)系電話:18982081108
有人說spark的代碼不優(yōu)雅,這個浪尖就忍不了了。實際上,說spark代碼不優(yōu)雅的主要是對scala不熟悉,spark代碼我覺得還是很贊的,最值得閱讀的大數(shù)據(jù)框架之一。
今天這篇文章不是為了爭辯Spark 代碼優(yōu)雅與否,主要是講一下理解了spark源碼之后我們能使用的一些小技巧吧。
spark 使用的時候,總有些需求比較另類吧,比如有球友問過這樣一個需求:
浪尖,我想要在driver端獲取executor執(zhí)行task返回的結果,比如task是個規(guī)則引擎,我想知道每條規(guī)則命中了幾條數(shù)據(jù),請問這個怎么做呢?
這個是不是很騷氣,也很常見,按理說你輸出之后,在MySQL里跑條sql就行了,但是這個往往顯的比較麻煩。而且有時候,在 driver可能還要用到這些數(shù)據(jù)呢?具體該怎么做呢?
大部分的想法估計是collect方法,那么用collect如何實現(xiàn)呢?大家自己可以考慮一下,我只能告訴你不簡單,不如輸出到數(shù)據(jù)庫里,然后driver端寫sql分析一下。
還有一種考慮就是使用自定義累加器。這樣就可以在executor端將結果累加然后在driver端使用,不過具體實現(xiàn)也是很麻煩。大家也可以自己琢磨一下下~
那么,浪尖就給大家介紹一個比較常用也比較騷的操作吧。
其實,這種操作我們最先想到的應該是count函數(shù),因為他就是將task的返回值返回到driver端,然后進行聚合的。我們可以從idea count函數(shù)點擊進去,可以看到
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
也即是sparkcontext的runJob方法。
Utils.getIteratorSize _這個方法主要是計算每個iterator的元素個數(shù),也即是每個分區(qū)的元素個數(shù),返回值就是元素個數(shù):
/** * Counts the number of elements of an iterator using a while loop rather than calling * [[scala.collection.Iterator#size]] because it uses a for loop, which is slightly slower * in the current version of Scala. */ def getIteratorSize[T](iterator: Iterator[T]): Long = { var count = 0L while (iterator.hasNext) { count += 1L iterator.next() } count }
然后就是runJob返回的是一個數(shù)組,每個數(shù)組的元素就是我們task執(zhí)行函數(shù)的返回值,然后調(diào)用sum就得到我們的統(tǒng)計值了。
那么我們完全可以借助這個思路實現(xiàn)我們開頭的目標。浪尖在這里直接上案例了:
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
object es2sparkRunJob {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)
conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1")
conf.set(ConfigurationOptions.ES_PORT, "9200")
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
conf.set("es.write.rest.error.handlers", "ignoreConflict")
conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")
val sc = new SparkContext(conf)
import org.elasticsearch.spark._
val rdd = sc.esJsonRDD("posts").repartition(10)
rdd.count()
val func = (itr : Iterator[(String,String)]) => {
var count = 0
itr.foreach(each=>{
count += 1
})
(TaskContext.getPartitionId(),count)
}
val res = sc.runJob(rdd,func)
res.foreach(println)
sc.stop()
}
}
例子中driver端獲取的就是每個task處理的數(shù)據(jù)量。
關于Spark driver端得到executor返回值的方法就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
分享題目:Sparkdriver端得到executor返回值的方法
網(wǎng)站網(wǎng)址:http://m.newbst.com/article36/gpgepg.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站設計公司、網(wǎng)站建設、面包屑導航、、網(wǎng)站設計、全網(wǎng)營銷推廣
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)