Spark Streaming Source Code: Receiver

ReceiverTracker Overview

ReceiverTracker manages the execution of the receivers of ReceiverInputDStreams. The receivers accept the input stream data and store it in the form of blocks.

Instantiating and Starting ReceiverTracker

ReceiverTracker is instantiated and started in JobScheduler's start method. The code is as follows:

  def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    // internal event loop
    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start(ssc.sparkContext)
    // instantiate receiverTracker
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)
    // start receiverTracker
    receiverTracker.start()
    // start jobGenerator
    jobGenerator.start()
    logInfo("Started JobScheduler")
  }
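The eventLoop created at the top of start() follows a common pattern: events are posted to a queue, and a single daemon thread takes them off and dispatches them to onReceive, routing exceptions to onError. The sketch below is a minimal, simplified model of that pattern in plain Scala. The class name SimpleEventLoop and its members are hypothetical; they only mirror the onReceive/onError shape used in the code above and are not Spark's actual EventLoop implementation.

```scala
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical, simplified event loop: one daemon thread drains a queue and
// hands each event to onReceive. Not Spark's actual EventLoop class.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take() // blocks until an event arrives
          try onReceive(event)
          catch { case e: Exception => onError(e) }
        }
      } catch {
        case _: InterruptedException => // stop() interrupts the blocking take()
      }
    }
  }

  def start(): Unit = eventThread.start()

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      eventThread.join()
    }
  }

  /** Enqueue an event; it is processed later on the event thread. */
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}
```

Because all events are handled on one thread, processEvent-style handlers never run concurrently with each other, which keeps the scheduler's state handling simple.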

ReceiverTracker Internals

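Start from the start method. If there are receiver input streams, it first sets up the message loop, a ReceiverTrackerEndpoint registered with the RpcEnv, and then calls launchReceivers() to launch the receivers. The source: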

  /** Start the endpoint and receiver execution thread. */
  def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }

    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }
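The guard at the top of start() and the trackerState field form a small state machine (Initialized, Started, Stopping, Stopped). The following is a hedged, self-contained model of it. TrackerModel and its members are hypothetical stand-ins; the state names follow the ReceiverTracker source, and the real tracker sets up the RPC endpoint and launches receivers at the point marked in the comment.

```scala
// Hypothetical model of the tracker lifecycle; state names mirror the
// ReceiverTracker source, everything else is a simplified stand-in.
sealed trait TrackerState
object TrackerState {
  case object Initialized extends TrackerState
  case object Started extends TrackerState
  case object Stopping extends TrackerState
  case object Stopped extends TrackerState
}

class TrackerModel(inputStreamIds: Seq[Int]) {
  import TrackerState._

  private var trackerState: TrackerState = Initialized

  def isTrackerStarted: Boolean = trackerState == Started
  def isTrackerStopping: Boolean = trackerState == Stopping
  def isTrackerStopped: Boolean = trackerState == Stopped

  // Same rule as shouldStartReceiver in startReceiver: receivers may be
  // (re)started while the tracker is Initialized or Started.
  def shouldStartReceiver: Boolean = !(isTrackerStopping || isTrackerStopped)

  def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new IllegalStateException("ReceiverTracker already started")
    }
    // Only receiver-based input streams need the endpoint; the real code
    // calls setupEndpoint(...) and launchReceivers() at this point.
    if (inputStreamIds.nonEmpty) {
      trackerState = Started
    }
  }

  def stop(): Unit = synchronized {
    trackerState = Stopping
    trackerState = Stopped
  }
}
```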

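Next, launchReceivers(). This method gets a receiver from each ReceiverInputDStream and distributes the receivers across the worker nodes; the ReceiverInputDStreams are the receiver-based streams among the DStreamGraph's inputStreams. runDummySparkJob() actually runs an empty job. Its purpose is to make sure all slave nodes have registered before scheduling, so the receivers can be spread over as many resources as possible instead of all landing on the same node. Finally, the endpoint sends itself a StartAllReceivers message. The source: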

  private def launchReceivers(): Unit = {
    val receivers = receiverInputStreams.map(nis => {
      val rcvr = nis.getReceiver()
      rcvr.setReceiverId(nis.id)
      rcvr
    })

    runDummySparkJob()

    logInfo("Starting " + receivers.length + " receivers")
    endpoint.send(StartAllReceivers(receivers))
  }
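launchReceivers() does two things worth isolating: it tags each receiver with the id of the input stream that produced it, and it hands the whole list to the endpoint as a single StartAllReceivers message instead of starting receivers directly. The sketch below models this with plain classes and a queue standing in for the endpoint's inbox. FakeInputStream, FakeReceiver, LaunchModel and the queue are hypothetical, and the dummy job is omitted.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-ins for ReceiverInputDStream and Receiver.
final class FakeReceiver(val preferredLocation: Option[String]) {
  private var id: Int = -1
  def setReceiverId(newId: Int): Unit = { id = newId }
  def streamId: Int = id
}

final case class FakeInputStream(id: Int, preferredHost: Option[String]) {
  def getReceiver(): FakeReceiver = new FakeReceiver(preferredHost)
}

sealed trait TrackerMessage
final case class StartAllReceivers(receivers: Seq[FakeReceiver]) extends TrackerMessage

// The queue plays the role of the endpoint's inbox: send() only enqueues,
// and the endpoint's receive method handles the message later.
class LaunchModel(receiverInputStreams: Seq[FakeInputStream]) {
  val inbox = new LinkedBlockingQueue[TrackerMessage]()

  def launchReceivers(): Unit = {
    val receivers = receiverInputStreams.map { nis =>
      val rcvr = nis.getReceiver()
      rcvr.setReceiverId(nis.id) // receiver id == input stream id
      rcvr
    }
    // (the real method calls runDummySparkJob() here)
    inbox.put(StartAllReceivers(receivers))
  }
}
```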

The StartAllReceivers message is handled as follows:
1. schedulingPolicy.scheduleReceivers(receivers, getExecutors) spreads the receivers evenly across the executors while maximizing data locality;
2. the receivers are then started one at a time in a loop;
3. val executors = scheduledLocations(receiver.streamId) looks up the locations (executors) this receiver has been scheduled to;
4. updateReceiverScheduledExecutors(receiver.streamId, executors) updates receiverTrackingInfos, a map keyed by receiver id whose values hold the receiver's tracking info;
5. receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation records the receiver's preferred location in receiverPreferredLocations;
6. startReceiver(receiver, executors) starts the receiver.
The source:

  override def receive: PartialFunction[Any, Unit] = {
    // Local messages
    case StartAllReceivers(receivers) =>
      val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
      for (receiver <- receivers) {
        val executors = scheduledLocations(receiver.streamId)
        updateReceiverScheduledExecutors(receiver.streamId, executors)
        receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
        startReceiver(receiver, executors)
      }
    // ... other message cases omitted ...
  }
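Step 1 above describes spreading receivers evenly while respecting data locality. The sketch below is a deliberately simplified illustration of that idea, not Spark's ReceiverSchedulingPolicy: receivers with a preferred host go to the least-loaded executor on that host, and the remaining receivers go to the least-loaded executor overall. Exec, SimpleReceiverScheduler, and the use of plain strings for executor ids and hosts are all hypothetical; the real method returns a Seq[TaskLocation] per stream rather than a single executor.

```scala
import scala.collection.mutable

// Hypothetical executor descriptor.
final case class Exec(id: String, host: String)

object SimpleReceiverScheduler {
  /** Returns streamId -> assigned executor. Simplified illustration only. */
  def schedule(
      receivers: Seq[(Int, Option[String])], // (streamId, preferredHost)
      executors: Seq[Exec]): Map[Int, Exec] = {
    require(executors.nonEmpty, "need at least one executor")
    val load = mutable.Map.from(executors.map(e => e -> 0))
    val result = mutable.Map.empty[Int, Exec]

    def assign(streamId: Int, candidates: Seq[Exec]): Unit = {
      // least-loaded candidate wins; ties go to the first candidate
      val chosen = candidates.minBy(e => load(e))
      load(chosen) += 1
      result(streamId) = chosen
    }

    // Pass 1: honour preferred hosts that actually run an executor.
    val (withHost, withoutHost) = receivers.partition { case (_, host) =>
      host.exists(h => executors.exists(_.host == h))
    }
    withHost.foreach { case (id, host) =>
      assign(id, executors.filter(e => host.contains(e.host)))
    }
    // Pass 2: spread the remaining receivers evenly.
    withoutHost.foreach { case (id, _) => assign(id, executors) }
    result.toMap
  }
}
```

Placing locality-constrained receivers first matters: if they were assigned last, the unconstrained ones could already have filled up the only executor on the preferred host.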

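Next comes startReceiver, which launches one receiver on its scheduled executors.
The source shows that the receiver is wrapped in an RDD and launched as a job through SparkContext.submitJob, and the returned future monitors this receiver job. When the job completes, whether it ended normally or with an exception, the endpoint sends itself a RestartReceiver(receiver) message to restart the receiver, unless the ReceiverTracker is stopping, in which case the receiver job is simply marked as finished.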

    /**
     * Start a receiver along with its scheduled executors
     */
    private def startReceiver(
        receiver: Receiver[_],
        scheduledLocations: Seq[TaskLocation]): Unit = {
      def shouldStartReceiver: Boolean = {
        // It's okay to start when trackerState is Initialized or Started
        !(isTrackerStopping || isTrackerStopped)
      }

      val receiverId = receiver.streamId
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
        return
      }

      val checkpointDirOption = Option(ssc.checkpointDir)
      val serializableHadoopConf =
        new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

      // Function to start the receiver on the worker node
      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
        (iterator: Iterator[Receiver[_]]) => {
          if (!iterator.hasNext) {
            throw new SparkException(
              "Could not start receiver as object not found.")
          }
          if (TaskContext.get().attemptNumber() == 0) {
            val receiver = iterator.next()
            assert(iterator.hasNext == false)
            val supervisor = new ReceiverSupervisorImpl(
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
            supervisor.start()
            supervisor.awaitTermination()
          } else {
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
          }
        }

      // Create the RDD using the scheduledLocations to run the receiver in a Spark job
      // create an RDD that holds the receiver
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }
      receiverRDD.setName(s"Receiver $receiverId")
      ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
      ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
      // launch the receiver as a Spark job
      val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
      // We will keep restarting the receiver job until ReceiverTracker is stopped
      future.onComplete {
        case Success(_) =>
          if (!shouldStartReceiver) {
            // the tracker is stopping, so mark the receiver job as finished
            onReceiverJobFinish(receiverId)
          } else {
            logInfo(s"Restarting Receiver $receiverId")
            // the job ended while the tracker is still running: restart the receiver
            self.send(RestartReceiver(receiver))
          }
        case Failure(e) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logError("Receiver has been stopped. Try to restart it.", e)
            logInfo(s"Restarting Receiver $receiverId")
            // the job failed while the tracker is still running: restart the receiver
            self.send(RestartReceiver(receiver))
          }
      }(submitJobThreadPool)
      logInfo(s"Receiver ${receiver.streamId} started")
    }
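The future.onComplete block turns each receiver into a self-healing job: however the job ends, it is resubmitted unless the tracker is stopping. The following is a hedged model of that loop using scala.concurrent.Future. RestartLoopModel is hypothetical: runReceiverJob stands in for submitJob, the stopping flag stands in for isTrackerStopping || isTrackerStopped, the restart is a direct call instead of a RestartReceiver message, and the global execution context replaces the dedicated submitJobThreadPool.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Hypothetical model of the restart loop in startReceiver.
class RestartLoopModel(stopping: AtomicBoolean, runReceiverJob: Int => Unit) {
  val attempts = new AtomicInteger(0)
  val finished = Promise[Int]() // completed with the final attempt count

  def startReceiver(receiverId: Int): Unit = {
    if (stopping.get) { // mirrors the shouldStartReceiver check
      finished.trySuccess(attempts.get)
      return
    }
    attempts.incrementAndGet()
    Future(runReceiverJob(receiverId)).onComplete {
      case Success(_) =>
        // the job ended normally: finished if stopping, otherwise restart
        if (stopping.get) finished.trySuccess(attempts.get)
        else startReceiver(receiverId)
      case Failure(_) =>
        // the job failed: finished if stopping, otherwise restart
        if (stopping.get) finished.trySuccess(attempts.get)
        else startReceiver(receiverId)
    }
  }
}
```

Note that a normal completion is also restarted: a receiver job is expected to run until the tracker stops, so a job that returns early is treated like a failure.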

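The important part of this receiver RDD job is startReceiverFunc, which runs on the worker node and describes how the receiver is started and what it does. On the first task attempt it instantiates a ReceiverSupervisorImpl, starts it, and blocks in awaitTermination() until the receiver stops; a retried attempt exits immediately so that the ReceiverTracker, rather than the TaskScheduler, reschedules the receiver.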

 // Function to start the receiver on the worker node
 val startReceiverFunc: Iterator[Receiver[_]] => Unit =
   (iterator: Iterator[Receiver[_]]) => {
     if (!iterator.hasNext) {
       throw new SparkException(
         "Could not start receiver as object not found.")
     }
     if (TaskContext.get().attemptNumber() == 0) {
       val receiver = iterator.next()
       assert(iterator.hasNext == false)
       val supervisor = new ReceiverSupervisorImpl(
         receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
       supervisor.start()
       supervisor.awaitTermination()
     } else {
       // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
     }
   }
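The attemptNumber check is easy to miss, so here is a minimal model of just that guard. StartReceiverFuncModel is hypothetical: attemptNumber stands in for TaskContext.get().attemptNumber(), and runSupervisor stands in for creating a ReceiverSupervisorImpl and calling start() and awaitTermination(). The Boolean result exists only to make the behaviour observable.

```scala
// Hypothetical model of startReceiverFunc's guard.
object StartReceiverFuncModel {
  def startReceiverFunc[R](attemptNumber: Int, runSupervisor: R => Unit)(
      iterator: Iterator[R]): Boolean = {
    if (!iterator.hasNext) {
      throw new IllegalStateException("Could not start receiver as object not found.")
    }
    if (attemptNumber == 0) {
      val receiver = iterator.next()
      assert(!iterator.hasNext, "a receiver RDD partition holds exactly one receiver")
      runSupervisor(receiver) // blocks until the receiver stops
      true
    } else {
      // A TaskScheduler retry: return at once so the driver-side future
      // completes and ReceiverTracker reschedules the receiver itself.
      false
    }
  }
}
```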

supervisor.start() is ReceiverSupervisor's start method, which calls onStart() and then startReceiver():

  /** Start the supervisor */
  def start() {
    onStart()
    startReceiver()
  }

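First look at onStart() as implemented by ReceiverSupervisorImpl: it calls start() on every registered BlockGenerator. BlockGenerator.start() does two things:
1. It starts blockIntervalTimer, a recurring timer that runs updateCurrentBuffer. As in the SocketInputDStream example discussed earlier, data received over the network is stored into currentBuffer; updateCurrentBuffer turns the records in currentBuffer into a block and puts it into the blocksForPushing queue.
2. It starts blockPushingThread, which runs keepPushingBlocks(); keepPushingBlocks() takes the generated blocks off the queue and stores them through the BlockManager.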

  // onStart() calls start() on every registered BlockGenerator
  override protected def onStart() {
    registeredBlockGenerators.foreach { _.start() }
  }
  
  /** Start block generating and pushing threads. */
  def start(): Unit = synchronized {
    if (state == Initialized) {
      state = Active
      // start the block interval timer
      blockIntervalTimer.start()
      // start the block pushing thread
      blockPushingThread.start()
      logInfo("Started BlockGenerator")
    } else {
      throw new SparkException(
        s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
    }
  }
  
  /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = {
    try {
      var newBlock: Block = null
      synchronized {
        if (currentBuffer.nonEmpty) {
          val newBlockBuffer = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
          listener.onGenerateBlock(blockId)
          // wrap the buffered records in a new block
          newBlock = new Block(blockId, newBlockBuffer)
        }
      }

      if (newBlock != null) {
        // put the new block into the blocksForPushing queue
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }
  
  /** Keep pushing blocks to the BlockManager. */
  private def keepPushingBlocks() {
    logInfo("Started block pushing thread")

    def areBlocksBeingGenerated: Boolean = synchronized {
      state != StoppedGeneratingBlocks
    }

    try {
      // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
      while (areBlocksBeingGenerated) {
        Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
          case Some(block) => pushBlock(block)
          case None =>
        }
      }

      // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
      logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
      while (!blocksForPushing.isEmpty) {
        val block = blocksForPushing.take()
        logDebug(s"Pushing block $block")
        pushBlock(block)
        logInfo("Blocks left to push " + blocksForPushing.size())
      }
      logInfo("Stopped block pushing thread")
    } catch {
      case ie: InterruptedException =>
        logInfo("Block pushing thread was interrupted")
      case e: Exception =>
        reportError("Error in block pushing thread", e)
    }
  }
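The timer/queue/pusher arrangement above is a classic producer-consumer pipeline. The following is a compact, hedged model of it using only the JDK. MiniBlockGenerator, MiniBlock, addData and the push callback are hypothetical: push stands in for pushBlock and the BlockManager, a ScheduledExecutorService replaces Spark's RecurringTimer, and the block id format only imitates StreamBlockId's "input-<streamId>-<time>" naming.

```scala
import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

final case class MiniBlock(id: String, data: Seq[Any])

// Hypothetical, simplified BlockGenerator. Not Spark's actual class.
class MiniBlockGenerator(receiverId: Int, blockIntervalMs: Long, push: MiniBlock => Unit) {
  private var currentBuffer = new ArrayBuffer[Any]
  private val blocksForPushing = new ArrayBlockingQueue[MiniBlock](10)
  @volatile private var generating = true
  private val timer = Executors.newSingleThreadScheduledExecutor()

  private val blockPushingThread = new Thread("Block Pushing Thread") {
    setDaemon(true)
    override def run(): Unit = keepPushingBlocks()
  }

  /** What a receiver's store() eventually does: append to currentBuffer. */
  def addData(record: Any): Unit = synchronized { currentBuffer += record }

  def start(): Unit = {
    timer.scheduleAtFixedRate(
      new Runnable { def run(): Unit = updateCurrentBuffer(System.currentTimeMillis()) },
      blockIntervalMs, blockIntervalMs, TimeUnit.MILLISECONDS)
    blockPushingThread.start()
  }

  /** Stop the timer, cut a last block, then let the pusher drain the queue. */
  def stop(): Unit = {
    timer.shutdown()
    timer.awaitTermination(1, TimeUnit.SECONDS)
    updateCurrentBuffer(System.currentTimeMillis())
    generating = false
    blockPushingThread.join()
  }

  private def updateCurrentBuffer(time: Long): Unit = {
    var newBlock: MiniBlock = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any] // swap in a fresh buffer
        newBlock = MiniBlock(s"input-$receiverId-${time - blockIntervalMs}", newBlockBuffer.toSeq)
      }
    }
    if (newBlock != null) blocksForPushing.put(newBlock) // blocks when the queue is full
  }

  private def keepPushingBlocks(): Unit = {
    while (generating) {
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)).foreach(push)
    }
    while (!blocksForPushing.isEmpty) push(blocksForPushing.take()) // drain the rest
  }
}
```

Swapping the buffer reference inside the lock, instead of copying it, keeps the critical section short, so receivers calling addData are blocked only briefly; the bounded queue then applies back-pressure if pushing falls behind.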

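That covers ReceiverSupervisor's onStart(). Back in start(), the next call is startReceiver(): it first registers with the driver through onReceiverStart() and, if the driver accepts, starts the receiver itself: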

  /** Start receiver */
  def startReceiver(): Unit = synchronized {
    try {
      if (onReceiverStart()) {
        logInfo("Starting receiver")
        receiverState = Started
        // start the receiver
        receiver.onStart()
        logInfo("Called receiver onStart")
      } else {
        // The driver refused us
        stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
      }
    } catch {
      case NonFatal(t) =>
        stop("Error starting receiver " + streamId, Some(t))
    }
  }
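Taken together, ReceiverSupervisor.start() and startReceiver() fix an ordering: block generators first, then driver registration, then the receiver. The skeleton below is a hedged model of that sequence. SupervisorModel and all of its constructor parameters are hypothetical: driverAccepts stands in for onReceiverStart(), startBlockGenerators for the onStart() shown earlier, and receiverOnStart for receiver.onStart(); states are plain strings for brevity.

```scala
import scala.util.control.NonFatal

// Hypothetical skeleton of the supervisor's start sequence.
class SupervisorModel(
    streamId: Int,
    driverAccepts: () => Boolean,
    startBlockGenerators: () => Unit,
    receiverOnStart: () => Unit) {

  var receiverState: String = "Initialized"
  var stopMessage: Option[String] = None

  /** Same order as ReceiverSupervisor.start: onStart() first, then the receiver. */
  def start(): Unit = {
    onStart()
    startReceiver()
  }

  protected def onStart(): Unit = startBlockGenerators()

  def startReceiver(): Unit = synchronized {
    try {
      if (driverAccepts()) {
        receiverState = "Started"
        receiverOnStart()
      } else {
        stop(s"Registered unsuccessfully because Driver refused to start receiver $streamId")
      }
    } catch {
      case NonFatal(t) => stop(s"Error starting receiver $streamId: ${t.getMessage}")
    }
  }

  def stop(message: String): Unit = {
    receiverState = "Stopped"
    stopMessage = Some(message)
  }
}
```

Starting the generators before the receiver guarantees that the first record passed to store() already has a buffer and a pushing thread waiting for it.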

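Now look at the concrete implementation of receiver.onStart(), again using SocketReceiver as the example. onStart() starts a daemon thread that runs receive(), and receive() reads records from the socket one at a time and calls store() on each. store() hands the record to the supervisor, which appends it to the currentBuffer described above. This closes the loop: the receiver keeps receiving data and storing it in memory, while ReceiverSupervisor turns the in-memory data into blocks and stores them in memory or on disk through the BlockManager.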

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      // the thread runs receive()
      override def run() { receive() }
    }.start()
  }
  
  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        // keep storing each record received over the network
        store(iterator.next)
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }
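The receive loop can be exercised without a real socket by abstracting the connection as an InputStream. The following is a hedged, socket-free model. LineReceiverModel and its parameters are hypothetical: open stands in for socket.getInputStream(), lines are decoded as UTF-8 (an assumption about the record format), store stands in for Receiver.store, and restart for Receiver.restart.

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import scala.util.control.NonFatal

// Hypothetical, socket-free model of SocketReceiver.receive().
class LineReceiverModel(
    open: () => InputStream,
    store: String => Unit,
    restart: String => Unit) {

  @volatile var stopped = false

  def receive(): Unit = {
    var in: InputStream = null
    try {
      in = open()
      val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!stopped && line != null) {
        store(line) // in Spark this ends up in the BlockGenerator's currentBuffer
        line = reader.readLine()
      }
      // End of stream while not stopped: ask for a restart, as SocketReceiver does.
      if (!stopped) restart("Socket data stream had no more data")
    } catch {
      case NonFatal(e) => restart(s"Error receiving data: ${e.getMessage}")
    } finally {
      if (in != null) in.close()
    }
  }
}
```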
  

This concludes the walkthrough of the receiver.

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.