Structured Streaming 分析

本文对StructedStreaming的流程/机制进行分析

开发structedStreaming应用

StructedStreaming应用开发流程

从官网/源码中可以看到structedstreaming应用的开发
除了spark的初始化工作,通常有三步与业务相关的操作:

1.获取输入数据源(可以理解为source)

val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option(subscribeType, topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]

2.根据业务逻辑对数据进行转换处理 (业务处理)

wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

3.将处理结果写入第三方数据源,整个流应用通过query.start启动(可以理解为sink)

query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation", checkpointLocation)
.start()
query.awaitTermination()

流数据的读取

通过DataStreamReader类完成应用层与不同的流source源的reader隔离。load方法会为应用获取数据的逻辑

在处理数据源时框架使用serviceload机制,将所有集成DataSourceRegister的类加载如内存,判断对应source的shortName是否与设置的一致,如果一致,则实例化此类。并根据此类属性生成对应的dataframe。

当前支持的source源有如下:

Source名 Source源
MemorySource 测试用
TextSocketSource 用于展示使用
FileStreamSource 从固定目下下读文件
KafkaSource kafka作为数据源
RateStreamSource 固定速率的消息生成器,自增长的long型和时间戳

流数据的写出

数据的写出需要选择写出模式以及写出的sink源

写出模式:append,update,complete。
Structed streaming对写出模式的支持与数据处理时使用到的算子有关。需要根据需求,处理逻辑选合适的写出模式。
可参考:(http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes)
sink源的写出:
在处理sink源时框架依然使用serviceload机制,将所有集成DataSourceRegister的类加载如内存,判断对应source的shortName是否与设置的一致,如果一致,则实例化此类

当前实现的sink:

Sink名 sink目的地
memorysink 测试用
foreachSink 需要实现foreachwriter,用于定制化sink
kafkaSink 写出数据到kafka
fileformatSink 写出数据到hdfs。支持ORC,parquet等

StructedStreaming深入理解

对于structed streaming有如上理解即可开发相关应用。但structedstreaming的实现机制依然值得深究,尤其是structedstreaming的job触发机制,watermark是如何实现的,状态数据是如何保存的,用户应用如何被恢复的。如下对这三个"问题"进行分析

About Trigger

与sparkstreaming基于定时器产生job然后调度的机制不同,structedstreaming实现了一套新的job触发机制(trigger)。类似于flink这就是trigger机制。

trigger的设置

通过DataStreamWriter.trigger()完成对trigger设置。默认的trigger为ProcessingTime(interval),interval默认为0

trigger的分类

image.png

trigger有三种,OneTimeTrigger只会触发一次计算。在流应用中一般使用ProcessingTime和ContinuousTrigger两种,下面对着两种trigger进行对比。

Trigger类 ProcessingTime Continuous
对应execution MicroBatchExecution ContinuousExecution
工作模式 以一定间隔(interval)调度计算逻辑,间隔为0时,上批次调用完成后,立即进入下一批次调用一直调用,退化为类似sparkstreaming的micro batch的流处理 以一定间隔(interval)查看流计算状态
支持API 支持API丰富,如汇聚,关联等操作 仅简单的projection类(map,select等)
备注 total-cores个数大于partition数,task长时运行

ProcessingTime

在使用ProcessingTime Trigger时,对应的执行引擎为MicrobatchExecution。

Trigger调度机制如下:
override def execute(triggerHandler: () => Boolean): Unit = {
while (true) {
val triggerTimeMs = clock.getTimeMillis
val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
val terminated = !triggerHandler()
if (intervalMs > 0) {
val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
if (batchElapsedTimeMs > intervalMs) {
notifyBatchFallingBehind(batchElapsedTimeMs)
}
if (terminated) {
return
}
clock.waitTillTime(nextTriggerTimeMs)
} else {
if (terminated) {
return
}}}}

ProcessingTime Trigger循环调度每执行逻辑:
triggerExecutor.execute(() => {
startTrigger()
if (isActive) {
reportTimeTaken("triggerExecution") {
if (currentBatchId < 0) {
// We'll do this initialization only once
populateStartOffsets(sparkSessionForStream)
...
} else {
constructNextBatch()
}
if (dataAvailable) {
currentStatus = currentStatus.copy(isDataAvailable = true)
updateStatusMessage("Processing new data")
runBatch(sparkSessionForStream)
}
}
// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
if (dataAvailable) {
// Update committed offsets.
commitLog.add(currentBatchId)
committedOffsets ++= availableOffsets
currentBatchId += 1
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
} else {
currentStatus = currentStatus.copy(isDataAvailable = false)
updateStatusMessage("Waiting for data to arrive")
Thread.sleep(pollingDelayMs)
}}
updateStatusMessage("Waiting for next trigger")
isActive
})

ContinuousTrigger

在使用ContinuousTrigger时,对应的执行逻辑为continuousExecution。在调度时,Trigger退化为ProcessingTime Trigger。仅仅对执行状态查询,记录

Continuous执行逻辑
triggerExecutor.execute(() => {
startTrigger()
if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, RECONFIGURING)) {
stopSources()
if (queryExecutionThread.isAlive) {
sparkSession.sparkContext.cancelJobGroup(runId.toString)
queryExecutionThread.interrupt()
}
false
} else if (isActive) {
currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
logInfo(s"New epoch $currentBatchId is starting.")
true
} else {
false
}})
在ContinuousDataSourceRDD的compute方法中可以看出,其计算逻辑如下:

  • 通过一个名为continuous-reader--{context.partitionId()}--" + s"{context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)} 的线程实时获取数据,放入名为queue的队列中。
  • worker线程则长时间运行,在计算时则是从queue中实时获取消息处理。

About waternark

StructedStreaming的与sparkstreaming相比一大特性就是支持基于数据中的时间戳的数据处理。也就是在处理数据时,可以对记录中的字段的时间进行考虑。eventTime更好的代表数据本身的信息。
在获取消息本身的时间戳之后,就可以根据该时间戳来判断消息的到达是否延迟(乱序)以及延迟的时间是否在容忍的范围内。该判断方法是根据watermark机制来设置和判断消息的有效性(延迟是否在可容忍范围内)


image.png

watermark的设置

通过dataset.withWatermark()完成对watermark的设置

watermark的生成/更新

1.在driver内注册一个累加器eventTimeStats;
2.在一个批次计算内,executor的各task根据各自分区内的消息的时间戳,来更新累加器
executor中各task获取分区的eventtime信息方式如下:
在EventTimeWatermarkExec中的doExecute方法中
iter.map { row =>
eventTimeStats.add(getEventTime(row).getLong(0) / 1000)
row
}
def add(eventTime: Long): Unit = {
this.max = math.max(this.max, eventTime)
this.min = math.min(this.min, eventTime)
this.count += 1
this.avg += (eventTime - avg) / count
}
3.在driver端生成batch时,获取各个操作/plan的watermark,找出操作的最小的watermark时间点,写入offsetSeqMetadata,同时写入offsetlog
// 计算各plan的watermark lastExecution.executedPlan.collect { case e: EventTimeWatermarkExec => e }.zipWithIndex.foreach { case (e, index) if e.eventTimeStats.value.count > 0 => logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}") val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs val prevWatermarkMs = watermarkMsMap.get(index) if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) { watermarkMsMap.put(index, newWatermarkMs) } //找出watermark中最小值 if(!watermarkMsMap.isEmpty) { val newWatermarkMs = watermarkMsMap.minBy(_._2)._2 if (newWatermarkMs > batchWatermarkMs) { logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms") batchWatermarkMs = newWatermarkMs } //写入offsetSeqMetadata offsetSeqMetadata = offsetSeqMetadata.copy( batchWatermarkMs = batchWatermarkMs, batchTimestampMs = triggerClock.getTimeMillis()) //写入offsetlog offsetLog.add( currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)
4.根据watermark在读消息时过滤数据
StreamingSymmetricHashJoinExec -> doExecutor -> processPartitions -> StoreAndJoinWithOtherSide中有如下操作: val nonLateRows = WatermarkSupport.watermarkExpression(watermarkAttribute, eventTimeWatermark) match { case Some(watermarkExpr) => val predicate = newPredicate(watermarkExpr, inputAttributes) inputIter.filter { row => !predicate.eval(row) } case None => inputIter }

About state:

流应用中,如果有状态相关的如汇聚,关联等操作,需要在应用中将部分数据进行缓存,structedstreaming中通过statestore来对数据缓存以备后续计算及异常恢复使用。

当前的statestore的实现仅HDFSBackedStateStore,由HDFSBackedStateStoreProvider生成和管理; 每个HDFSBackedStateStoreProvider对应一个目录。

状态数据的写入:

在在一些有状态的操作如关联汇聚等,部分数据需要保存以备后续计算使用,

store的put操作:
只有需要存储部分状态的操作/算子需要对状态数据进行缓存。从源码中查看,有如下算子:StateStoreSaveExec,FlatMapGroupsWithStateExec,SymmetricHashJoinStateManager

以流关联操作为例,介绍SymmetricHashJoinStateManager中的state写流程如下:

1) 将数据写入state文件:在StreamingSymmetricHashJoinExec的doExecute方法中,调用到processPartitions,会调用到OneSideHashJoiner的storeAndJoinWithOtherSide方法,会根据条件判断该记录是否写入临时文件的输出流中。判断条件condition ( !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow))

image.png

2) 在计算节结束后,将statestore数据写入磁盘
StreamingSymmetricHashJoinExec -> onOutputCompletion -> leftSideJoiner.commitStateAndGetMetrics -> joinStateManager.commit -> keyToNumValues.commit -> StateStoreHandler.commit -> HDFSBackedStateStore.commit

状态数据的读取:

在一些有状态的操作如关联汇聚等,需要对“历史/之前批次”数据进行“缓存”,以备下次计算时,读取使用。
有两处读取store的逻辑

1.statestoreRdd的compute方法
2.StreamingSymmetricHashJoinExec -> doExecutor -> processPartitions -> OneSideHashJoiner.init -> SymmetricHashJoinStateManager.init -> KeyToNumValuesStore.init -> getStateStore -> stateStore.get ->storeProvider.getStore

状态数据的管理/maintain
在executor内部,对于每一个分片启动一个线程定期“compact”中间数据,周期由spark.sql.streaming.stateStore.maintenanceInterval参数控制,默认为60s,线程名 : state-store-maintenance-task 主要工作是扫描delta文件,生成snapshot文件,清理陈旧文件。

生成snapshot文件具体逻辑:
1) 扫描目录下的文件,找出delta文件当前最大的版本号Max(d)(delta文件的命名方式Int.delta,版本号为Int值,如10.delta,则版本号为10)
2) 找出当前最大的snapshot文件的版本号Max(s)(delta文件的命名方式Int.snapshot,版本号为Int值,如10.snapshot,则版本号为10)
3) 当Max(d) - Max(s) 大于spark.sql.streaming.stateStore.minDeltasForSnapshot(默认为10)时,进行打快照操作。否则,跳过。
陈旧文件清理:
1) 找出当前文件的最大版本号Max(v)
2) MaxversionToRetain = Max(v) - spark.sql.streaming.minBatchesToRetain(默认100)时,当MaxversionToRetain > 0 时清理所有版本号小于MaxversionToRetain的文件。否则,跳过

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,347评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,435评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,509评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,611评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,837评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,987评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,730评论 0 267
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,194评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,525评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,664评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,334评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,944评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,764评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,997评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,389评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,554评论 2 349