1.Exception:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
2.Reference article
https://blog.csdn.net/shibuwodai_/article/details/80678717
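3.Cause summary
The interval between two poll() calls exceeded the configured session.timeout.ms (on Kafka 0.10.1.0 and later, the limit that applies to the poll loop is max.poll.interval.ms). The root cause is that processing a batch takes longer than the configured timeout. If the consumer does not call poll() for too long, the cluster considers it dead, reassigns its partitions to another member, and no longer lets it commit offsets. The uncommitted messages are then delivered again, which results in duplicate consumption.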
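The timing rule described above can be illustrated with a small model. This is only a sketch of the idea, not Kafka's internal code: the class PollGapMonitor and its methods are hypothetical, and timestamps are passed in explicitly so the example is deterministic.

```java
/** Hypothetical model of the "time since last poll()" check. */
class PollGapMonitor {
    private final long maxPollIntervalMs;
    private long lastPollMs;

    PollGapMonitor(long maxPollIntervalMs, long startMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = startMs;
    }

    /** Records that poll() was called at the given time. */
    void onPoll(long nowMs) {
        lastPollMs = nowMs;
    }

    /** True if the gap since the last poll() exceeds the allowed interval. */
    boolean isExpired(long nowMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        PollGapMonitor monitor = new PollGapMonitor(300_000, 0);
        monitor.onPoll(1_000);
        // 199000 ms since the last poll: still within the 300000 ms limit.
        System.out.println(monitor.isExpired(200_000)); // false
        // 300001 ms since the last poll: the consumer would be treated as stalled.
        System.out.println(monitor.isExpired(301_001)); // true
    }
}
```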
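4.Solution summary
- Increase max.poll.interval.ms (the maximum allowed interval between two poll() calls); the default is 300000 ms
- Decrease max.poll.records (the maximum number of records returned by a single poll()); the default is 500
- Process the records in parallel on multiple threads, but commit the offset only after the whole batch has been processed, and only then call poll() again (a CountDownLatch is used for this)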
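As a rough sketch of how the first two settings interact: the whole batch returned by one poll() has to be processed within max.poll.interval.ms, so a safe max.poll.records is roughly the interval divided by the worst-case processing time of one record. The class name, the method name, the safety factor and the 200 ms per-record figure below are assumptions for illustration, not measurements from a real workload.

```java
/** Hypothetical helper: estimates a safe max.poll.records value. */
class PollBatchSizing {

    /**
     * @param maxPollIntervalMs value of max.poll.interval.ms
     * @param worstCaseRecordMs assumed worst-case processing time of one record
     * @param safetyFactor      fraction of the interval to actually use, in (0, 1]
     * @return largest batch size expected to finish within the interval (at least 1)
     */
    static int safeMaxPollRecords(long maxPollIntervalMs, long worstCaseRecordMs, double safetyFactor) {
        if (maxPollIntervalMs <= 0 || worstCaseRecordMs <= 0 || safetyFactor <= 0 || safetyFactor > 1) {
            throw new IllegalArgumentException("arguments must be positive and safetyFactor in (0, 1]");
        }
        long budgetMs = (long) (maxPollIntervalMs * safetyFactor);
        long records = budgetMs / worstCaseRecordMs;
        return (int) Math.max(1, Math.min(records, Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        // Default interval of 300000 ms, assumed 200 ms per record, half of the budget used.
        System.out.println(safeMaxPollRecords(300_000, 200, 0.5)); // prints 750
    }
}
```

If the estimate comes out below the current max.poll.records, either lower max.poll.records or raise max.poll.interval.ms.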
5.Difference between session.timeout.ms and max.poll.interval.ms
This assumes Kafka 0.10.1.0 or later, where each consumer instance uses two threads. One is the user thread, from which poll() is called; the other is a background heartbeat thread that only takes care of sending heartbeats.
session.timeout.ms applies to the heartbeat thread. If the coordinator does not receive a heartbeat from a consumer before this interval elapses, it marks the consumer as failed and triggers a rebalance.
max.poll.interval.ms applies to the user thread. If message processing is heavy enough that the time between two poll() calls exceeds this interval, the consumer explicitly leaves the group, which also triggers a rebalance.
heartbeat.interval.ms controls how quickly the other, healthy consumers learn about a rebalance. When the coordinator triggers a rebalance, the other consumers only find out when they receive a heartbeat response carrying the REBALANCE_IN_PROGRESS error. The more frequently heartbeats are sent, the sooner a consumer knows it needs to rejoin the group.
Suggested values:
session.timeout.ms: a relatively low value, for instance 10 seconds
max.poll.interval.ms: based on your processing requirements
heartbeat.interval.ms: a relatively low value, ideally no more than 1/3 of session.timeout.ms
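The three settings can be put side by side in a consumer configuration. The following is a minimal sketch that only uses java.util.Properties, so it runs without the Kafka client on the classpath. The property keys are the consumer config names discussed above; the class name, the helper methods and the concrete numbers (10 s session timeout, 5 min poll interval) are assumptions chosen to match the suggested values.

```java
import java.util.Properties;

/** Hypothetical helper that builds the timeout-related consumer settings. */
class ConsumerTimeoutConfig {

    static Properties build(int sessionTimeoutMs, int maxPollIntervalMs) {
        Properties props = new Properties();
        // Heartbeat thread: how long the coordinator waits for a heartbeat.
        props.put("session.timeout.ms", String.valueOf(sessionTimeoutMs));
        // Heartbeat frequency: one third of the session timeout.
        props.put("heartbeat.interval.ms", String.valueOf(sessionTimeoutMs / 3));
        // User thread: maximum allowed gap between two poll() calls.
        props.put("max.poll.interval.ms", String.valueOf(maxPollIntervalMs));
        return props;
    }

    /** Checks the "at most 1/3 of session.timeout.ms" rule of thumb. */
    static boolean heartbeatIsSane(Properties props) {
        int session = Integer.parseInt(props.getProperty("session.timeout.ms"));
        int heartbeat = Integer.parseInt(props.getProperty("heartbeat.interval.ms"));
        return heartbeat > 0 && heartbeat * 3 <= session;
    }

    public static void main(String[] args) {
        Properties props = build(10_000, 300_000);
        System.out.println(props.getProperty("heartbeat.interval.ms")); // prints 3333
        System.out.println(heartbeatIsSane(props));                     // prints true
    }
}
```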
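6.Solution code
Change the configuration parameters: increase the interval and decrease the maximum number of records handled per poll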
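props.put("max.poll.records", 8);
// 30000 ms is shorter than the 300000 ms default; to lengthen the allowed gap
// between polls, set a value above the worst-case batch processing time.
props.put("max.poll.interval.ms", "30000");
props.put("session.timeout.ms", "30000");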
Process the records in parallel with multiple threads
@Scheduled(fixedRate = 5000)
public void processing()
{
    // If no messages are available, poll() waits for up to the timeout (3000 ms) before returning.
    // If messages are available, they are returned immediately; the maximum number
    // returned per call is configured with max.poll.records.
    ConsumerRecords<String, String> records = consumer.poll(3000);
    if (records.count() == 0)
    {
        return;
    }
    // One latch count per record, so the commit waits for the whole batch.
    CountDownLatch countDownLatch = new CountDownLatch(records.count());
    List<ConsumerRecord<String, String>> batch = new ArrayList<>(records.count());
    for (ConsumerRecord<String, String> record : records)
    {
        batch.add(record);
    }
    for (int i = 0; i < batch.size(); ++i)
    {
        final ConsumerRecord<String, String> record = batch.get(i);
        // Only the last record of the batch is also written to Redis.
        final boolean saveInRedis = (i == batch.size() - 1);
        new Thread(() -> {
            try {
                disposeOneRecord(record, saveInRedis);
            } finally {
                // Count down even if processing throws; otherwise await() never returns.
                countDownLatch.countDown();
            }
        }).start();
    }
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        // Restore the interrupt flag and skip the commit: the batch may be incomplete.
        Thread.currentThread().interrupt();
        return;
    }
    consumer.commitAsync();
    logger.info(String.format("Successfully processed %d records", records.count()));
}
private void disposeOneRecord(ConsumerRecord<String, String> record, boolean saveInRedis)
{
    String[] split;
    DCSPoint point;
    String rowKey, qualifier, value;
    List<Put> putList = new ArrayList<>();
    // The record value is a JSON object that maps tag names to values.
    Map<String,Object> tagAndValue = JSONObject.parseObject(record.value()).getInnerMap();
    for (String tag : tagAndValue.keySet()) {
        // A valid tag has exactly two parts separated by "_"; skip anything else.
        split = tag.split("_");
        if (split.length != 2)
        {
            continue;
        }
        // Skip tags whose second part is not a known DCSPoint.
        try {
            point = DCSPoint.valueOf(split[1].toUpperCase());
        } catch (IllegalArgumentException e) {
            continue;
        }
        // Keep only REAL-typed points that belong to a known section.
        if (point.getSection() == Section.UNKNOWN || point.getDataType() != DataType.REAL)
        {
            continue;
        }
        value = tagAndValue.get(tag).toString();
        if (saveInRedis)
        {
            RedisConfig.masterRedis.set(tag, value);
        }
        // Row key: first tag part plus the Kafka record key; qualifier: second tag part.
        rowKey = split[0] + "_" + record.key();
        qualifier = split[1];
        putList.add(HBaseDaoUtil.cellPut(rowKey, HBaseConfig.FAMILY, qualifier, value));
    }
    // Write the collected puts for this record to HBase.
    hBaseDao.adds(HBaseConfig.TABLE_NAME, putList);
}
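Starting one thread per record can create as many threads as there are records in a batch. One possible alternative, sketched here with only the JDK, is a fixed-size ExecutorService whose invokeAll() blocks until every task of the batch has finished, which plays the same role as the CountDownLatch. The class name BatchProcessor, the String records and the handler are hypothetical stand-ins for ConsumerRecord and disposeOneRecord; in a real consumer the offset commit would follow a successful processBatch() call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/** Hypothetical batch runner; not part of the Kafka client. */
class BatchProcessor {

    /**
     * Runs handler on every record using a bounded pool and returns only after
     * the whole batch has finished. Throws if any record failed, so the caller
     * can decide not to commit the offsets.
     */
    static int processBatch(List<String> records, Consumer<String> handler, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Void>> tasks = new ArrayList<>();
            for (String record : records) {
                tasks.add(() -> {
                    handler.accept(record);
                    return null;
                });
            }
            // invokeAll blocks until all tasks have completed.
            List<Future<Void>> results = pool.invokeAll(tasks);
            for (Future<Void> result : results) {
                result.get(); // rethrows a task failure as ExecutionException
            }
            return results.size();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> batch = List.of("a", "b", "c", "d");
        int done = processBatch(batch, r -> System.out.println("processed " + r), 2);
        System.out.println(done + " records processed");
    }
}
```

The sketch creates a pool per batch to stay self-contained; a long-running consumer would more likely create the pool once and reuse it across polls.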