Spark Streaming资源动态申请和动态控制消费速率原理剖析

Spark是粗粒度的,即在默认情况下会预先分配好资源,再进行计算。

好处是资源提前分配好,有计算任务时就直接使用计算资源,不用再考虑资源分配。

不好的地方是,高峰值和低峰值时需要的资源是不一样的。资源如果是针对高峰值情况下考虑的,那势必在低峰值情况下会有大量的资源浪费。

Twitter最近推出了会秒杀Storm的Heron,非常值得关注。因为Heron能有更好的资源分配、 更低的延迟。Heron在语义上兼容了Storm,即原来在Storm上开发的应用程序可以马上在Heron上使用。Storm绝对要成为历史了。Heron的主要开发语言是C++、Java、Python。其API支持Java。

SparkCore的入口SparkContext:

// Optionally scale number of executors dynamically based on workload. Exposed for testing.

val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)

if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {

logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")

}

_executorAllocationManager =

if (dynamicAllocationEnabled) {

Some(new ExecutorAllocationManager(this, listenerBus, _conf))

} else {

None

}

已经支持资源的动态分配。

Utils.isDynamicAllocationEnabled:

/**

* Return whether dynamic allocation is enabled in the given conf

* Dynamic allocation and explicitly setting the number of executors are inherently

* incompatible. In environments where dynamic allocation is turned on by default,

* the latter should override the former (SPARK-9092).

*/

defisDynamicAllocationEnabled(conf: SparkConf): Boolean = {

conf.getBoolean("spark.dynamicAllocation.enabled", false) &&

conf.getInt("spark.executor.instances", 0) == 0

}

ExecutorAllocationManager:

...

// Clock used to schedule when executors should be added and removed

private var clock: Clock = newSystemClock()

...

有个时钟,基于时钟的定时器会不断的扫描Executor的情况,每过一段时间去看看资源情况。

Master.schedule:

/**

* Schedule the currently available resources among waiting apps. This method will be called

* every time a new app joins or resource availability changes.

*/

private defschedule(): Unit = {

if (state != RecoveryState.ALIVE) { return }

// Drivers take strict precedence over executors

val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers

for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {

for (driver <- waitingDrivers) {

if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {

launchDriver(worker, driver)

waitingDrivers -= driver

}

}

}

startExecutorsOnWorkers()

}

原先默认的用于分配资源。

ExecutorAllocaionManager:

// Polling loop interval (ms)

private val intervalMillis: Long = 100

...

// A timestamp for each executor of when the executor should be removed, indexed by the ID

// This is set when an executor is no longer running a task, or when it first registers

private valremoveTimes= new mutable.HashMap[String, Long]

...

// Clock used to schedule when executors should be added and removed

private var clock: Clock = new SystemClock()

...

// Executor that handles the scheduling task.

private val executor =

ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")

...

removeTimes中有Executor的ID。

executor中有定时器,不断执行schedule。默认周期是intervalMillis(100ms)

ExecutorAllocaionManager.start:

/**

* Register for scheduler callbacks to decide when to add and remove executors, and start

* the scheduling task.

*/

defstart(): Unit = {

listenerBus.addListener(listener)

val scheduleTask = newRunnable() {

override def run(): Unit = {

try {

schedule()

} catch {

case ct: ControlThrowable =>

throw ct

case t: Throwable =>

logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)

}

}

}

executor.scheduleAtFixedRate(scheduleTask, 0,intervalMillis, TimeUnit.MILLISECONDS)

}

ExecutorAllocaionManager.schedule:

/**

* This is called at a fixed interval to regulate the number of pending executor requests

* and number of executors running.

*

* First, adjust our requested executors based on the add time and our current needs.

* Then, if the remove time for an existing executor has expired, kill the executor.

*

* This is factored out into its own method for testing.

*/

private defschedule(): Unit = synchronized {

val now = clock.getTimeMillis

updateAndSyncNumExecutorsTarget(now)

removeTimes.retain { case (executorId, expireTime) =>

val expired = now >= expireTime

if (expired) {

initializing = false

removeExecutor(executorId)

}

!expired

}

}

这个内部方法会被周期性的触发执行。

实际生产环境下,动态资源分配可能要自己做好定制。

SparkStreaming的动态调整的复杂之处在于,即使在batch duration内刚做了调整,但可能本batch duration马上就会过期。

你可以考虑改变执行周期(intervalMills),来动态调整。在一个batchduration中要对数据分片,可以算一下已拥有闲置的core,如果不够,则可以申请增加Executor,从而把任务分配到新增的Executor。

也可以考量针对上一个batchduration的资源需求情况,因为峰值出现时,往往会延续在多个连续的batch duration中。考量上一个batch duration的情况,用某种算法来动态调整后续的batch duration的资源。修改Spark Streaming可以设计StreamingContext的新子类。

其实前面的动态资源分配的定制方式做起来不容易,可能仍不太合适。

备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容