深入理解Spark Streaming流量控制及反压机制

目录

流量控制简介

在流式处理系统中,流量控制(rate control/rate limit)是一个非常重要的话题。对系统进行流控,主要目的是为了保证运行的稳定性,防止突发大流量造成整个系统的扰动(throttle),长时间或剧烈的扰动甚至会使系统宕机。另外,为了保证系统的吞吐量最大化,也需要设计合理的流控门槛,避免系统空转使资源利用率降低。

Spark Streaming作为基于微批次(micro-batch)的流处理框架,其流量的理想状态就是官方文档中所说的“batches of data should be processed as fast as they are being generated”,即每一批次的处理时长batch_process_time需要小于(但是又比较接近)我们设定的批次间隔batch_interval。如果batch_process_time > batch_interval,说明程序的处理能力不足,积累的数据越来越多,最终会造成Executor内存溢出。如果batch_process_time << batch_interval,说明系统有很长时间是空闲的,应该适当提升流量。

Spark Streaming流控基本设置

Spark Streaming通过Executor里的Receiver组件源源不断地接收外部数据,并通过BlockManager将外部数据转化为Spark中的块进行存储。Spark Streaming机制的简单框图如下所示。


要限制Receiver接收数据的速率,可以在SparkConf中设置配置项spark.streaming.receiver.maxRate,单位为数据条数/秒。如果采用的是基于Direct Stream方式的Kafka连接,不经过Receiver,就得设置配置项spark.streaming.kafka.maxRatePerPartition来限流,单位是每分区的数据条数/秒。

这两种方式的优点是设置非常简单,只需要通过实际业务的吞吐量估算一下使批次间隔和处理耗时基本达到平衡的速率就可以了。缺点是一旦业务量发生变化,就只能手动修改参数并重启Streaming程序。另外,人为估计的参数毕竟有可能不准,设置得太激进或太保守都不好。

所以,Spark后来提出了动态流量控制的方案,能够根据当前系统的处理速度智能地调节流量阈值,名为反压(back pressure)机制。其在1.5版本开始加入,ASF JIRA中对应的issue是SPARK-7398。要启用它,只需要将配置项spark.streaming.backpressure.enabled设为true就可以(默认值为false)。

反压机制看似简单,但它背后有一套非常精巧的控制逻辑,下面就来深入看一看。

Spark Streaming反压机制的具体实现

动态流量控制器

o.a.s.streaming.scheduler.RateController抽象类是动态流量控制的核心。其源码不甚长,抄录如下。

private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
    extends StreamingListener with Serializable {
  init()

  protected def publish(rate: Long): Unit

  @transient
  implicit private var executionContext: ExecutionContext = _

  @transient
  private var rateLimit: AtomicLong = _

  private def init() {
    executionContext = ExecutionContext.fromExecutorService(
      ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update"))
    rateLimit = new AtomicLong(-1L)
  }

  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
    ois.defaultReadObject()
    init()
  }

  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    Future[Unit] {
      val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
      newRate.foreach { s =>
        rateLimit.set(s.toLong)
        publish(getLatestRate())
      }
    }

  def getLatestRate(): Long = rateLimit.get()

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val elements = batchCompleted.batchInfo.streamIdToInputInfo

    for {
      processingEnd <- batchCompleted.batchInfo.processingEndTime
      workDelay <- batchCompleted.batchInfo.processingDelay
      waitDelay <- batchCompleted.batchInfo.schedulingDelay
      elems <- elements.get(streamUID).map(_.numRecords)
    } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  }
}

可见,RateController抽象类继承自StreamingListener特征,表示它是一个Streaming监听器。在之前的Spark Core源码精读系列文章中已经讲过了监听器和事件总线机制,因此不再多说了。

RateController的主要工作如下:

  • 监听StreamingListenerBatchCompleted事件,该事件表示一个批次已经处理完成。
  • 从该事件的BatchInfo实例中取得:处理完成的时间戳processingEndTime、实际处理时长processingDelay(从批次的第一个job开始处理到最后一个job处理完成经过的时间)、调度时延schedulingDelay(从批次被提交给Streaming JobScheduler到第一个job开始处理经过的时间)。
  • 另外从事件的StreamInputInfo实例中取得批次输入数据的条数numRecords。
  • 将取得的以上4个参数传递给速率估算器RateEstimator,计算出新的流量阈值,并将其发布出去。

通过RateController的子类ReceiverRateController实现的publish()抽象方法可知,新的流量阈值是发布给了ReceiverTracker。

  private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
      extends RateController(id, estimator) {
    override def publish(rate: Long): Unit =
      ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
  }

不过下面先看速率估算器RateEstimator的实现,稍后再回来看ReceiverTracker之后的事情。

基于PID机制的速率估算器

o.a.s.streaming.scheduler.rate.RateEstimator是一个很短的特征,其中只给出了计算流量阈值的方法compute()的定义。它还有一个伴生对象用于创建速率估算器的实例,其中写出了更多关于反压机制的配置参数。

object RateEstimator {
  def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
    conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
      case "pid" =>
        val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
        val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
        val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
        val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)
        new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)

      case estimator =>
        throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
    }
}

目前RateEstimator的唯一实现类是PIDRateEstimator,亦即spark.streaming.backpressure.rateEstimator配置项的值只能为pid。其具体代码如下。

private[streaming] class PIDRateEstimator(
    batchIntervalMillis: Long,
    proportional: Double,
    integral: Double,
    derivative: Double,
    minRate: Double
  ) extends RateEstimator with Logging {
  private var firstRun: Boolean = true
  private var latestTime: Long = -1L
  private var latestRate: Double = -1D
  private var latestError: Double = -1L

  def compute(
      time: Long, 
      numElements: Long,
      processingDelay: Long,
      schedulingDelay: Long 
    ): Option[Double] = {
    this.synchronized {
      if (time > latestTime && numElements > 0 && processingDelay > 0) {
        val delaySinceUpdate = (time - latestTime).toDouble / 1000
        val processingRate = numElements.toDouble / processingDelay * 1000
        val error = latestRate - processingRate

        val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
        val dError = (error - latestError) / delaySinceUpdate

        val newRate = (latestRate - proportional * error -
                                    integral * historicalError -
                                    derivative * dError).max(minRate) 

        latestTime = time
        if (firstRun) {
          latestRate = processingRate
          latestError = 0D
          firstRun = false
          None
        } else {
          latestRate = newRate
          latestError = error
          Some(newRate)
        }
      } else {
        None
      }
    }
  }
}

PIDRateEstimator充分运用了工控领域中常见的PID控制器的思想。所谓PID控制器,即比例(Proportional)-积分(Integral)-微分(Derivative)控制器,本质上是一种反馈回路(loop feedback)。它把收集到的数据和一个设定值(setpoint)进行比较,然后用它们之间的差计算新的输入值,该输入值可以让系统数据尽量接近或者达到设定值。

下图示出PID控制器的基本原理。


亦即:


其中e(t)代表误差,即设定值与回授值之间的差。也就是说,比例单元对应当前误差,积分单元对应过去累积误差,而微分单元对应将来误差。控制三个单元的增益因子分别为Kp、Ki、Kd

回到PIDRateEstimator的源码来,对应以上的式子,我们可以得知:

  • 处理速率的设定值其实就是上一批次的处理速率latestRate,回授值就是这一批次的速率processingRate,误差error自然就是两者之差。
  • 过去累积误差在这里体现为调度时延的过程中数据积压的速度,也就是schedulingDelay * processingRate / batchInterval。
  • 将来误差就是上面算出的error对时间微分的结果。

将上面三者综合起来,就可以根据Spark Streaming在上一批次以及这一批次的处理速率,估算出一个合适的用于下一批次的流量阈值。比例增益Kpspark.streaming.backpressure.pid.proportional控制,默认值1.0;积分增益Kispark.streaming.backpressure.pid.integral控制,默认值0.2;微分增益Kdspark.streaming.backpressure.pid.derived控制,默认值0.0。

除了上述参数之外,还有两个参数与反压机制相关。一是spark.streaming.backpressure.initialRate,用于控制初始化时的处理速率。二是spark.streaming.backpressure.pid.minRate,用于控制最小处理速率,默认值100条/秒。

通过RPC发布流量阈值

回来看ReceiverTracker,顾名思义,它负责追踪Receiver的状态。其sendRateUpdate()方法如下。

  def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
    if (isTrackerStarted) {
      endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
    }
  }

其中endpoint是RPC端点的引用,具体来说,是ReceiverTrackerEndpoint的引用。这个方法会将流ID与新的流量阈值包装在UpdateReceiverRateLimit消息中发送过去。

ReceiverTrackerEndpoint收到这条消息后,会再将其包装为UpdateRateLimit消息并发送给Receiver注册时的RPC端点(位于ReceiverSupervisorImpl类中)。

  private val endpoint = env.rpcEnv.setupEndpoint(
    "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
      override val rpcEnv: RpcEnv = env.rpcEnv

      override def receive: PartialFunction[Any, Unit] = {
        case StopReceiver =>
          logInfo("Received stop signal")
          ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
        case CleanupOldBlocks(threshTime) =>
          logDebug("Received delete old batch signal")
          cleanupOldBlocks(threshTime)
        case UpdateRateLimit(eps) =>
          logInfo(s"Received a new rate limit: $eps.")
          registeredBlockGenerators.asScala.foreach { bg =>
            bg.updateRate(eps)
          }
      }
    })

可见,收到该消息之后调用了BlockGenerator.updateRate()方法。BlockGenerator是RateLimiter的子类,它负责将收到的流数据转化成块存储。updateRate()方法是在RateLimiter抽象类中实现的。

  private[receiver] def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      if (maxRateLimit > 0) {
        rateLimiter.setRate(newRate.min(maxRateLimit))
      } else {
        rateLimiter.setRate(newRate)
      }
    }

这里最终借助了Guava中的限流器RateLimiter实现限流(Spark是不会重复造轮子的),其中maxRateLimit就是前面提到过的spark.streaming.receiver.maxRate参数。至此,新的流量阈值就设置好了。

以上就是与反压机制有关的全部细节,整个流程可以用下面的框图表示。


还有最后一个小问题,流量阈值设定好之后是如何生效的?这其实已经超出了本文的范畴,简单看一下。

借助Guava令牌桶完成流量控制

Receiver在收到一条数据之后,会调用BlockGenerator.addData()方法,将数据存入缓存。然后再从缓存取数据,并包装成一个个block。

  def addData(data: Any): Unit = {
    if (state == Active) {
      waitToPush()
      synchronized {
        if (state == Active) {
          currentBuffer += data
        } else {
          throw new SparkException(
            "Cannot add data as BlockGenerator has not been started or has been stopped")
        }
      }
    } else {
      throw new SparkException(
        "Cannot add data as BlockGenerator has not been started or has been stopped")
    }
  }

注意到在真正存入缓存之前,先调用了waitToPush()方法,它本质上就是Guava的RateLimiter.acquire()方法。

  @CanIgnoreReturnValue
  public double acquire() {
    return acquire(1);
  }

  @CanIgnoreReturnValue
  public double acquire(int permits) {
    long microsToWait = reserve(permits);
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }

  @Override
  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);
    long returnValue = nextFreeTicketMicros;
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    double freshPermits = requiredPermits - storedPermitsToSpend;
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);

    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
  }

Guava的限流器是计算机网络中经典限流方法——令牌桶(token bucket)算法的典型实现。acquire()方法的作用是从RateLimiter获取一个令牌(这里叫permit),如果能够取到令牌才将数据缓存,如果不能取到令牌就会被阻塞。RateLimiter.setRate()方法就是通过改变向令牌桶中放入令牌的速率(参数名称permitsPerSecond)来实现流量控制的。

关于令牌桶算法的细节,可以参见英文维基,也可以参考Guava源码,内容十分丰富。下图只是一个简单的示意。

The End

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351