序
本文主要讲述一下spring for kafka的consumer在spring.kafka.consumer.enable-auto-commit是false情况下,AckMode的选项
AckMode
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/AbstractMessageListenerContainer.java$AckMode
/**
* The offset commit behavior enumeration.
*/
public enum AckMode {
/**
* Commit after each record is processed by the listener.
*/
RECORD,
/**
* Commit whatever has already been processed before the next poll.
*/
BATCH,
/**
* Commit pending updates after
* {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
*/
TIME,
/**
* Commit pending updates after
* {@link ContainerProperties#setAckCount(int) ackCount} has been
* exceeded.
*/
COUNT,
/**
* Commit pending updates after
* {@link ContainerProperties#setAckCount(int) ackCount} has been
* exceeded or after {@link ContainerProperties#setAckTime(long)
* ackTime} has elapsed.
*/
COUNT_TIME,
/**
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}.
*/
MANUAL,
/**
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}. The consumer is woken to
* immediately process the commit.
*/
MANUAL_IMMEDIATE,
}
- RECORD
每处理一条commit一次 - BATCH(
默认
)
每次poll的时候批量提交一次,频率取决于每次poll的调用频率 - TIME
每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?
) - COUNT
累积达到ackCount次的ack去commit - COUNT_TIME
ackTime或ackCount哪个条件先满足,就commit - MANUAL
listener负责ack,但是背后也是批量上去 - MANUAL_IMMEDIATE
listner负责ack,每调用一次,就立即commit
KafkaMessageListenerContainer$ListenerConsumer
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@Override
public void run() {
if (this.theListener instanceof ConsumerSeekAware) {
((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
}
this.count = 0;
this.last = System.currentTimeMillis();
if (isRunning() && this.definedPartitions != null) {
initPartitionsIfNeeded();
// we start the invoker here as there will be no rebalance calls to
// trigger it, but only if the container is not set to autocommit
// otherwise we will process records on a separate thread
if (!this.autoCommit) {
startInvoker();
}
}
long lastReceive = System.currentTimeMillis();
long lastAlertAt = lastReceive;
while (isRunning()) {
try {
if (!this.autoCommit) {
processCommits();
}
processSeeks();
if (this.logger.isTraceEnabled()) {
this.logger.trace("Polling (paused=" + this.paused + ")...");
}
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
if (records != null && this.logger.isDebugEnabled()) {
this.logger.debug("Received: " + records.count() + " records");
}
if (records != null && records.count() > 0) {
if (this.containerProperties.getIdleEventInterval() != null) {
lastReceive = System.currentTimeMillis();
}
// if the container is set to auto-commit, then execute in the
// same thread
// otherwise send to the buffering queue
if (this.autoCommit) {
invokeListener(records);
}
else {
if (sendToListener(records)) {
if (this.assignedPartitions != null) {
// avoid group management rebalance due to a slow
// consumer
this.consumer.pause(this.assignedPartitions);
this.paused = true;
this.unsent = records;
}
}
}
}
else {
if (this.containerProperties.getIdleEventInterval() != null) {
long now = System.currentTimeMillis();
if (now > lastReceive + this.containerProperties.getIdleEventInterval()
&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
publishIdleContainerEvent(now - lastReceive);
lastAlertAt = now;
if (this.theListener instanceof ConsumerSeekAware) {
seekPartitions(getAssignedPartitions(), true);
}
}
}
}
this.unsent = checkPause(this.unsent);
}
catch (WakeupException e) {
this.unsent = checkPause(this.unsent);
}
catch (Exception e) {
if (this.containerProperties.getGenericErrorHandler() != null) {
this.containerProperties.getGenericErrorHandler().handle(e, null);
}
else {
this.logger.error("Container exception", e);
}
}
}
if (this.listenerInvokerFuture != null) {
stopInvoker();
commitManualAcks();
}
try {
this.consumer.unsubscribe();
}
catch (WakeupException e) {
// No-op. Continue process
}
this.consumer.close();
if (this.logger.isInfoEnabled()) {
this.logger.info("Consumer stopped");
}
}
这里while循环每次都判断是否auto commit,如果不是则processCommits
private void processCommits() {
handleAcks();
this.count += this.acks.size();
long now;
AckMode ackMode = this.containerProperties.getAckMode();
if (!this.isManualImmediateAck) {
if (!this.isManualAck) {
updatePendingOffsets();
}
boolean countExceeded = this.count >= this.containerProperties.getAckCount();
if (this.isManualAck || this.isBatchAck || this.isRecordAck
|| (ackMode.equals(AckMode.COUNT) && countExceeded)) {
if (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) {
this.logger.debug("Committing in AckMode.COUNT because count " + this.count
+ " exceeds configured limit of " + this.containerProperties.getAckCount());
}
commitIfNecessary();
this.count = 0;
}
else {
now = System.currentTimeMillis();
boolean elapsed = now - this.last > this.containerProperties.getAckTime();
if (ackMode.equals(AckMode.TIME) && elapsed) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Committing in AckMode.TIME " +
"because time elapsed exceeds configured limit of " +
this.containerProperties.getAckTime());
}
commitIfNecessary();
this.last = now;
}
else if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) {
if (this.logger.isDebugEnabled()) {
if (elapsed) {
this.logger.debug("Committing in AckMode.COUNT_TIME " +
"because time elapsed exceeds configured limit of " +
this.containerProperties.getAckTime());
}
else {
this.logger.debug("Committing in AckMode.COUNT_TIME " +
"because count " + this.count + " exceeds configured limit of" +
this.containerProperties.getAckCount());
}
}
commitIfNecessary();
this.last = now;
this.count = 0;
}
}
}
}
handleAcks
private void handleAcks() {
ConsumerRecord<K, V> record = this.acks.poll();
while (record != null) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("Ack: " + record);
}
processAck(record);
record = this.acks.poll();
}
}
private void processAck(ConsumerRecord<K, V> record) {
if (ListenerConsumer.this.isManualImmediateAck) {
try {
ackImmediate(record);
}
catch (WakeupException e) {
// ignore - not polling
}
}
else {
addOffset(record);
}
}
这里可以看到,如果不是isManualImmediateAck,则每次是累加到offsets的map中
commitIfNecessary
private void commitIfNecessary() {
Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
for (Entry<Integer, Long> offset : entry.getValue().entrySet()) {
commits.put(new TopicPartition(entry.getKey(), offset.getKey()),
new OffsetAndMetadata(offset.getValue() + 1));
}
}
this.offsets.clear();
if (this.logger.isDebugEnabled()) {
this.logger.debug("Commit list: " + commits);
}
if (!commits.isEmpty()) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Committing: " + commits);
}
try {
if (this.containerProperties.isSyncCommits()) {
this.consumer.commitSync(commits);
}
else {
this.consumer.commitAsync(commits, this.commitCallback);
}
}
catch (WakeupException e) {
// ignore - not polling
if (this.logger.isDebugEnabled()) {
this.logger.debug("Woken up during commit");
}
}
}
}
这里会从offsets的map组装出commits,然后去提交(commitSync或者commitAsync),然后clear掉offsets
manual commit
@KafkaListener(topics = "k010")
public void listen(ConsumerRecord<?, ?> cr,Acknowledgment ack) throws Exception {
LOGGER.info(cr.toString());
ack.acknowledge();
}
方法参数里头传递Acknowledgment,然后手工ack
前提要配置AckMode
instance.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);