Problem
A Flink job consumes from Kafka with automatic offset commit enabled.
kafka-client 0.11.0.2
kafka-broker 1.1.1
Every so often the following message shows up in the logs:
marking the coordinator (id rack null) dead for group
Analysis
Reading the client source:
org.apache.kafka.clients.consumer.internals.AbstractCoordinator (kafka-clients 0.11.0.2):

// Logs the message above, fails any requests still queued for the coordinator,
// and forgets the coordinator so it has to be looked up again.
protected synchronized void coordinatorDead() {
    if (this.coordinator != null) {
        log.info("Marking the coordinator {} dead for group {}", this.coordinator, this.groupId);
        this.client.failUnsentRequests(this.coordinator, CoordinatorNotAvailableException.INSTANCE);
        this.coordinator = null;
    }
}
HeartbeatThread.run() (excerpt: the main loop only):
while (true) {
    synchronized (AbstractCoordinator.this) {
        if (this.closed) {
            return;
        }
        if (!this.enabled) {
            AbstractCoordinator.this.wait();
        } else if (AbstractCoordinator.this.state != AbstractCoordinator.MemberState.STABLE) {
            this.disable();
        } else {
            AbstractCoordinator.this.client.pollNoWakeup();
            long now = AbstractCoordinator.this.time.milliseconds();
            if (AbstractCoordinator.this.coordinatorUnknown()) {
                // No known coordinator: look it up, waiting retry.backoff.ms between attempts
                if (AbstractCoordinator.this.findCoordinatorFuture != null || AbstractCoordinator.this.lookupCoordinator().failed()) {
                    AbstractCoordinator.this.wait(AbstractCoordinator.this.retryBackoffMs);
                }
            } else if (AbstractCoordinator.this.heartbeat.sessionTimeoutExpired(now)) {
                // No successful heartbeat response within session.timeout.ms:
                // this is the branch that logs "Marking the coordinator ... dead"
                AbstractCoordinator.this.coordinatorDead();
            } else if (AbstractCoordinator.this.heartbeat.pollTimeoutExpired(now)) {
                // poll() was not called within max.poll.interval.ms: leave the group
                AbstractCoordinator.this.maybeLeaveGroup();
            } else if (!AbstractCoordinator.this.heartbeat.shouldHeartbeat(now)) {
                // Not yet time for the next heartbeat
                AbstractCoordinator.this.wait(AbstractCoordinator.this.retryBackoffMs);
            } else {
                AbstractCoordinator.this.heartbeat.sentHeartbeat(now);
                AbstractCoordinator.this.sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
                    public void onSuccess(Void value) {
                        synchronized (AbstractCoordinator.this) {
                            // A response resets the session timer
                            AbstractCoordinator.this.heartbeat.receiveHeartbeat(AbstractCoordinator.this.time.milliseconds());
                        }
                    }

                    public void onFailure(RuntimeException e) {
                        synchronized (AbstractCoordinator.this) {
                            if (e instanceof RebalanceInProgressException) {
                                // A rebalance in progress is treated like a received heartbeat
                                AbstractCoordinator.this.heartbeat.receiveHeartbeat(AbstractCoordinator.this.time.milliseconds());
                            } else {
                                // Any other failure: mark the heartbeat failed and wake the loop
                                AbstractCoordinator.this.heartbeat.failHeartbeat();
                                AbstractCoordinator.this.notify();
                            }
                        }
                    }
                });
            }
        }
    }
}
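The source shows the cause: the heartbeat thread got no successful heartbeat response from the group coordinator on the Kafka broker within session.timeout.ms, so sessionTimeoutExpired() returned true and the thread called coordinatorDead(). In other words, the client's communication with the broker's coordinator timed out.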
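To see how the timers in this loop interact, here is a simplified, hypothetical Java model of the heartbeat timer checks. The class `HeartbeatModel` and its method `firstCoordinatorDeadMs` are illustrative names, not Kafka APIs; the formulas are my reading of the 0.11 client's `Heartbeat` class and may differ in detail. The model assumes every heartbeat sent during an outage window fails immediately and every other heartbeat succeeds instantly, and it ignores coordinator lookup and `max.poll.interval.ms`.

```java
/**
 * Hypothetical, simplified model of the timer checks in the 0.11 HeartbeatThread loop.
 * NOT Kafka's Heartbeat class; names and structure are illustrative only.
 */
final class HeartbeatModel {
    private final long sessionTimeoutMs;
    private final long heartbeatIntervalMs;
    private final long retryBackoffMs;

    private final long lastSessionReset;   // when the session started
    private long lastHeartbeatSend;        // last time a heartbeat request was sent
    private long lastHeartbeatReceive;     // last time a heartbeat response arrived
    private boolean heartbeatFailed;       // whether the last heartbeat failed

    HeartbeatModel(long sessionTimeoutMs, long heartbeatIntervalMs, long retryBackoffMs, long startMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.retryBackoffMs = retryBackoffMs;
        this.lastSessionReset = startMs;
        this.lastHeartbeatSend = startMs;
        this.lastHeartbeatReceive = startMs;
    }

    void sentHeartbeat(long now) { lastHeartbeatSend = now; heartbeatFailed = false; }
    void receiveHeartbeat(long now) { lastHeartbeatReceive = now; }
    void failHeartbeat() { heartbeatFailed = true; }

    // True once no heartbeat response has arrived for longer than session.timeout.ms.
    boolean sessionTimeoutExpired(long now) {
        return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeoutMs;
    }

    // After a failure the next attempt waits retry.backoff.ms instead of heartbeat.interval.ms.
    boolean shouldHeartbeat(long now) {
        long sinceLastSend = now - Math.max(lastHeartbeatSend, lastSessionReset);
        long delay = heartbeatFailed ? retryBackoffMs : heartbeatIntervalMs;
        return sinceLastSend > delay;
    }

    /**
     * Steps a simulated clock and returns the first time the loop would call
     * coordinatorDead(), or -1 if that does not happen by horizonMs.
     * Heartbeats sent in [outageStartMs, outageEndMs) fail; all others succeed at once.
     */
    static long firstCoordinatorDeadMs(long sessionTimeoutMs, long heartbeatIntervalMs, long retryBackoffMs,
                                       long outageStartMs, long outageEndMs, long horizonMs, long stepMs) {
        HeartbeatModel hb = new HeartbeatModel(sessionTimeoutMs, heartbeatIntervalMs, retryBackoffMs, 0);
        for (long now = 0; now <= horizonMs; now += stepMs) {
            if (hb.sessionTimeoutExpired(now)) {
                return now; // the branch that logs "Marking the coordinator ... dead"
            }
            if (hb.shouldHeartbeat(now)) {
                hb.sentHeartbeat(now);
                boolean inOutage = now >= outageStartMs && now < outageEndMs;
                if (inOutage) {
                    hb.failHeartbeat();
                } else {
                    hb.receiveHeartbeat(now);
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Client defaults: session.timeout.ms=10000, heartbeat.interval.ms=3000, retry.backoff.ms=100
        System.out.println(firstCoordinatorDeadMs(10_000, 3_000, 100, 20_000, 80_000, 400_000, 100)); // prints 28700
        // Settings from this post, same 60 s outage
        System.out.println(firstCoordinatorDeadMs(300_000, 100_000, 3_000, 20_000, 80_000, 400_000, 100)); // prints -1
    }
}
```

With the client defaults, the last successful heartbeat in this model is at 18.6 s, so the session check trips about 10 s later at 28.7 s even though the outage lasts until 80 s. With the longer session timeout used in this post, the same 60 s outage passes without the session expiring.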
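Solution
While reading the source, I found this exact log message only in the 0.11.0.2 client, so one option is to upgrade the client.
Alternatively, lengthen the session timeout and the heartbeat interval, and raise the reconnect backoff (reconnect.backoff.ms, default 50 ms) to 3 s. The settings used: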
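prop.setProperty("session.timeout.ms", "300000")       // default 10000; must lie within the broker's group.min/max.session.timeout.ms
prop.setProperty("heartbeat.interval.ms", "100000")    // default 3000; keep at or below 1/3 of session.timeout.ms
prop.setProperty("retry.backoff.ms", "3000")           // default 100
prop.setProperty("reconnect.backoff.ms", "3000")       // default 50
prop.setProperty("reconnect.backoff.max.ms", "5000")   // default 1000
prop.setProperty("request.timeout.ms", "400000")       // the 0.11 consumer requires this to exceed session.timeout.ms and fetch.max.wait.ms
prop.setProperty("fetch.max.wait.ms", "5000")          // default 500
prop.setProperty("enable.auto.commit", "true")
prop.setProperty("max.poll.records", "200")            // default 500
prop.setProperty("auto.commit.interval.ms", "30000")   // default 5000
prop.setProperty("max.poll.interval.ms", "600000")     // default 300000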
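These timeouts depend on each other, so tuning one at a time can easily break another. Below is a minimal sketch, assuming plain `java.util.Properties` as above. The hypothetical helper `ConsumerTimeoutConfig` collects the settings from this post and checks three constraints. First, heartbeat.interval.ms must be below session.timeout.ms, and the Kafka docs recommend at most 1/3 of it. Second, request.timeout.ms must exceed both session.timeout.ms and fetch.max.wait.ms, which the 0.11 consumer enforces at construction time. Third, session.timeout.ms must fall within the broker's allowed range. The broker bounds are passed in because they come from the broker's `group.min.session.timeout.ms` / `group.max.session.timeout.ms`. The 6000/300000 values in `main` are assumed defaults, so check your broker's actual configuration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper: the consumer settings from this post plus sanity checks on their timeouts. */
final class ConsumerTimeoutConfig {

    static Properties tunedProperties() {
        Properties prop = new Properties();
        prop.setProperty("session.timeout.ms", "300000");
        prop.setProperty("heartbeat.interval.ms", "100000");
        prop.setProperty("retry.backoff.ms", "3000");
        prop.setProperty("reconnect.backoff.ms", "3000");
        prop.setProperty("reconnect.backoff.max.ms", "5000");
        prop.setProperty("request.timeout.ms", "400000");
        prop.setProperty("fetch.max.wait.ms", "5000");
        prop.setProperty("enable.auto.commit", "true");
        prop.setProperty("max.poll.records", "200");
        prop.setProperty("auto.commit.interval.ms", "30000");
        prop.setProperty("max.poll.interval.ms", "600000");
        return prop;
    }

    // Reads a millisecond setting, falling back to the 0.11 client default when absent.
    private static long ms(Properties prop, String key, long clientDefault) {
        return Long.parseLong(prop.getProperty(key, Long.toString(clientDefault)));
    }

    /** Returns the violated constraints; an empty list means all checks passed. */
    static List<String> check(Properties prop, long brokerMinSessionMs, long brokerMaxSessionMs) {
        long session = ms(prop, "session.timeout.ms", 10_000);
        long heartbeat = ms(prop, "heartbeat.interval.ms", 3_000);
        long request = ms(prop, "request.timeout.ms", 305_000);
        long fetchWait = ms(prop, "fetch.max.wait.ms", 500);

        List<String> problems = new ArrayList<>();
        if (heartbeat >= session) {
            problems.add("heartbeat.interval.ms must be lower than session.timeout.ms");
        } else if (heartbeat * 3 > session) {
            problems.add("heartbeat.interval.ms is above the recommended 1/3 of session.timeout.ms");
        }
        if (request <= session || request <= fetchWait) {
            problems.add("request.timeout.ms must be greater than session.timeout.ms and fetch.max.wait.ms");
        }
        if (session < brokerMinSessionMs || session > brokerMaxSessionMs) {
            problems.add("session.timeout.ms is outside the broker's allowed session timeout range");
        }
        return problems;
    }

    public static void main(String[] args) {
        // 6000 / 300000: ASSUMED broker defaults for group.min/max.session.timeout.ms on 1.1.x
        System.out.println(check(tunedProperties(), 6_000, 300_000)); // prints []
    }
}
```

For example, lowering request.timeout.ms to 300000 while session.timeout.ms stays at 300000 makes the second check report a problem, which matches the combination the 0.11 consumer refuses to start with.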