I. Task Background
Upstream Kafka traffic: 600,000 messages/s across 64 partitions
Streaming resources: 32 executors (1 GB on-heap + 2 GB off-heap memory each), 64 threads
Batch window: 300 s
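For context, a minimal sketch of how such a job is typically wired up with the spark-streaming-kafka-0-10 direct API. The original job code is not shown in this post; the broker list, topic, and group id below are placeholders.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.KafkaUtils

object StreamingJob {
  def main(args: Array[String]): Unit = {
    // Placeholder broker list, topic, and group id for illustration.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "broker1:9092,broker2:9092",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "streaming-job-group",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val conf = new SparkConf().setAppName("kafka-streaming-job")
    // 300 s batch window, per the task background above.
    val ssc = new StreamingContext(conf, Seconds(300))

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("events"), kafkaParams))

    stream.foreachRDD { rdd => /* per-batch processing */ }

    ssc.start()
    ssc.awaitTermination()
  }
}
```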
II. Resolution Process
1. Failure Description
The Spark Streaming job failed. The error log is as follows:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:600)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:541)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:999)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
2. Failure Analysis
What the error says: after the consumer finished processing a batch of polled messages, its offset commit back to the broker failed. The log indicates that the partitions this consumer thread had been consuming were already revoked by the broker and reassigned to another group member during a rebalance.
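The post does not show the job's commit code, but a commit like the one failing here commonly comes from the following pattern in the spark-streaming-kafka-0-10 integration (a sketch, not the actual job code):

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Capture the Kafka offset ranges backing this batch.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch ...

  // Commit the offsets back to Kafka after processing. If the group
  // rebalanced in the meantime (e.g. because this batch ran longer
  // than max.poll.interval.ms), the partitions now belong to another
  // member and the commit fails with the CommitFailedException above.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```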
Kafka consumer configuration:
max.poll.interval.ms = 600000
Spark Streaming configuration:
spark.streaming.kafka.maxRatePerPartition=50000
After the job was stopped and restarted, a large backlog had accumulated upstream. Under the configuration above, a single batch can pull up to 960,000,000 records (maxRatePerPartition × number of Kafka partitions × window duration = 50,000 × 64 × 300). The run timeline from one such restart is shown below:
That batch took roughly 11 minutes. Meanwhile, max.poll.interval.ms = 600000 (10 minutes). This property sets the maximum delay between successive calls to poll(), i.e. an upper bound on how long the consumer may stay idle before fetching more records; if poll() is not invoked again before the timeout expires, the consumer is considered failed and the group rebalances. Combining the two: any batch whose processing time exceeds Kafka's maximum poll interval triggers a rebalance of the consumer group, which matches the observed failure exactly.
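A back-of-the-envelope check of that failure condition, using only the figures above:

```scala
// Figures from the analysis above.
val maxRatePerPartition = 50000L                 // records/s per partition
val partitions          = 64L
val batchWindowSec      = 300L

// Worst-case batch size while draining the backlog:
val maxBatchRecords = maxRatePerPartition * partitions * batchWindowSec
// = 960,000,000 records

val observedBatchSec   = 11 * 60                 // ~11 min to process one batch
val maxPollIntervalSec = 600000 / 1000           // 10 min allowed between polls

// 660 s > 600 s: the poll gap exceeds max.poll.interval.ms,
// so the broker evicts the consumer and rebalances the group.
assert(observedBatchSec > maxPollIntervalSec)
```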
3. Solution
(a) Increase the poll wait timeout
(b) Reduce the maximum data volume processed per batch
The Kafka and Spark parameters were adjusted as follows (applied as sketched after the list):
max.poll.interval.ms = 1200000
spark.streaming.kafka.maxRatePerPartition=15000
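A sketch of where these settings would be applied, with the resulting headroom; variable names are illustrative.

```scala
import org.apache.spark.SparkConf

// (a) Double the allowed gap between polls: 10 min -> 20 min.
val kafkaParams = Map[String, Object](
  "max.poll.interval.ms" -> (1200000: java.lang.Integer)
)

// (b) Cap the per-partition ingest rate at 15,000 records/s.
val conf = new SparkConf()
  .set("spark.streaming.kafka.maxRatePerPartition", "15000")

// New worst-case batch: 15,000 * 64 * 300 = 288,000,000 records,
// 30% of the original 960,000,000, while the poll interval doubles.
// Steady-state input is 600,000/s ≈ 9,375 records/s per partition,
// so the 15,000 cap still leaves room to drain the backlog.
```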
The job behavior after the change is shown below: