Spark 是分布式計(jì)算框架,多臺機(jī)器之間必然存在著通信。Spark在早期版本采用Akka實(shí)現(xiàn)。現(xiàn)在在Akka的上層抽象出了一個(gè)RpcEnv。RpcEnv負(fù)責(zé)管理機(jī)器之間的通信。
成都創(chuàng)新互聯(lián)從2013年開始,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項(xiàng)目成都網(wǎng)站制作、做網(wǎng)站網(wǎng)站策劃,項(xiàng)目實(shí)施與項(xiàng)目整合能力。我們以讓每一個(gè)夢想脫穎而出為使命,1280元濮陽做網(wǎng)站,已為上家服務(wù),為濮陽各地企業(yè)和個(gè)人服務(wù),聯(lián)系電話:18982081108
RpcEnv包含了如下三大核心:
RpcEndpoint 消息循環(huán)體,負(fù)責(zé)接收并處理消息。Spark中的Master、Worker都是RpcEndpoint 。
RpcEndpointRef :RpcEndpoint的引用,如果需要和RpcEndpoint通信,就必須獲取它的RpcEndpointRef,通過RpcEndpointRef發(fā)送消息。
Dispatcher:消息調(diào)度器,負(fù)責(zé)RPC消息路由到適當(dāng)?shù)腞pcEndpoint。
RpcEnv被創(chuàng)建以后,RpcEndpoint可以注冊到RpcEnv中,被注冊的RpcEndpoint會生成一個(gè)相應(yīng)的RpcEndpointRef來引用它。如果你需要向RpcEndpoint發(fā)送消息,必須到RpcEnv中通過RpcEndpoint的名稱來獲取對應(yīng)的RpcEndpointRef,然后通過RpcEndpointRef向RpcEndpoint發(fā)送消息。
RpcEnv負(fù)責(zé)管理RpcEndpoint的整個(gè)生命周期
注冊RpcEndpoint,使用name或者uri
路由發(fā)送給RpcEndpoint的消息。
停止RpcEndpoint
注:一個(gè)RpcEndpoint只能注冊給一個(gè)RpcEnv
RpcAddress:RpcEnv的邏輯地址,使用主機(jī)名和端口表示。
RpcEndpointAddress:注冊到RpcEnv上的RpcEndpoint的地址,由RpcAddress和name構(gòu)成。
由此可見RpcEnv和RpcEndpoint是在相同的機(jī)器上(相同的JVM中)。而要想給遠(yuǎn)端機(jī)器發(fā)送消息,是獲取遠(yuǎn)端機(jī)器的RpcEndpointRef,而并不是遠(yuǎn)端的RpcEndpoint注冊到本地的RpcEnv中。
在Spark1.6版本中,默認(rèn)使用的是netty
private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = { val rpcEnvNames = Map( "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory", "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory") val rpcEnvName = conf.get("spark.rpc", "netty") val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName) Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory] }
RpcEndpoint是一個(gè)消息循環(huán)體,它的生命周期:
構(gòu)造(Constructor)->啟動(dòng)(onStart)->消息接收(receive&receiveAndReply)->停止(onStop)
receive():不斷的運(yùn)行,處理客戶端發(fā)送過來的消息。
receiveAndReply():處理消息,并且回應(yīng)對方。
我們看一下Master的代碼:
def main(argStrings: Array[String]) { SignalLogger.register(log) val conf = new SparkConf val args = new MasterArguments(argStrings, conf) //指定的主機(jī)名必須是start-master.sh腳本運(yùn)行的本地機(jī)器名稱 val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf) rpcEnv.awaitTermination() } /** * Start the Master and return a three tuple of: * (1) The Master RpcEnv * (2) The web UI bound port * (3) The REST server bound port, if any */ def startRpcEnvAndEndpoint( host: String, port: Int, webUiPort: Int, conf: SparkConf): (RpcEnv, Int, Option[Int]) = { val securityMgr = new SecurityManager(conf) //創(chuàng)建Rpc環(huán)境,主機(jī)名和端口就是Standalone集群的訪問地址。SYSTEM_NAME=sparkMaster val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr) // 將Master實(shí)例注冊到RpcEnv中 val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)) val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest) (rpcEnv, portsResponse.webUIPort, portsResponse.restPort) }
在main方法中創(chuàng)建了RpcEnv,并且實(shí)例化Master實(shí)例,然后注冊到RpcEnv中。
RpcEndpoint其實(shí)是注冊到Dispatcher中的,在netty中的代碼實(shí)現(xiàn)如下:
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { dispatcher.registerRpcEndpoint(name, endpoint) }
注:NettyRpcEnv.scala的第135行
而Dispatcher中使用如下數(shù)據(jù)結(jié)構(gòu)來存儲RpcEndpoint和RpcEndpointRef
private val endpoints = new ConcurrentHashMap[String, EndpointData] private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
EndpointData為一個(gè)case class:
private class EndpointData( val name: String, val endpoint: RpcEndpoint, val ref: NettyRpcEndpointRef) { val inbox = new Inbox(ref, endpoint) }
在Master中使用數(shù)據(jù)結(jié)構(gòu)WorkerInfo保存著每個(gè)Worker的信息,其中就包括每個(gè)Worker的RpcEndpointRef
備注:
1、DT大數(shù)據(jù)夢工廠微信公眾號DT_Spark
2、IMF晚8點(diǎn)大數(shù)據(jù)實(shí)戰(zhàn)YY直播頻道號:68917580
3、新浪微博: http://www.weibo.com/ilovepains
網(wǎng)頁題目:第43課:Spark1.6RPC內(nèi)幕解密:運(yùn)行機(jī)制、源碼詳解、Netty與Akka等
本文來源:http://m.newbst.com/article36/gpjjsg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制網(wǎng)站、網(wǎng)頁設(shè)計(jì)公司、微信小程序、網(wǎng)站內(nèi)鏈、商城網(wǎng)站、網(wǎng)站改版
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)