RocketMQ源码分析----ProcessQueue

RockerMQ里有个非常重要的数据结构叫ProcessQueue,很多功能,例如消费进度,消费等等功能的底层核心数据保存都是有ProcessQueue提供,下面介绍一下ProcessQueue提供的功能,而整个涉及的流程不会在这展开,在另外的功能分析文章如果涉及才会深入分析

看下代码上的注释:

Queue consumption snapshot

即消息快照的意思,为什么要这样形容呢?主要是因为在消息拉取到的时候,会把消息存放在其中。另外在拉取消息的时候,使用是的PullRequest去请求,其内部结构如下:

public class PullRequest {
    private String consumerGroup;
    private MessageQueue messageQueue;
    private ProcessQueue processQueue;
    private long nextOffset;
    private boolean lockedFirst = false;
}

可以看到,ProcessQueue和一个MessageQueue是对应的,即一个队列会有一个ProcessQueue的数据结构,看下其主要的字段

public class ProcessQueue {
    public final static long RebalanceLockMaxLiveTime =
            Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
    public final static long RebalanceLockInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
    private final static long PullMaxIdleTime = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
    // 用来保存拉取到的消息
    private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
    // 当前保存的消息数,放进来的时候会加,移除的时候会减
    private final AtomicLong msgCount = new AtomicLong();
    // 消费锁,主要在顺序消费和移除ProcessQueue的时候使用
    private final Lock lockConsume = new ReentrantLock();
    // 顺序消费的时候使用
    private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
    // 记录了废弃ProcessQueue的时候lockConsume的次数
    private final AtomicLong tryUnlockTimes = new AtomicLong(0);
    // ProcessQueue中保存的消息里的最大offset,为ConsumeQueue的offset
    private volatile long queueOffsetMax = 0L;
    // 该数据结构里的消息是否废弃
    private volatile boolean dropped = false;
    // 上次执行拉取消息的时间
    private volatile long lastPullTimestamp = System.currentTimeMillis();
    // 上次消费完消息后记录的时间
    private volatile long lastConsumeTimestamp = System.currentTimeMillis();
    
    private volatile boolean locked = false;
    // 上次锁定的时间
    private volatile long lastLockTimestamp = System.currentTimeMillis();
    // 是否正在消息
    private volatile boolean consuming = false;
    // 该参数为调整线程池的时候提供了数据参考
    private volatile long msgAccCnt = 0;
}

isLockExpired

    public boolean isLockExpired() {
        boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > RebalanceLockMaxLiveTime;
        return result;
    }

顺序消费的时候使用,消费之前会判断一下ProcessQueue锁定时间是否超过阈值(默认30000ms),如果没有超时,代表还是持有锁,具体细节在顺序消费的时候会详细说明.
负载

isPullExpired

    public boolean isPullExpired() {
        boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PullMaxIdleTime;
        return result;
    }

在拉取的时候更新lastPullTimestamp的值,然后在rebalance的时候会去判断ProcessQueue已经超过一定的时间没有去拉取消息,如果是的话,则将ProcessQueue废弃(setDropped(true))且从ProcessQueue和MessageQueue的对应关系中移除该ProcessQueue,代码细节如下:

if (pq.isPullExpired()) {
    switch (this.consumeType()) {
        case CONSUME_ACTIVELY:
            break;
        case CONSUME_PASSIVELY:
            pq.setDropped(true);
            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                it.remove();
                changed = true;
                log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                        consumerGroup, mq);
            }
            break;
        default:
            break;
    }
}

根据打的日志推测,这个应该是个BUG,在某种情况下,拉取会停止,导致时间没有更新,这时候重建ProcessQueue,具体是什么原因,这点不太清楚

cleanExpiredMsg

    public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
        if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {// 顺序消费不处理
            return;
        }
        // 最多处理16条消息
        int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
        for (int i = 0; i < loop; i++) {
            MessageExt msg = null;
            try {
                this.lockTreeMap.readLock().lockInterruptibly();
                try {
                    // 存在待处理的消息
                    // 且offset最小的消息消费时间大于consumeTimeout() * 60 * 1000(默认15分钟)
                    if (!msgTreeMap.isEmpty() 
                && System.currentTimeMillis() -
                 Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) 
                > pushConsumer.getConsumeTimeout() * 60 * 1000) {
                        msg = msgTreeMap.firstEntry().getValue();
                    } else {
                        break;
                    }
                } finally {
                    this.lockTreeMap.readLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("getExpiredMsg exception", e);
            }

            try {
                // 将消息发回Broker,等待重试,且延迟级别为3
                // 该效果是消费失败重试原理类似
                pushConsumer.sendMessageBack(msg, 3);
                try {
                    this.lockTreeMap.writeLock().lockInterruptibly();
                    try {
                        // 如果这个时候,ProcessQueue里offset最小的消息还等于上面取到的消息
                        // 那么就将其移除,有可能在上面取出消息处理的过程中,消息已经被消费,且从ProcessQueue中移除
                        if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
                            try {
                                msgTreeMap.remove(msgTreeMap.firstKey());
                            } catch (Exception e) {
                                log.error("send expired msg exception", e);
                            }
                        }
                    } finally {
                        this.lockTreeMap.writeLock().unlock();
                    }
                } catch (InterruptedException e) {
                    log.error("getExpiredMsg exception", e);
                }
            } catch (Exception e) {
                log.error("send expired msg exception", e);
            }
        }
    }

上面是并发消费模式下,定时清理消费时间超过15分钟的消息的逻辑,在消费者启动的时候,就好开启一个定时任务定时调用该方法

    public void start() {
        this.CleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                cleanExpireMsg();
            }

        }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
    }

15分钟执行一次

putMessage

    public boolean putMessage(final List<MessageExt> msgs) {
        // 返回值,顺序消费有用,返回true表示可以消费
        boolean dispatchToConsume = false;
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                int validMsgCnt = 0;
                for (MessageExt msg : msgs) {
                    // 以offset为key,放到treemap中
                    MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                    if (null == old) {// 正常
                        validMsgCnt++;
                        // 更新当前ProcessQueue中消息最大的offset
                        this.queueOffsetMax = msg.getQueueOffset();
                    }
                }
                // 新增消息数量
                msgCount.addAndGet(validMsgCnt);
                // 如果ProcessQueue有需要处理的消息(从上可知,如果msgs不为空那么msgTreeMap不为空)
                // 如果consuming为false,将其设置为true,表示正在消费
                // 这个值在放消息的时候会设置为true,在顺序消费模式,取不到消息则设置为false
                if (!msgTreeMap.isEmpty() && !this.consuming) {
                   // 有消息,且为未消费状态,则顺序消费模式可以消费
                    dispatchToConsume = true;
                    this.consuming = true;
                }

                if (!msgs.isEmpty()) {
                    MessageExt messageExt = msgs.get(msgs.size() - 1);
                    // property为ConsumeQueue里最大的offset
                    String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                    if (property != null) {
                        long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
                        if (accTotal > 0) {// 当前消息的offset与最大消息的差值,相当于还有多少offset没有消费
                            this.msgAccCnt = accTotal;
                        }
                    }
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("putMessage exception", e);
        }

        return dispatchToConsume;
    }

getMaxSpan

    public long getMaxSpan() {
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!this.msgTreeMap.isEmpty()) {
                    return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
                }
            }
            finally {
                this.lockTreeMap.readLock().unlock();
            }
        }
        catch (InterruptedException e) {
            log.error("getMaxSpan exception", e);
        }

        return 0;
    }

获取当前这批消息中最大最小offset之前的差距,这个方法主要在拉取消息的时候,用来判断当前有多少消息未处理,如果大于某个值(默认2000),则进行流控处理

//DefaultMQPushConsumerImpl.pullMessage
        if (!this.consumeOrderly) {
            // 大于2000
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                //  将PullRequest存起来一会再执行(PullRequest是用来发起拉取消息请求的参数载体)
                //PullTimeDelayMillsWhenFlowControl默认为50
                this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenFlowControl);
                // 流控次数加一,每1000次则打印日志
                if ((flowControlTimes2++ % 1000) == 0) {
                    log.warn(
                            "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                            processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                            pullRequest, flowControlTimes2);
                }
                return;
            }
        } else {
            //....
        }
    //DefaultMQPushConsumerImpl.executePullRequestLater
     private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
    }
    //PullMessageService.executePullRequestLater
    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
          // timeDelay毫秒后再执行拉取请求
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                // 将PullRequest放回拉取消息的队列中,这样拉取线程就会取到,马上进行请求
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    }

removeMessage

    public long removeMessage(final List<MessageExt> msgs) {
        // 返回给外部的值,代表当前消费进度的offset
        long result = -1;
        final long now = System.currentTimeMillis();
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            this.lastConsumeTimestamp = now;
            try {
                if (!msgTreeMap.isEmpty()) {
                    result = this.queueOffsetMax + 1;
                    int removedCnt = 0;
                    // 遍历消息,将其从TreeMap中移除
                    for (MessageExt msg : msgs) {
                        MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                        if (prev != null) {// 不为空证明移除成功
                            removedCnt--;// 移除消息数
                        }
                    }
                   // msgCount是ProcessQueue中的消息数量,移除了则需要减去该值,即加上该值的负数
                    msgCount.addAndGet(removedCnt);
                    // 如果还有消息存在,则使用当前最小的offset作为消费进度
                    // 如果已经没有消息了,则使用之前ProcessQueue里最大的offset作为消费进度
                    if (!msgTreeMap.isEmpty()) {
                        result = msgTreeMap.firstKey();
                    }
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (Throwable t) {
        }
        return result;
    }

这里的返回值和消费进度有很大的关系,在后面分析消费进度的时候会再深入分析

takeMessags

    public List<MessageExt> takeMessags(final int batchSize) {
        List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
        final long now = System.currentTimeMillis();
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            this.lastConsumeTimestamp = now;
            try {
                if (!this.msgTreeMap.isEmpty()) {
                    // 从treeMap中获取batchSize条数据,每次都返回offset最小的那条并移除
                    for (int i = 0; i < batchSize; i++) {
                        Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
                        if (entry != null) {
                            // 放到返回列表和一个临时用的treemapp中
                            result.add(entry.getValue());
                            msgTreeMapTemp.put(entry.getKey(), entry.getValue());
                        } else {
                            break;
                        }
                    }
                }
                // 取到消息了就会开始进行消费,如果没取到,则不需要消费,那么consuming设为false
                if (result.isEmpty()) {
                    consuming = false;
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("take Messages exception", e);
        }

        return result;
    }

该方法顺序消费模式中使用的,取到该消息后就会调用我们定义的MessageListener进行消费

commit

在顺序消费模式下,调用takeMessages取消息,其内部逻辑中,将treeMap的消息放到一个临时用的treeMap里,然后进行消费,消费完成后需要将这个临时的map清除,则是调用commit方法

    public long commit() {
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                // msgTreeMapTemp是这次消费的消息集合,lastKey代表当前消费的进度
                Long offset = this.msgTreeMapTemp.lastKey();
                // 消费完成,减去该批次的消息数量
                msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
                // 清除消息
                this.msgTreeMapTemp.clear();
                if (offset != null) {
                    // 返回消费进度
                    return offset + 1;
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("commit exception", e);
        }

        return -1;
    }

makeMessageToCosumeAgain

当顺序模式返回SUSPEND_CURRENT_QUEUE_A_MOMENT,那么可能会调用该方法,该方法名称意思为:让消息重新消费。
回顾一下上面说的流程:

  1. 取消息:从treeMap里取出消息然后放到临时treeMap,等待消费成功
  2. 消费成功:删除临时treeMap

从上面两部可以猜出,当消费失败的时候,不能无视临时treeMap和treeMap,应该要将临时treeMap的消息放回去,如果不放回去的话,一会重新消费的时候,从treeMap中就取不到原来那批消费失败的数据了,具体逻辑在后面分析

    public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                // 将这批没消费成功的消息从临时treeMap中移除
                // 并放回treeMap,等待下次消费
                for (MessageExt msg : msgs) {
                    this.msgTreeMapTemp.remove(msg.getQueueOffset());
                    this.msgTreeMap.put(msg.getQueueOffset(), msg);
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("makeMessageToCosumeAgain exception", e);
        }
    }

isDropped

返回true代表这个ProcessQueue被废弃了,具体出现的原因大概如下:

  1. rebalance之后,原来存在的MessageQueue不在本地新分配的MessageQueue之中,则把对应的ProcessQueue废弃。
    举个栗子:0123这个4个队列,一开始分配给A消费者,这时候启动一个B消费者后,A消费者分配了01这两个队列,那么原来34队列的ProcessQueue就会设置成true
  2. rebalance的时候,会将未订阅的topic下对应的ProcessQueue设置成true
  3. 还有就是上面isPullExpired讨论的情况
  4. 当拉取消息的时候,如果broker返回OFFSET_ILLEGAL,那么这时候将对应的ProcessQueue废弃
  5. consumer关闭(调用shutdown方法)的时候也会废弃

上面就是ProcessQueue提供的一些功能,有很多上层的功能都依赖他的实现,看别的东西前要先了解ProcessQueue,所以上面对ProcessQueue的功能进行了分析,稍微发散了一下,因为涉及的面比较广,所以相关的细节没有展开,待后面文章遇到了再进行分析

ProcessQueue相关的知识点:

  1. 消费模式:顺序、并行
  2. 消费进度管理
  3. rebalance负载均衡
  4. 等等....
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容

  • consumer 1.启动 有别于其他消息中间件由broker做负载均衡并主动向consumer投递消息,Rock...
    veShi文阅读 4,934评论 0 2
  • 点击查看原文 Web SDK 开发手册 SDK 概述 网易云信 SDK 为 Web 应用提供一个完善的 IM 系统...
    layjoy阅读 13,748评论 0 15
  • 这是感恩的一年!9.4去了新公司工作!虽说思想一直在浮动,却也变得踏实了!明白现有的工资是向往生活的根基!所以想怎...
    小鱼的理想家阅读 176评论 0 0
  • 1.显示表格 2.请求数据js代码 3.PHP代码查询
    哥本哈登_sketch阅读 800评论 0 0
  • 最近不闲。 算算,连续几个周六日都没有好好休息一下了。 参加省直单位技能大赛、与装修新居的设计师数次见面洽谈、先后...
    楚歌儿阅读 257评论 -1 1