本期內容:
我們提供的服務有:成都做網站、成都網站建設、微信公眾號開發、網站優化、網站認證、柴桑ssl等。為1000+企事業單位解決了網站和推廣的問題。提供周到的售前咨詢和貼心的售后服務,是有科學管理、有技術的柴桑網站制作公司
1、Receiver啟動方式的設想
2、Receiver啟動源碼徹底分析
一:Receiver啟動方式的設想
1. Spark Streaming通過Receiver持續不斷的從外部數據源接收數據,并把數據匯報給Driver端,由此每個Batch Durations就可以根據匯報的數據生成不同的Job。
2. Receiver是在Spark Streaming應用程序啟動時啟動的,那么我們找Receiver在哪里啟動就應該去找Spark Streaming的啟動。
3. Receivers和InputDStreams是一一對應的,默認情況下一般只有一個Receiver.
如何啟動Receiver?
1. 從Spark Core的角度來看,Receiver的啟動Spark Core并不知道,就相當于Linux的內核之上所有的都是應用程序,因此Receiver是通過Job的方式啟動的
2. 一般情況下,只有一個Receiver,但是可以創建不同的數據來源的InputDStream.
final private[streaming] class DStreamGraph extends Serializable with Logging { private val inputStreams = new ArrayBuffer[InputDStream[_]]() //數組 private val outputStreams = new ArrayBuffer[DStream[_]]()
3. 啟動Receiver的時候,啟動一個Job,這個Job里面有RDD的transformations操作和action的操作,這個Job只有一個partition.這個partition的特殊是里面只有一個成員, 這個成員就是啟動的Receiver. 4. 這樣做的問題: a) 如果有多個InputDStream,那就要啟動多個Receiver,每個Receiver也就相當于分片partition,那我們啟動Receiver的時候理想的情況下是在不同的機器上啟動Receiver, 但是Spark Core的角度來看就是應用程序,感覺不到Receiver的特殊性,所以就會按照正常的Job啟動的方式來處理,極有可能在一個Executor上啟動多個Receiver. 這樣的話就可能導致負載不均衡。 b) 有可能啟動Receiver失敗,只要集群存在Receiver就不應該失敗。 c) 運行過程中,就默認的而言如果是一個partition的話,那啟動的時候就是一個Task,但是此Task也很可能失敗,因此以Task啟動的Receiver也會掛掉。
由此,可以得出,對于Receiver失敗的話,后果是非常嚴重的,那么Spark Streaming如何防止這些事的呢,下面就尋找Receiver的創建
這里先給出答案,后面源碼會詳細分析:
a) Spark使用一個Job啟動一個Receiver.最大程度的保證了負載均衡。
b) Spark Streaming指定每個Receiver運行在哪些Executor上。
c) 如果Receiver啟動失敗,此時并不是Job失敗,在內部會重新啟動Receiver.
接下來我們通過代碼一步一步解析Receiver是如何啟動的
1、首先我們在編寫具體的應用程序的時候,都會調用StreamingContext的start方法,其實這就是job啟動的源頭,我們先來看下start方法的源碼:
def start(): Unit = synchronized { state match { case INITIALIZED => startSite.set(DStream.getCreationSite()) StreamingContext.ACTIVATION_LOCK.synchronized { StreamingContext.assertNoOtherContextIsActive() try { validate() // Start the streaming scheduler in a new thread, so that thread local properties // like call sites and job groups can be reset without affecting those of the // current thread. ThreadUtils.runInNewThread("streaming-start") { sparkContext.setCallSite(startSite.get) sparkContext.clearJobGroup() sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false") scheduler.start() //啟動JobScheduler的start方法,啟動子線程,一方面為了本地初始化工作,另外一方面是不要阻塞主線程。 } state = StreamingContextState.ACTIVE } catch { case NonFatal(e) => logError("Error starting the context, marking it as stopped", e) scheduler.stop(false) state = StreamingContextState.STOPPED throw e } StreamingContext.setActiveContext(this) } shutdownHookRef = ShutdownHookManager.addShutdownHook( StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown) // Registering Streaming Metrics at the start of the StreamingContext assert(env.metricsSystem != null) env.metricsSystem.registerSource(streamingSource) uiTab.foreach(_.attach()) logInfo("StreamingContext started") case ACTIVE => logWarning("StreamingContext has already been started") case STOPPED => throw new IllegalStateException("StreamingContext has already been stopped") } }
2、上面調用start方法的時候,會調用JobScheduler的start()方法,在該方法里面,receiverTracker啟動了,源碼如下:
def start(): Unit = synchronized { if (eventLoop != null) return // scheduler has already been started logDebug("Starting JobScheduler") eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") { override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event) override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e) } eventLoop.start() // attach rate controllers of input streams to receive batch completion updates for { inputDStream <- ssc.graph.getInputStreams rateController <- inputDStream.rateController } ssc.addStreamingListener(rateController) listenerBus.start(ssc.sparkContext) receiverTracker = new ReceiverTracker(ssc) inputInfoTracker = new InputInfoTracker(ssc) receiverTracker.start() //啟動Receiver jobGenerator.start() logInfo("Started JobScheduler") }
3、我們接著看下receiverTracker的start()方法,在start方法里啟動了RPC消息通信體,為啥呢?因為receiverTracker會監控整個集群中的Receiver,Receiver轉過來要向ReceiverTrackerEndpoint匯報自己的狀態,接收的數據,包括生命周期等信息
/** Start the endpoint and receiver execution thread. */ def start(): Unit = synchronized { if (isTrackerStarted) { throw new SparkException("ReceiverTracker already started") } if (!receiverInputStreams.isEmpty) { //Receiver的啟動是依據數據流的 endpoint = ssc.env.rpcEnv.setupEndpoint( "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) //匯報狀態信息 if (!skipReceiverLaunch) launchReceivers() //發起Receiver logInfo("ReceiverTracker started") trackerState = Started } }
4、基于ReceiverInputDStream(是在Driver端)來獲得具體的Receivers實例,然后再把他們分不到Worker節點上。一個ReceiverInputDStream只產生一個Receiver
/** * Get the receivers from the ReceiverInputDStreams, distributes them to the * worker nodes as a parallel collection, and runs them. */ private def launchReceivers(): Unit = { val receivers = receiverInputStreams.map(nis => { //一個輸入數據來源只產生一個Receiver val rcvr = nis.getReceiver() rcvr.setReceiverId(nis.id) rcvr }) runDummySparkJob() //啟動虛擬Job來分配Receiver到不同的executor上 logInfo("Starting " + receivers.length + " receivers") endpoint.send(StartAllReceivers(receivers)) }
5、其中runDummySparkJob()為了確保所有節點活著,而且避免所有的receivers集中在一個節點上。
private def runDummySparkJob(): Unit = { if (!ssc.sparkContext.isLocal) { ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() } assert(getExecutors.nonEmpty) }
ReceiverInputDStream中的getReceiver()方法獲得receiver對象然后將它發送到worker節點上實例化receiver,然后去接收數據。
此方法必須要在子類中實現。
/** * Gets the receiver object that will be sent to the worker nodes * to receive data. This method needs to defined by any specific implementation * of a ReceiverInputDStream. */ def getReceiver(): Receiver[T]
ReceiverInputDStream是抽象類,所以getReceiver方法必須要在繼承的子類中實現
private[streaming] class SocketInputDStream[T: ClassTag]( ssc_ : StreamingContext, host: String, port: Int, bytesToObjects: InputStream => Iterator[T], storageLevel: StorageLevel ) extends ReceiverInputDStream[T](ssc_) { def getReceiver(): Receiver[T] = { new SocketReceiver(host, port, bytesToObjects, storageLevel) //調用SocketReceiver } } private[streaming] class SocketReceiver[T: ClassTag]( host: String, port: Int, bytesToObjects: InputStream => Iterator[T], storageLevel: StorageLevel ) extends Receiver[T](storageLevel) with Logging { def onStart() { // Start the thread that receives data over a connection new Thread("Socket Receiver") { setDaemon(true) override def run() { receive() } //啟動線程,調用Receiver方法 }.start() }
在receive()方法中啟動socket接收數據
/** Create a socket connection and receive data until receiver is stopped */ def receive() { var socket: Socket = null try { logInfo("Connecting to " + host + ":" + port) socket = new Socket(host, port) //根據我們應用程序傳入的host和post創建socket對象 logInfo("Connected to " + host + ":" + port) val iterator = bytesToObjects(socket.getInputStream()) //接收數據 while(!isStopped && iterator.hasNext) { store(iterator.next) //接收后的數據進行存儲 } if (!isStopped()) { restart("Socket data stream had no more data") } else { logInfo("Stopped receiving") } } catch { case e: java.net.ConnectException => restart("Error connecting to " + host + ":" + port, e) case NonFatal(e) => logWarning("Error receiving data", e) restart("Error receiving data", e) } finally { if (socket != null) { socket.close() logInfo("Closed socket to " + host + ":" + port) } } } }
6、ReceiverTrackerEndpoint源碼如下:
override def receive: PartialFunction[Any, Unit] = { // Local messages case StartAllReceivers(receivers) => val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors) // receivers就是要啟動的receiver,getExecutors獲得集群中的Executors的列表 for (receiver <- receivers) { val executors = scheduledLocations(receiver.streamId) updateReceiverScheduledExecutors(receiver.streamId, executors) receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation startReceiver(receiver, executors) //循環receivers,每次將一個receiver傳入過去。 } case RestartReceiver(receiver) => // Old scheduled executors minus the ones that are not active any more val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId) val scheduledLocations = if (oldScheduledExecutors.nonEmpty) { // Try global scheduling again oldScheduledExecutors } else { val oldReceiverInfo = receiverTrackingInfos(receiver.streamId) // Clear "scheduledLocations" to indicate we are going to do local scheduling val newReceiverInfo = oldReceiverInfo.copy( state = ReceiverState.INACTIVE, scheduledLocations = None) receiverTrackingInfos(receiver.streamId) = newReceiverInfo schedulingPolicy.rescheduleReceiver( receiver.streamId, receiver.preferredLocation, receiverTrackingInfos, getExecutors) } // Assume there is one receiver restarting at one time, so we don't need to update // receiverTrackingInfos startReceiver(receiver, scheduledLocations) case c: CleanupOldBlocks => receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c)) case UpdateReceiverRateLimit(streamUID, newRate) => for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) { eP.send(UpdateRateLimit(newRate)) } // Remote messages case ReportError(streamId, message, error) => reportError(streamId, message, error) }
從注釋中可以看到,Spark Streaming指定receiver在那些Executors運行,而不是基于Spark Core中的Task來指定。 通過StartAllReceivers將消息發送給ReceiverTrackerEndpoint
在for循環中為每個receiver分配相應的executor。并調用startReceiver方法:
Receiver是以job的方式啟動的!!! 這里你可能會有疑惑,沒有RDD和來的Job呢?首先,在startReceiver方法中,會將Receiver封裝成RDD
receiverRDD: RDD[Receiver[_]] = (scheduledLocations.isEmpty) { ssc..makeRDD((receiver)) } { preferredLocations = scheduledLocations.map(_.toString).distinct ssc..makeRDD((receiver -> preferredLocations)) }
封裝成RDD后,將RDD提交到集群中運行
future = ssc.sparkContext.submitJob[Receiver[_]]( receiverRDDstartReceiverFunc()(__) => ())
task被發送到executor中,從RDD中取出“Receiver”然后對它執行startReceiverFunc:
// Function to start the receiver on the worker node val startReceiverFunc: Iterator[Receiver[_]] => Unit = (iterator: Iterator[Receiver[_]]) => { if (!iterator.hasNext) { throw new SparkException( "Could not start receiver as object not found.") } if (TaskContext.get().attemptNumber() == 0) { val receiver = iterator.next() assert(iterator.hasNext == false) val supervisor = new ReceiverSupervisorImpl( //Receiver注冊 receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption) supervisor.start() //啟動Receiver supervisor.awaitTermination() } else { // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it. } }
在函數中創建了一個ReceiverSupervisorImpl對象。它用來管理具體的Receiver。
首先它會將Receiver注冊到ReceiverTracker中
override protected def onReceiverStart(): Boolean = {
val msg = RegisterReceiver(
streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
trackerEndpoint.askWithRetry[Boolean](msg)
}
如果注冊成功,通過supervisor.start()來啟動Receiver
/** Start the supervisor */ def start() { onStart() startReceiver() //啟動Receiver }
// We will keep restarting the receiver job until ReceiverTracker is stopped future.onComplete { case Success(_) => if (!shouldStartReceiver) { onReceiverJobFinish(receiverId) } else { logInfo(s"Restarting Receiver $receiverId") self.send(RestartReceiver(receiver)) } case Failure(e) => if (!shouldStartReceiver) { onReceiverJobFinish(receiverId) } else { logError("Receiver has been stopped. Try to restart it.", e) logInfo(s"Restarting Receiver $receiverId") self.send(RestartReceiver(receiver)) } }(submitJobThreadPool) logInfo(s"Receiver ${receiver.streamId} started")
回到receiverTracker的startReceiver方法中,只要Receiver對應的Job結束了(無論是正常還是異常結束),而ReceiverTracker還沒有停止。
它將會向ReceiverTrackerEndpoint發送一個ReStartReceiver的方法。
// We will keep restarting the receiver job until ReceiverTracker is stopped future.onComplete { case Success(_) => if (!shouldStartReceiver) { onReceiverJobFinish(receiverId) } else { logInfo(s"Restarting Receiver $receiverId") self.send(RestartReceiver(receiver)) } case Failure(e) => if (!shouldStartReceiver) { onReceiverJobFinish(receiverId) } else { logError("Receiver has been stopped. Try to restart it.", e) logInfo(s"Restarting Receiver $receiverId") self.send(RestartReceiver(receiver)) } }(submitJobThreadPool) logInfo(s"Receiver ${receiver.streamId} started")
重新為Receiver選擇一個executor,并再次運行Receiver。直到ReceiverTracker啟動為止。
在ReceiverTracker的receive方法中startReceiver方法第一個參數就是receiver,從實現的可以看出for循環不斷取出receiver,然后調用startReceiver。由此就可以得出一個Job只啟動一個Receiver. 如果Receiver啟動失敗,此時并不會認為是作業失敗,會重新發消息給ReceiverTrackerEndpoint重新啟動Receiver,這樣也就確保了Receivers一定會被啟動,這樣就不會像Task啟動Receiver的話如果失敗受重試次數的影響。
簡單的流程圖:
當前文章:(版本定制)第9課:SparkStreaming源碼解讀之
分享網址:http://m.newbst.com/article34/gegjse.html
成都網站建設公司_創新互聯,為您提供建站公司、動態網站、營銷型網站建設、云服務器、品牌網站建設、響應式網站
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯