实时任务监控原因
在实时任务执行的过程中,由于数据突然激增或网络阻塞等情况,使得任务数据堆积或失败等
解决办法
通过实现SparkListener和StreamingListener重写其中的相关方法,便能监控任务的执行情况,对症下药
class NeiStreamingListener(ssc:StreamingContext,delay:Long) extends SparkListener with StreamingListener{
override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted): Unit = synchronized{
// 获取任务名称
val appName = ssc.sparkContext.appName
if (batchStarted != null){
val batchInfo = batchStarted.batchInfo
// 接收批次数据量
val numRecords = batchInfo.numRecords
// 调度延迟时间
val schedulingTime = batchInfo.schedulingDelay.getOrElse(0L)
// 执行时间
val processingTime = batchInfo.processingDelay.getOrElse(0L)
// 总延迟时间
val totalDelay = batchInfo.totalDelay.getOrElse(0L)
if(delay != -1 && schedulingTime > delay ){
val yarnManager = new YarnAppManager()
yarnManager.getJobState()
val sparMap = yarnManager.sparkMap()
if (sparMap.containsKey(appName) ) {
val appid: String = sparMap.get(appName).getApplicationId.toString
// 可以在此处添加对应逻辑,告警邮件或短信等
}
}
}
}
}
以上代码为重写onBatchCompleted方法,获取实时任务中批次数据处理延迟时间,并通过yarn获取到任务的applicationId;