聊聊spring for kafka的AckMode

本文主要讲述一下spring for kafka的consumer在spring.kafka.consumer.enable-auto-commit是false情况下,AckMode的选项

AckMode

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/AbstractMessageListenerContainer.java$AckMode

    /**
     * The offset commit behavior enumeration.
     */
    public enum AckMode {

        /**
         * Commit after each record is processed by the listener.
         */
        RECORD,

        /**
         * Commit whatever has already been processed before the next poll.
         */
        BATCH,

        /**
         * Commit pending updates after
         * {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
         */
        TIME,

        /**
         * Commit pending updates after
         * {@link ContainerProperties#setAckCount(int) ackCount} has been
         * exceeded.
         */
        COUNT,

        /**
         * Commit pending updates after
         * {@link ContainerProperties#setAckCount(int) ackCount} has been
         * exceeded or after {@link ContainerProperties#setAckTime(long)
         * ackTime} has elapsed.
         */
        COUNT_TIME,

        /**
         * User takes responsibility for acks using an
         * {@link AcknowledgingMessageListener}.
         */
        MANUAL,

        /**
         * User takes responsibility for acks using an
         * {@link AcknowledgingMessageListener}. The consumer is woken to
         * immediately process the commit.
         */
        MANUAL_IMMEDIATE,

    }
  • RECORD
    每处理一条commit一次
  • BATCH(默认)
    每次poll的时候批量提交一次,频率取决于每次poll的调用频率
  • TIME
    每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
  • COUNT
    累积达到ackCount次的ack去commit
  • COUNT_TIME
    ackTime或ackCount哪个条件先满足,就commit
  • MANUAL
    listener负责ack,但是背后也是批量上去
  • MANUAL_IMMEDIATE
    listner负责ack,每调用一次,就立即commit

KafkaMessageListenerContainer$ListenerConsumer

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

        @Override
        public void run() {
            if (this.theListener instanceof ConsumerSeekAware) {
                ((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
            }
            this.count = 0;
            this.last = System.currentTimeMillis();
            if (isRunning() && this.definedPartitions != null) {
                initPartitionsIfNeeded();
                // we start the invoker here as there will be no rebalance calls to
                // trigger it, but only if the container is not set to autocommit
                // otherwise we will process records on a separate thread
                if (!this.autoCommit) {
                    startInvoker();
                }
            }
            long lastReceive = System.currentTimeMillis();
            long lastAlertAt = lastReceive;
            while (isRunning()) {
                try {
                    if (!this.autoCommit) {
                        processCommits();
                    }
                    processSeeks();
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace("Polling (paused=" + this.paused + ")...");
                    }
                    ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
                    if (records != null && this.logger.isDebugEnabled()) {
                        this.logger.debug("Received: " + records.count() + " records");
                    }
                    if (records != null && records.count() > 0) {
                        if (this.containerProperties.getIdleEventInterval() != null) {
                            lastReceive = System.currentTimeMillis();
                        }
                        // if the container is set to auto-commit, then execute in the
                        // same thread
                        // otherwise send to the buffering queue
                        if (this.autoCommit) {
                            invokeListener(records);
                        }
                        else {
                            if (sendToListener(records)) {
                                if (this.assignedPartitions != null) {
                                    // avoid group management rebalance due to a slow
                                    // consumer
                                    this.consumer.pause(this.assignedPartitions);
                                    this.paused = true;
                                    this.unsent = records;
                                }
                            }
                        }
                    }
                    else {
                        if (this.containerProperties.getIdleEventInterval() != null) {
                            long now = System.currentTimeMillis();
                            if (now > lastReceive + this.containerProperties.getIdleEventInterval()
                                    && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
                                publishIdleContainerEvent(now - lastReceive);
                                lastAlertAt = now;
                                if (this.theListener instanceof ConsumerSeekAware) {
                                    seekPartitions(getAssignedPartitions(), true);
                                }
                            }
                        }
                    }
                    this.unsent = checkPause(this.unsent);
                }
                catch (WakeupException e) {
                    this.unsent = checkPause(this.unsent);
                }
                catch (Exception e) {
                    if (this.containerProperties.getGenericErrorHandler() != null) {
                        this.containerProperties.getGenericErrorHandler().handle(e, null);
                    }
                    else {
                        this.logger.error("Container exception", e);
                    }
                }
            }
            if (this.listenerInvokerFuture != null) {
                stopInvoker();
                commitManualAcks();
            }
            try {
                this.consumer.unsubscribe();
            }
            catch (WakeupException e) {
                // No-op. Continue process
            }
            this.consumer.close();
            if (this.logger.isInfoEnabled()) {
                this.logger.info("Consumer stopped");
            }
        }

这里while循环每次都判断是否auto commit,如果不是则processCommits

        private void processCommits() {
            handleAcks();
            this.count += this.acks.size();
            long now;
            AckMode ackMode = this.containerProperties.getAckMode();
            if (!this.isManualImmediateAck) {
                if (!this.isManualAck) {
                    updatePendingOffsets();
                }
                boolean countExceeded = this.count >= this.containerProperties.getAckCount();
                if (this.isManualAck || this.isBatchAck || this.isRecordAck
                        || (ackMode.equals(AckMode.COUNT) && countExceeded)) {
                    if (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) {
                        this.logger.debug("Committing in AckMode.COUNT because count " + this.count
                                + " exceeds configured limit of " + this.containerProperties.getAckCount());
                    }
                    commitIfNecessary();
                    this.count = 0;
                }
                else {
                    now = System.currentTimeMillis();
                    boolean elapsed = now - this.last > this.containerProperties.getAckTime();
                    if (ackMode.equals(AckMode.TIME) && elapsed) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Committing in AckMode.TIME " +
                                    "because time elapsed exceeds configured limit of " +
                                    this.containerProperties.getAckTime());
                        }
                        commitIfNecessary();
                        this.last = now;
                    }
                    else if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) {
                        if (this.logger.isDebugEnabled()) {
                            if (elapsed) {
                                this.logger.debug("Committing in AckMode.COUNT_TIME " +
                                        "because time elapsed exceeds configured limit of " +
                                        this.containerProperties.getAckTime());
                            }
                            else {
                                this.logger.debug("Committing in AckMode.COUNT_TIME " +
                                        "because count " + this.count + " exceeds configured limit of" +
                                        this.containerProperties.getAckCount());
                            }
                        }

                        commitIfNecessary();
                        this.last = now;
                        this.count = 0;
                    }
                }
            }
        }

handleAcks

        private void handleAcks() {
            ConsumerRecord<K, V> record = this.acks.poll();
            while (record != null) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Ack: " + record);
                }
                processAck(record);
                record = this.acks.poll();
            }
        }

        private void processAck(ConsumerRecord<K, V> record) {
            if (ListenerConsumer.this.isManualImmediateAck) {
                try {
                    ackImmediate(record);
                }
                catch (WakeupException e) {
                    // ignore - not polling
                }
            }
            else {
                addOffset(record);
            }
        }

这里可以看到,如果不是isManualImmediateAck,则每次是累加到offsets的map中

commitIfNecessary

        private void commitIfNecessary() {
            Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
            for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
                for (Entry<Integer, Long> offset : entry.getValue().entrySet()) {
                    commits.put(new TopicPartition(entry.getKey(), offset.getKey()),
                            new OffsetAndMetadata(offset.getValue() + 1));
                }
            }
            this.offsets.clear();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Commit list: " + commits);
            }
            if (!commits.isEmpty()) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Committing: " + commits);
                }
                try {
                    if (this.containerProperties.isSyncCommits()) {
                        this.consumer.commitSync(commits);
                    }
                    else {
                        this.consumer.commitAsync(commits, this.commitCallback);
                    }
                }
                catch (WakeupException e) {
                    // ignore - not polling
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Woken up during commit");
                    }
                }
            }
        }

这里会从offsets的map组装出commits,然后去提交(commitSync或者commitAsync),然后clear掉offsets

manual commit

    @KafkaListener(topics = "k010")
    public void listen(ConsumerRecord<?, ?> cr,Acknowledgment ack) throws Exception {
        LOGGER.info(cr.toString());
        ack.acknowledge();
    }

方法参数里头传递Acknowledgment,然后手工ack
前提要配置AckMode

instance.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);

doc

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,711评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,079评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,194评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,089评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,197评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,306评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,338评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,119评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,541评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,846评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,014评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,694评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,322评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,026评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,257评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,863评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,895评论 2 351

推荐阅读更多精彩内容