Problem and Background
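Some background first. I have recently been working on a real-time data warehouse and set out to build a real-time wide table: the job reads data from Kafka, joins it against dimension tables in real time, and writes the result to Kafka and HDFS. Because our company runs Hadoop 2.6, the HDFS output is written with BucketingSink.
After the job had been developed and running for a while, I found that the files written to HDFS stayed in the pending state.
After a lot of digging, it turned out that the job could never trigger a checkpoint or savepoint, so the pending files were never moved to finished. The JobManager log showed the checkpoints being aborted.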
Investigation
Following the JobManager log leads to CheckpointCoordinator. Checkpointing does call the triggerCheckpoint method, and further down in that method there is this loop:
    Execution[] executions = new Execution[tasksToTrigger.length];
    for (int i = 0; i < tasksToTrigger.length; i++) {
        Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
        if (ee == null) {
            LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                    tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                    job);
            return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        } else if (ee.getState() == ExecutionState.RUNNING) {
            executions[i] = ee;
        } else {
            LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
                    tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                    job,
                    ExecutionState.RUNNING,
                    ee.getState());
            return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }
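As the code shows, the coordinator checks the state of every Execution when a checkpoint is triggered (ee.getState() == ExecutionState.RUNNING). If any state is not RUNNING, it logs
Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.
and returns from triggerCheckpoint. This implies that some Execution was not running. The job graph in the web UI confirmed it: some Executions were in the FINISHED state.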
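The cause was the way the dimension table was broadcast: the full and incremental data were unioned into one stream, and the full data was read from HDFS with readTextFile. Once the HDFS file had been read to the end, that source's Execution moved to FINISHED.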
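The check described above can be modelled in a few lines of plain Java. This is only a minimal sketch and not Flink code: TaskState is a cut-down stand-in for ExecutionState, and CheckpointGate and its methods are hypothetical names introduced for illustration. It shows why a single FINISHED source is enough to block every later checkpoint.

```java
import java.util.Arrays;
import java.util.List;

// Cut-down stand-in for Flink's ExecutionState; only the states needed here.
enum TaskState { CREATED, RUNNING, FINISHED, FAILED }

// Hypothetical helper (not part of Flink) mirroring the "every task must be RUNNING" rule.
final class CheckpointGate {
    private CheckpointGate() {}

    /** Returns the index of the first task that is not RUNNING, or -1 if all are RUNNING. */
    static int firstBlockingTask(List<TaskState> tasks) {
        for (int i = 0; i < tasks.size(); i++) {
            if (tasks.get(i) != TaskState.RUNNING) {
                return i;
            }
        }
        return -1;
    }

    /** A checkpoint is only triggered when no task blocks it. */
    static boolean canTrigger(List<TaskState> tasks) {
        return firstBlockingTask(tasks) == -1;
    }
}

class CheckpointGateDemo {
    public static void main(String[] args) {
        // While the bounded HDFS source is still reading, every task is RUNNING.
        List<TaskState> whileLoading =
                Arrays.asList(TaskState.RUNNING, TaskState.RUNNING, TaskState.RUNNING);
        // After the file has been read to the end, that source task is FINISHED.
        List<TaskState> afterLoading =
                Arrays.asList(TaskState.RUNNING, TaskState.FINISHED, TaskState.RUNNING);

        System.out.println(CheckpointGate.canTrigger(whileLoading)); // true
        System.out.println(CheckpointGate.canTrigger(afterLoading)); // false
    }
}
```

In this model the job can checkpoint only during the short window in which the file is still being read, which matches the symptom of pending files that never become finished.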
Solution
Load the full dimension table in the open method of a RichFlatMapFunction or BroadcastProcessFunction instead, so that no Execution ends up in the FINISHED state.
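Of course, if the job does not write to HDFS, or keeps no state other than broadcast state, the original approach of unioning in a readTextFile source is still workable.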
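The open-based loading can be sketched as follows. To keep the example runnable without Flink on the classpath, this is a plain-Java stand-in rather than a real RichFlatMapFunction: the class name DimEnrichFunction, the "key,value" file layout, the "key,payload" event format and the use of a local file instead of HDFS are all assumptions made for illustration. In an actual job the class would extend RichFlatMapFunction, and open would read the file through an HDFS client.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java stand-in for a RichFlatMapFunction<String, String> (hypothetical names and formats).
class DimEnrichFunction {
    private final Path dimFile;
    private Map<String, String> dimTable;

    DimEnrichFunction(Path dimFile) {
        this.dimFile = dimFile;
    }

    // Counterpart of open(): runs once per task, before any record is processed.
    // The full dimension table is loaded here, so no separate bounded source is needed.
    void open() throws IOException {
        Map<String, String> loaded = new HashMap<>();
        for (String line : Files.readAllLines(dimFile, StandardCharsets.UTF_8)) {
            String[] kv = line.split(",", 2); // assumed layout: key,value
            if (kv.length == 2) {
                loaded.put(kv[0], kv[1]);
            }
        }
        dimTable = loaded;
    }

    // Counterpart of flatMap(): appends the dimension value to a "key,payload" event.
    // Events without a matching key are dropped in this sketch.
    void flatMap(String event, Consumer<String> out) {
        String key = event.split(",", 2)[0];
        String dim = dimTable.get(key);
        if (dim != null) {
            out.accept(event + "," + dim);
        }
    }
}

class DimEnrichDemo {
    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("dim", ".csv");
        Files.write(file, Arrays.asList("u1,Beijing", "u2,Shanghai"));

        DimEnrichFunction fn = new DimEnrichFunction(file);
        fn.open();
        fn.flatMap("u1,click", System.out::println); // prints u1,click,Beijing
        Files.delete(file);
    }
}
```

Because the table is loaded inside an operator that keeps running for the lifetime of the job, no task reaches FINISHED. Incremental dimension updates would still arrive through the streaming input and would have to update the in-memory map, which this sketch leaves out.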