RocketMQ源码阅读(九)-broker处理Consumer请求

Broker 怎么响应Consumer请求?
Broker 如何处理已经消费的消息?
Broker 的消息是 at least once还是exactly only once?

1.Broker 怎么响应Consumer请求


原理:
如上图所示,RocketMQ将所有消息都放在CommitLog里面,消费是维护一个ConsumeQueue帮助Consumer消费.pull操作要读两次,先读ConsumeQueue得到offset, 再读CommitLog得到消息内容.
BrokerController.initialize方法中会注册PullMessageProcessor来处理pull message 请求.

/**
* PullMessageProcessor
*/
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

在PullMessageProcessor.processRequest中又委托给DefaultMessageStore获取.

final GetMessageResult getMessageResult =
    this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
        requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);

进入DefaultMessageStore.getMessage之后会先通过topic和queueId获取对应的ConsumeQueue, 有了ConsumeQueue就可以操作逻辑队列.

ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);  
public ConsumeQueue findConsumeQueue(String topic, int queueId) {  
    ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);  
    if (null == map) {  
        ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);  
        ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);  
        if (oldMap != null) {  
            map = oldMap;  
        } else {  
            map = newMap;  
        }  
    }  
  
    ConsumeQueue logic = map.get(queueId);  
    if (null == logic) {  
        ConsumeQueue newLogic = new ConsumeQueue(//  
                topic, //  
                queueId, //  
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //  
                this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //  
                this);  
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);  
        if (oldLogic != null) {  
            logic = oldLogic;  
        } else {  
            logic = newLogic;  
        }  
    }  
  
    return logic;  
}

获取到ConsumeQueue之后, 会判断offset的合理性.

minOffset = consumeQueue.getMinOffsetInQuque();  
maxOffset = consumeQueue.getMaxOffsetInQuque();  
  
if (maxOffset == 0) {  
    status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;  
    nextBeginOffset = nextOffsetCorrection(offset, 0);  
} else if (offset < minOffset) {  
    status = GetMessageStatus.OFFSET_TOO_SMALL;  
    nextBeginOffset = nextOffsetCorrection(offset, minOffset);  
} else if (offset == maxOffset) {  
    status = GetMessageStatus.OFFSET_OVERFLOW_ONE;  
    nextBeginOffset = nextOffsetCorrection(offset, offset);  
} else if (offset > maxOffset) {  
    status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;  
    if (0 == minOffset) {  
        nextBeginOffset = nextOffsetCorrection(offset, minOffset);  
    } else {  
        nextBeginOffset = nextOffsetCorrection(offset, maxOffset);  
    }  
} else {  
    //消费位置合法  
}

消费的位置合法, 那么就从consumeQueue中获取SelectMapedBufferResult(封装了bytebuffer).

SelectMapedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
//ConsumeQueue.getIndexBuffer
public SelectMapedBufferResult getIndexBuffer(final long startIndex) {  
    int mapedFileSize = this.mapedFileSize;  
    long offset = startIndex * CQStoreUnitSize;  
    if (offset >= this.getMinLogicOffset()) {  
        MapedFile mapedFile = this.mapedFileQueue.findMapedFileByOffset(offset);  
        if (mapedFile != null) {  
            SelectMapedBufferResult result = mapedFile.selectMapedBuffer((int) (offset % mapedFileSize));  
            return result;  
        }  
    }  
    return null;  
}  

逻辑队列中一个数据的大小是20, 所以真正的offset需要乘以20, 然后获取对应的MapedFile.
获取到SelectMapedBufferResult之后, 就可以读取消息了, 代码中是一个循环.

if (bufferConsumeQueue != null) {  
        try {  
            status = GetMessageStatus.NO_MATCHED_MESSAGE;  

            long nextPhyFileStartOffset = Long.MIN_VALUE;  
            long maxPhyOffsetPulling = 0;  

            int i = 0;  
            final int MaxFilterMessageCount = 16000;  
            final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();  
            for (; i < bufferConsumeQueue.getSize() && i < MaxFilterMessageCount; i += ConsumeQueue.CQStoreUnitSize) {  
                long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();  
                int sizePy = bufferConsumeQueue.getByteBuffer().getInt();  
                long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();  

                maxPhyOffsetPulling = offsetPy;  

                // 说明物理文件正在被删除  
                if (nextPhyFileStartOffset != Long.MIN_VALUE) {  
                    if (offsetPy < nextPhyFileStartOffset)  
                        continue;  
                }  

                // 判断是否拉磁盘数据  
                boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);  
                // 此批消息达到上限了  
                if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),  
                        isInDisk)) {  
                    break;  
                }  

                // 消息过滤  
                if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {  
                    SelectMapedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);  
                    if (selectResult != null) {  
                        this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();  
                        getResult.addMessage(selectResult);  
                        status = GetMessageStatus.FOUND;  
                        nextPhyFileStartOffset = Long.MIN_VALUE;  
                    } else {  
                        if (getResult.getBufferTotalSize() == 0) {  
                            status = GetMessageStatus.MESSAGE_WAS_REMOVING;  
                        }  

                        // 物理文件正在被删除, 尝试跳过  
                        nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);  
                    }  
                } else {  
                    if (getResult.getBufferTotalSize() == 0) {  
                        status = GetMessageStatus.NO_MATCHED_MESSAGE;  
                    }  

                    if (log.isDebugEnabled()) {  
                        log.debug("message type not matched, client: " + subscriptionData + " server: " + tagsCode);  
                    }  
                }  
            }  

            if (diskFallRecorded) {  
                long fallBehind = maxOffsetPy - maxPhyOffsetPulling;  
                brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);  
            }  
            nextBeginOffset = offset + (i / ConsumeQueue.CQStoreUnitSize);  
            long diff = maxOffsetPy - maxPhyOffsetPulling;  
            long memory = (long) (StoreUtil.TotalPhysicalMemorySize  
                    * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));  
            getResult.setSuggestPullingFromSlave(diff > memory);  
        } finally {  

            bufferConsumeQueue.release();  
        }  
    } else {  
        status = GetMessageStatus.OFFSET_FOUND_NULL;  
        nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));  
        log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "  
                + maxOffset + ", but access logic queue failed.");  
    }  
}  

逻辑队列是大小为20, 每次+20. 关于MappedFile的操作可以参考RocketMQ源码阅读(四)-消息存储二,

根据consumequeue的结构, 可以知道由3部分组成, 那么依次取出来commitlog offset, size, tag然后根据拿tag和SubscriptionData进行比对, 进行过滤消息消息匹配之后, 有了offset和size, 则可以调用 commitlog 的getMessage 方法获取 SelectMapedBufferResult (获取过程和上面说的getIndexBuffer类似), 然后将 SelectMapedBufferResult 添加到GetMessageResult 中.
退出循环后, 会计算下次的offset, 放到GetMessageResult中返回.
当pullMessageProcessor获取到GetMessageResult, 然后将消息体, 下次访问的offset等设置到response中返回.

2. Broker 如何处理已经消费的消息

由于RocketMQ操作CommitLog, ConsumeQueue文件, 都是基于内存映射方法并在启动的时候, 会加载commitlog, ConsumeQueue目录下的所有文件, 为了避免内存与磁盘的浪费, 不可能将消息永久存储在消息服务器上, 所以需要一种机制来删除已过期的文件. RocketMQ顺序写Commitlog、ConsumeQueue文件, 所有写操作全部落在最后一个CommitLog或ConsumeQueue文件上, 之前的文件在下一个文件创建后, 将不会再被更新,RocketMQ清除过期文件的方法是: 如果非当前写文件在一定时间间隔内没有再次被更新, 则认为是过期文件, 可以被删除, RocketMQ不会管这个这个文件上的消息是否被全部消费. 默认每个文件的过期时间为72小时. 通过在Broker配置文件中设置fileReservedTime来改变过期时间, 单位为小时. 接下来详细分析RocketMQ是如何设计与实现上述机制的.
Broker在启动时加入一个定时任务.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DefaultMessageStore.this.cleanFilesPeriodically();
    }
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

RocketMQ会每隔10s调度一次cleanFilesPeriodically, 检测是否需要清除过期文件. 执行频率可以通过设置cleanResourceInterval, 默认为10s.
DefaultMessageStore#cleanFilesPeriodically

private void cleanFilesPeriodically() {
    this.cleanCommitLogService.run();
    this.cleanConsumeQueueService.run();
}

主要清除CommitLog、ConsumeQueue的过期文件.
CommitLog与ConsumeQueue对于过期文件的删除算法、逻辑大同小异, 本文将以CommitLog过期文件为例来详细分析其实现原理.

DefaultMessageStore$CleanCommitLogService#run

public void run() {
    try {
        this.deleteExpiredFiles();
        this.redeleteHangedFile();
    } catch (Throwable e) {
        DefaultMessageStore.log.warn(this.getServiceName() + " service has 
                exception. ", e);
    }
}

整个执行过程分为两个大的步骤, 第一个步骤: 尝试删除过期文件;第二个步骤: 重试删除被hange(由于被其他线程引用在第一阶段未删除的文件), 在这里再重试一次.
DefaultMessageStore$CleanCommitLogService#deleteExpiredFiles

long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

Step1: 解释一下这个三个配置属性的含义.
fileReservedTime: 文件保留时间, 也就是从最后一次更新时间到现在, 如果超过了该时间, 则认为是过期文件, 可以被删除.
deletePhysicFilesInterval: 删除物理文件的间隔, 因为在一次清除过程中, 可能需要删除的文件不止一个, 该值指定两次删除文件的间隔时间.
destroyMapedFileIntervalForcibly: 在清除过期文件时, 如果该文件被其他线程所占用(引用次数大于0, 比如读取消息), 此时会阻止此次删除任务,
同时在第一次试图删除该文件时记录当前时间戳, destroyMapedFileIntervalForcibly 表示第一次拒绝删除之后能保留的最大时间, 在此时间内, 同样可以被拒绝删除, 同时会将引用减少1000个, 超过该时间间隔后, 文件将被强制删除.
DefaultMessageStore$CleanCommitLogService#deleteExpiredFiles:

boolean timeup = this.isTimeToDelete();
boolean spacefull = this.isSpaceToDelete();
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
if (timeup || spacefull || manualDelete) {
    //继续执行删除逻辑
   return;
} else {
   // 本次删除任务无作为. 
}

Step2: RocketMQ在如下三种情况任意满足之一的情况下将继续执行删除文件操作.
1)到了删除文件的时间点, RocketMQ通过deleteWhen设置一天的固定时间执行一次删除过期文件操作, 默认为凌晨4点.
2)判断磁盘空间是否充足, 如果不充足, 则返回true, 表示应该触发过期文件删除操作.
3)预留, 手工触发, 可以通过调用excuteDeleteFilesManualy方法手工触发过期文件删除, 目前RocketMQ暂未封装手工触发文件删除的命令.
重点分析一下磁盘不足的判断依据.
DefaultMessageStore$CleanCommitLogService#isSpaceToDelete

private boolean isSpaceToDelete() {
    double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;  //@1

    cleanImmediately = false;

    {
        String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();    //@2
        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);   //@3
        if (physicRatio > diskSpaceWarningLevelRatio) {
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
            if (diskok) {
                DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
            }

            cleanImmediately = true;
        } else if (physicRatio > diskSpaceCleanForciblyRatio) {
            cleanImmediately = true;
        } else {
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
            if (!diskok) {
                DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
            }
        }

        if (physicRatio < 0 || physicRatio > ratio) {
            DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
            return true;
        }
    }

    {
        String storePathLogics = StorePathConfigHelper
            .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
        double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
        if (logicsRatio > diskSpaceWarningLevelRatio) {
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
            if (diskok) {
                DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");
            }

            cleanImmediately = true;
        } else if (logicsRatio > diskSpaceCleanForciblyRatio) {
            cleanImmediately = true;
        } else {
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
            if (!diskok) {
                DefaultMessageStore.log.info("logics disk space OK " + logicsRatio + ", so mark disk ok");
            }
        }

        if (logicsRatio < 0 || logicsRatio > ratio) {
            DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio);
            return true;
        }
    }

    return false;
}

代码@1: 获取maxUsedSpaceRatio, 表示commitlog、consumequeue文件所在磁盘分区的最大使用量, 如果超过该值, 则需要立即清除过期文件.
代码@2: 通过File#getTotalSpace()获取commitlog所在磁盘分区总的存储容量, 通过File#getFreeSpace()获取commitlog目录所在磁盘文件剩余容量并得出当前该分区的物理磁盘使用率physicRatio .
代码@3: RocketMQ另外提供了两个与磁盘空间使用率相关的系统级参数:
-Drocketmq.broker.diskSpaceWarningLevelRatio=0.90: 如果磁盘分区使用率超过该阔值, 将设置磁盘不可写, 此时会拒绝新消息的写入.
-Drocketmq.broker.diskSpaceCleanForciblyRatio=0.85: 如果磁盘分区使用超过该阔值, 建议立即执行过期文件清除, 但不会拒绝新消息的写入.
判断磁盘是否可用, 用当前已使用物理磁盘率maxUsedSpaceRatio、diskSpaceWarningLevelRatio、diskSpaceCleanForciblyRatio, 如果当前磁盘使用率达到上述阔值, 将返回true表示磁盘已满, 需要进行过期文件删除操作.
Step3: 然后根据文件的最后一次更新时间与当前时间做比较, 判断是否过期, 如果已过期, 调用MappedFile的destory.

关于ConsumeQueue的过期文件删除机制与Commitlog文件机制类似, 本文就不重复讲解.

Broker 的消息是 at least once还是exactly only once?

at least once:

  • 是指每个消息必须投递一次,RocketMQ Consumer 先 pull 消息到本地, 消费完成后, 才向服务器返回 ack, 如果没有消费一定不会ack消息, 所以RocketMQ可以很好的支持此特性.

exactly only once:

  • (1). 发送消息阶段, 不允许发送重复的消息.
  • (2). 消费消息阶段, 不允许消费重复的消息. 只有以上两个条件都满足情况下, 才能认为消息是"Exactly Only Once", 而要实现以上两点, 在分布式系统环境下, 不可避免要产生巨大的开销. 所以 RocketMQ 为了追求高性能, 并不保证此特性, 要求在业务上进行去重, 也就是说消费消息要做到幂等性. RocketMQ 虽然不能严格保证不重复, 但是正常情况下很少会出现重复发送、消 费情况, 只有网络异常, Consumer 启停等异常情况下会出现消息重复. 此问题的本质原因是网络调用存在不确定性, 即既不成功也不失败的第三种状态, 所以才产生了消息重复性问题.

参考:
https://segmentfault.com/a/1190000011042216
https://blog.csdn.net/prestigeding/article/details/79482339

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容