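1. Overview
The SparkContext in the Driver works much like a project manager: when it starts up, the first thing it does is open a small notebook for recording the state of the work. That notebook is the ListenerBus.
Spark uses the event-listener pattern to dispatch messages. Executor heartbeats carry all kinds of important information to the Driver, where it is posted to this bus, and each Listener is then triggered by the events it cares about. This is how the scheduling of everything gets tracked.
The process is a lot like a project manager's morning stand-up: everyone's progress and problems are written down, and instructions are then issued according to the project plan.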
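To make the pattern concrete, here is a minimal sketch of an event-listener bus in plain Scala. It is not Spark's implementation: the names `Event`, `JobStarted`, `JobEnded`, `Listener`, `ToyListenerBus`, and `FailureRecorder` are all made up for illustration, and the toy bus delivers events synchronously when `drain()` is called, whereas Spark's bus dispatches on its own thread.

```scala
import scala.collection.mutable

// Hypothetical event types for illustration only; these are not Spark classes.
sealed trait Event
final case class JobStarted(jobId: Int) extends Event
final case class JobEnded(jobId: Int, succeeded: Boolean) extends Event

// One no-op callback per event type, so a listener overrides only what it needs.
trait Listener {
  def onJobStarted(event: JobStarted): Unit = ()
  def onJobEnded(event: JobEnded): Unit = ()
}

// A toy synchronous bus: post() queues an event, drain() delivers every
// queued event, in order, to every registered listener.
class ToyListenerBus {
  private val listeners = mutable.ArrayBuffer.empty[Listener]
  private val queue = mutable.Queue.empty[Event]

  def addListener(listener: Listener): Unit = listeners += listener

  def post(event: Event): Unit = queue.enqueue(event)

  def drain(): Unit =
    while (queue.nonEmpty) {
      val event = queue.dequeue()
      listeners.foreach { l =>
        event match {
          case e: JobStarted => l.onJobStarted(e)
          case e: JobEnded   => l.onJobEnded(e)
        }
      }
    }
}

// A listener that only records the ids of failed jobs.
class FailureRecorder extends Listener {
  val failedJobs = mutable.ArrayBuffer.empty[Int]
  override def onJobEnded(event: JobEnded): Unit =
    if (!event.succeeded) failedJobs += event.jobId
}

object ToyBusDemo {
  // Posts four events and returns the failed job ids seen by the recorder.
  def run(): List[Int] = {
    val bus = new ToyListenerBus
    val recorder = new FailureRecorder
    bus.addListener(recorder)
    bus.post(JobStarted(1))
    bus.post(JobEnded(1, succeeded = true))
    bus.post(JobStarted(2))
    bus.post(JobEnded(2, succeeded = false))
    bus.drain()
    recorder.failedJobs.toList
  }

  def main(args: Array[String]): Unit =
    println(s"Failed jobs: ${run()}")
}
```

The producer only knows about `post()`, and each listener only knows about the callbacks it overrides. That decoupling is what lets many independent components watch the same stream of events.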
The following sections look at the listeners that plug into the ListenerBus.
2. Custom listener
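SparkListener extends SparkListenerInterface and implements a set of callback methods that act as hooks, which is the usual way to design a listener.
Spark uses SparkListeners to let its internal scheduling services manage execution in a distributed environment. Any component that needs to know the state of the cluster registers a Listener and picks out the information it needs, for example the web UI, Executor Status, and so on.
Users can write their own SparkListener to keep track of whatever they need.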
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerStageCompleted}
class CustomSparkListener extends SparkListener {
  // Called when a job starts: print how many stages it has, plus the event itself
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    println(s"Job started with ${jobStart.stageInfos.size} stages: $jobStart")
  }
  // Called when a stage completes: print the stage id and its task count
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    println(s"Stage ${stageCompleted.stageInfo.stageId} completed with ${stageCompleted.stageInfo.numTasks} tasks.")
  }
}
Registering the listener takes a single call:
sparkContext.addSparkListener(listener: SparkListenerInterface): Unit
After a successful registration, the log shows:
INFO SparkContext: Registered listener org.shadowinlife.CustomerListener
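As a rough end-to-end sketch of registration, the example below attaches a listener to a local SparkContext and runs one small job. The names `CountingListener` and `ListenerRegistrationDemo`, the `local[2]` master, and the app name are assumptions chosen for illustration. The listener counts events instead of printing them so that the result can be inspected after the context stops.

```scala
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerStageCompleted}

// Hypothetical listener: counts job starts and stage completions.
class CountingListener extends SparkListener {
  val jobsStarted = new AtomicInteger(0)
  val stagesCompleted = new AtomicInteger(0)

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    jobsStarted.incrementAndGet()
    ()
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    stagesCompleted.incrementAndGet()
    ()
  }
}

object ListenerRegistrationDemo {
  // Runs one action on a local context and returns (jobs started, stages completed).
  def run(): (Int, Int) = {
    val conf = new SparkConf().setAppName("listener-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val listener = new CountingListener
    sc.addSparkListener(listener)
    try {
      // A single action with no shuffle: one job made of one stage.
      sc.parallelize(1 to 100, 4).map(_ * 2).count()
    } finally {
      // Events are delivered asynchronously; stopping the context
      // lets the bus finish delivering what is still queued.
      sc.stop()
    }
    (listener.jobsStarted.get, listener.stagesCompleted.get)
  }

  def main(args: Array[String]): Unit = {
    val (jobs, stages) = run()
    println(s"jobs started: $jobs, stages completed: $stages")
  }
}
```

Listeners can also be attached without touching application code through the `spark.extraListeners` configuration property, which takes a comma-separated list of listener class names.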
3. SparkListener Interface
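This trait lives in the org.apache.spark.scheduler package. The listing below matches Spark 1.x, where the trait is still named SparkListener; from Spark 2.0 on, the same callbacks are declared in SparkListenerInterface.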
/**
* :: DeveloperApi ::
* Interface for listening to events from the Spark scheduler. Note that this is an internal
* interface which might change in different Spark releases. Java clients should extend
* {@link JavaSparkListener}
*/
@DeveloperApi
trait SparkListener {
/**
* Called when a stage completes successfully or fails, with information on the completed stage.
*/
def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { }
/**
* Called when a stage is submitted
*/
def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
/**
* Called when a task starts
*/
def onTaskStart(taskStart: SparkListenerTaskStart) { }
/**
* Called when a task begins remotely fetching its result (will not be called for tasks that do
* not need to fetch the result remotely).
*/
def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { }
/**
* Called when a task ends
*/
def onTaskEnd(taskEnd: SparkListenerTaskEnd) { }
/**
* Called when a job starts
*/
def onJobStart(jobStart: SparkListenerJobStart) { }
/**
* Called when a job ends
*/
def onJobEnd(jobEnd: SparkListenerJobEnd) { }
/**
* Called when environment properties have been updated
*/
def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { }
/**
* Called when a new block manager has joined
*/
def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded) { }
/**
* Called when an existing block manager has been removed
*/
def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) { }
/**
* Called when an RDD is manually unpersisted by the application
*/
def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) { }
/**
* Called when the application starts
*/
def onApplicationStart(applicationStart: SparkListenerApplicationStart) { }
/**
* Called when the application ends
*/
def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { }
/**
* Called when the driver receives task metrics from an executor in a heartbeat.
*/
def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { }
/**
* Called when the driver registers a new executor.
*/
def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) { }
/**
* Called when the driver removes an executor.
*/
def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved) { }
/**
* Called when the driver receives a block update info.
*/
def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated) { }
}
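Because every callback has an empty default body, a listener overrides only the events it cares about. The sketch below uses `onTaskEnd` to add up task run time. The class name `TaskTimeListener` and its two counters are hypothetical, and the null check reflects the assumption that `taskMetrics` may be missing for a failed task.

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener: counts finished tasks and sums their executor run time.
class TaskTimeListener extends SparkListener {
  val taskCount = new AtomicLong(0L)
  val totalRunTimeMs = new AtomicLong(0L)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    taskCount.incrementAndGet()
    // taskMetrics may be null (for example when the task failed), so wrap it in Option.
    Option(taskEnd.taskMetrics).foreach { metrics =>
      totalRunTimeMs.addAndGet(metrics.executorRunTime)
    }
  }
}
```

Atomic counters are used because the values are written on the listener bus thread and are usually read from a different one, such as the main thread of the driver program.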
4. Listeners used inside Spark
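Spark Listener | Description |
---|---|
EventLoggingListener | Writes JSON-encoded event logs, which the History Server reads |
StatsReportListener | Logs summary statistics each time a stage completes |
SparkFirehoseListener | As the name suggests, receives every event; mainly intended as a base for building your own listeners |
ExecutorAllocationListener | Listens to all events related to stages, tasks, and executors, such as onStageSubmitted, onStageCompleted, onTaskStart, onTaskEnd, onExecutorAdded, and onExecutorRemoved, on behalf of ExecutorAllocationManager. The listener itself is implemented inside ExecutorAllocationManager. Many Spark services implement a Listener internally and then attach it to the SparkContext |
HeartbeatReceiver | Receives the executor heartbeats, which are critically important |
StreamingJobProgressListener | Records per-batch statistics for Spark Streaming jobs |
ExecutorsListener | Supplies the data for the Executors tab in the web UI |
StorageStatusListener, RDDOperationGraphListener, EnvironmentListener, BlockStatusListener, StorageListener | As the names suggest, statistics related to data storage |
SpillListener | During a shuffle (think of workers handing over their output, for example three back-end developers delivering three APIs to one front-end developer at the same time), intermediate data that grows too large is written to disk. This is called a spill, and this listener watches for it. The shuffle process itself is analyzed at length elsewhere |
ApplicationEventListener | Supplies the application-level statistics shown in the web UI |
StreamingQueryListenerBus | Related to streaming SQL queries; a new feature |
SQLListener | Writes Spark SQL execution information to the History Server |
StreamingListenerBus | As the name suggests, the listener bus for Spark Streaming |
JobProgressListener | While a job runs, the driver console shows a progress bar with how many tasks have finished, how many are running, how many stages there are, and which step is currently executing. This listener supplies that data |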
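As an illustration of the SparkFirehoseListener row, the sketch below funnels every event through the single `onEvent` method and counts events by class name. The class name `EventCountingListener` and the `counts` map are hypothetical, and this is only one possible use of the firehose, not how Spark itself uses it.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.SparkFirehoseListener
import org.apache.spark.scheduler.SparkListenerEvent

// Hypothetical listener: counts how many events of each class were seen.
class EventCountingListener extends SparkFirehoseListener {
  val counts = new ConcurrentHashMap[String, AtomicLong]()

  // Every callback of the firehose listener forwards to onEvent.
  override def onEvent(event: SparkListenerEvent): Unit = {
    val name = event.getClass.getName
    // Create the counter on first sight of this event class, then increment it.
    counts.putIfAbsent(name, new AtomicLong(0L))
    counts.get(name).incrementAndGet()
    ()
  }
}
```

Registered with `addSparkListener`, a listener like this gives a quick picture of which event types an application actually produces, which helps when deciding which specific callbacks are worth overriding.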