RocketMQ Source Code Walkthrough

Overall Flow


Producer Side


Additional Notes
● The producer first queries the NameServer for routing information; the NameServer stores all Topic-level routes, including how each Topic's queues are distributed across the Brokers.
● The producer parses the routing information into a local routing view, converting the Topic's per-Broker queue data into the local routes used for message production.
● The producer sends messages according to this local routing view, picking a concrete Broker from the local route for each send.
● The producer communicates with the Broker over Netty (i.e. NettyRemotingClient).
The publish route is held in TopicPublishInfo: a Topic contains multiple MessageQueues, and a MessageQueue is the finest-grained queue unit. Inside TopicRouteData, the core field QueueData holds each Broker's queue configuration, and BrokerData holds each Broker's address information.

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    // finest-grained queue entries
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
}

public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    // topic name
    private String topic;
    // name of the broker this queue belongs to
    private String brokerName;
    // queue id within the topic
    private int queueId;
}

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    // queue info, one entry per broker
    private List<QueueData> queueDatas;
    // broker address info, one entry per broker
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}


public class QueueData implements Comparable<QueueData> {
    // broker name
    private String brokerName;
    // number of read queues
    private int readQueueNums;
    // number of write queues
    private int writeQueueNums;
    // read/write permission flags
    private int perm;
    private int topicSynFlag;
}


public class BrokerData implements Comparable<BrokerData> {
    // cluster this broker belongs to
    private String cluster;
    // broker name
    private String brokerName;
    // broker addresses, keyed by broker id (0 = master)
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    private final Random random = new Random();
}

Route Synchronization

Route synchronization queries the routing information for a given Topic, compares the returned topicRouteData with the cached copy to detect changes, and finally converts the route into the producer-side and consumer-side routing views.

public class MQClientInstance {
 
    public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }
 
 
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // code omitted
                    } else {
                        // 1. Query the routing info for the given topic from the NameServer
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
 
                    if (topicRouteData != null) {
                        // 2. Compare with the cached route to see whether topicRouteData changed
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        }
                        // 3. If changed, convert the route into producer and consumer routing views
                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
 
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
 
                            // build the producer-side TopicPublishInfo and push it to all producers
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }
                            // save into the local topic route table
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                // failed to acquire the namesrv lock within LOCK_TIMEOUT_MILLIS; logging omitted
            }
        } catch (InterruptedException e) {
            // logging omitted
        }
 
        return false;
    }
}

Converting the Route into TopicPublishInfo

public class MQClientInstance {
 
    public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
 
        TopicPublishInfo info = new TopicPublishInfo();
 
        info.setTopicRouteData(route);
        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
          // ordered-topic handling omitted
        } else {
 
            List<QueueData> qds = route.getQueueDatas();
 
            // sort QueueData by brokerName
            Collections.sort(qds);
 
            // iterate over every broker's QueueData to build the queue-level entries
            for (QueueData qd : qds) {
                // only writable QueueData can be used to generate queues
                if (PermName.isWriteable(qd.getPerm())) {
                    // find the matching BrokerData and filter out abnormal entries
                    BrokerData brokerData = null;
                    for (BrokerData bd : route.getBrokerDatas()) {
                        if (bd.getBrokerName().equals(qd.getBrokerName())) {
                            brokerData = bd;
                            break;
                        }
                    }
                    if (null == brokerData) {
                        continue;
                    }
                    if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                        continue;
                    }
 
                    // create one MessageQueue per write queue and add it to the TopicPublishInfo
                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                        info.getMessageQueueList().add(mq);
                    }
                }
            }
 
            info.setOrderTopic(false);
        }
 
        return info;
    }
}
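
For example, a topic with writeQueueNums = 4 on two brokers broker-a and broker-b yields eight MessageQueues in the resulting TopicPublishInfo: (topic, broker-a, 0..3) and (topic, broker-b, 0..3).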

Sending Messages

When producer.send is invoked, the client wraps the message into the custom wire protocol RemotingCommand via RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE). NettyRemotingClient then resolves the Broker address, establishes a connection to obtain a Channel, and calls writeAndFlush to ship the request (RemotingCommand) to the Netty server.
Call chain: DefaultMQProducerImpl.send() -> sendKernelImpl() -> mQClientFactory.getMQClientAPIImpl().sendMessage() -> RemotingCommand.createRequestCommand() -> NettyRemotingClient.invokeSync() -> NettyRemotingAbstract.invokeSyncImpl()


public class Producer {
    public static final String PRODUCER_GROUP = "please_rename_unique_group_name";
    public static final String TOPIC = "TopicTest";
    public static final String TAG = "TagA";

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        producer.start();
        Message msg = new Message(TOPIC /* Topic */,
                TAG /* Tag */,
                ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
        producer.shutdown();
    }
}

public abstract class NettyRemotingAbstract {
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) {
        ...
        try {
            ...
            // send the request to the Broker over the channel
            channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
                ...
            });
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            ...
            return responseCommand;
        } 
        ...
    }
}

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {

    @Override
    public void start() {
        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            LOGGER.info("Prepend SSL handler");
                        } else {
                            LOGGER.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    ch.pipeline().addLast(
                        nettyClientConfig.isDisableNettyWorkerGroup() ? null : defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                }
            });
    }
}

Producer Queue Selection

● By default, MessageQueue selection is round-robin: a global index is incremented and taken modulo the queue count to pick the send queue.
● During selection, the MessageQueues of the Broker that failed on the previous send attempt are skipped.

public class DefaultMQProducerImpl implements MQProducerInner {
 
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        
        // 1. Look up the TopicPublishInfo for the message's topic
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
 
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
             
            String[] brokersSent = new String[timesTotal];
            // retry loop: send at most timesTotal times (declarations of times/timesTotal/mq/sendResult elided)
            for (; times < timesTotal; times++) {
                // brokerName of the previous (failed) attempt, if any
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // 2. Pick a MessageQueue from TopicPublishInfo, avoiding lastBrokerName
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        // 3. Send and check the result; on failure the loop retries with a newly selected queue
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    } catch (MQBrokerException e) {
                        // code omitted
                    } catch (InterruptedException e) {
                        // code omitted
                    }
                } else {
                    break;
                }
            }
        }
    }
}

public class TopicPublishInfo {
 
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            // round-robin: pick the next queue by incrementing the global index
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.getAndIncrement();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                // skip the queues of the broker that failed last time
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
}

Other selection algorithms (MessageQueueSelector):
● SelectMessageQueueByHash: pick a queue by hashing the argument passed along with the send
● SelectMessageQueueByRandom: pick a queue at random
● SelectMessageQueueByMachineRoom: pick among the queues of configured machine rooms
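
Beyond the built-in selectors, producer.send also accepts a custom MessageQueueSelector. A minimal sketch (the group/topic names and the orderId key are placeholders, not from the original text): messages sharing the same key always land in the same queue, preserving per-key ordering.

public class OrderedProducerDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("example_group");
        producer.start();

        long orderId = 1001L; // ordering key; placeholder value
        Message msg = new Message("TopicTest", "TagA", ("order-" + orderId).getBytes());

        // same arg -> same queue index -> per-orderId FIFO on the broker
        SendResult result = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message m, Object arg) {
                int index = (int) (((Long) arg) % mqs.size());
                return mqs.get(index);
            }
        }, orderId);

        producer.shutdown();
    }
}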


Broker Side


Network access layer: RocketMQ implements its transport on Netty's multi-threaded Reactor model. The Reactor main thread pool eventLoopGroupBoss (one thread by default) accepts TCP connections; established connections are handed to the Reactor worker pool eventLoopGroupSelector for read/write event handling.
defaultEventExecutorGroup performs SSL verification, encoding/decoding, idle checks, and connection management. The business request code of the RemotingCommand is then used to look up the matching processor in the processorTable cache; the request is wrapped into a task and submitted to that processor's business thread pool. These four tiers of thread pools are how the Broker raises throughput.
Business processing layer: handles the business requests arriving via RPC, among which:
● SendMessageProcessor handles Producer send requests;
● PullMessageProcessor handles Consumer pull requests;
● QueryMessageProcessor handles message queries by Key and similar criteria.
Storage logic layer: DefaultMessageStore is RocketMQ's core storage class, providing message store, read, and delete capabilities.
File mapping layer: maps the CommitLog, ConsumeQueue, and IndexFile files into MappedFile storage objects.
Data transfer layer: supports reading and writing messages through mmap memory mapping, and alternatively reading through mmap while writing through off-heap memory (read/write separation).

Initialization


● Register the message processors: SendMessageProcessor, PullMessageProcessor, QueryMessageProcessor, and so on (see the sketch below)
● Fetch the NameServer addresses and maintain heartbeats
● Set up HA, i.e. master/slave data replication
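
For reference, the processor registration in BrokerController.registerProcessor() looks roughly like the following abridged sketch; the executor fields correspond to the per-processor business thread pools described above, and details vary across versions.

//org.apache.rocketmq.broker.BrokerController::registerProcessor (abridged sketch)
public void registerProcessor() {
    // Producer send requests -> SendMessageProcessor, run on the send thread pool
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);

    // Consumer pull requests -> PullMessageProcessor, run on the pull thread pool
    this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);

    // Query-by-key requests -> QueryMessageProcessor, run on the query thread pool
    NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
}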

Receiving Messages



CommitLog Write Path

The core of the lowest-level write path is CommitLog's asyncPutMessage method, which has three main steps: obtain the MappedFile, append the message to its buffer, and submit the flush request. Note that these steps are bracketed by acquiring and releasing a spin lock or ReentrantLock, which keeps message writes serial within a single Broker.

//org.apache.rocketmq.store.CommitLog::asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        ...
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            //get the latest MappedFile
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            ...
            //append the message into the mapped buffer
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            ...
            //submit the flush request
            CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
            ...
        } finally {
            putMessageLock.unlock();
        }
        ...
    }

MappedFile Initialization

When the Broker initializes, it starts the AllocateMappedFileService background thread that manages MappedFile creation (BrokerController.initializeMessageStore()). Message-handling threads and the AllocateMappedFileService thread are connected through the requestQueue.
On a write, putRequestAndReturnMappedFile is called on AllocateMappedFileService to enqueue a MappedFile-creation request; it actually builds two AllocateRequests and puts both into the queue.
The AllocateMappedFileService thread loops, taking AllocateRequests from requestQueue and creating MappedFiles. The message-handling thread waits on a CountDownLatch and returns as soon as the first MappedFile has been created.
The next time the message-handling thread needs a MappedFile, it can pick up the pre-created one directly. Pre-creating MappedFiles this way hides file-creation latency.

//org.apache.rocketmq.store.AllocateMappedFileService::putRequestAndReturnMappedFile
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    //request creation of the next MappedFile
    AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
    boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
    ...
    //request pre-creation of the MappedFile after next
    AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
    boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
    ...
    //wait for and fetch the MappedFile created for this request
    AllocateRequest result = this.requestTable.get(nextFilePath);
    ...
}
 
//org.apache.rocketmq.store.AllocateMappedFileService::run
public void run() {
    ..
    while (!this.isStopped() && this.mmapOperation()) {
    }
    ...
}
 
//org.apache.rocketmq.store.AllocateMappedFileService::mmapOperation
private boolean mmapOperation() {
    ...
    //take an AllocateRequest from the queue
    req = this.requestQueue.take();
    ...
    //check whether the off-heap memory pool (TransientStorePool) is enabled
    if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        //MappedFile backed by off-heap memory
        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
    } else {
        //plain MappedFile
        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
    }
    ...
    //warm up the MappedFile
    if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
        .getMappedFileSizeCommitLog()
        &&
        this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
            this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
    }
    req.setMappedFile(mappedFile);
    ...
}

Each request for a plain MappedFile creates a mappedByteBuffer; the code below shows how mmap is done in Java.

//org.apache.rocketmq.store.MappedFile::init
private void init(final String fileName, final int fileSize) throws IOException {
    ...
    this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
    this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
    ...
}

When off-heap memory is enabled, i.e. transientStorePoolEnable = true, mappedByteBuffer is used only for reading messages while the off-heap memory takes the writes, separating reads from writes. The off-heap buffers are not created per MappedFile; they are all initialized at startup according to the pool size. Each off-heap DirectByteBuffer is the same size as a CommitLog file and is locked in memory so that it cannot be swapped out.

//org.apache.rocketmq.store.TransientStorePool
public void init() {
    for (int i = 0; i < poolSize; i++) {
        //allocate off-heap memory equal in size to a CommitLog file
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
        final long address = ((DirectBuffer) byteBuffer).address();
        Pointer pointer = new Pointer(address);
        //lock the off-heap memory so it cannot be swapped out
        LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
        availableBuffers.offer(byteBuffer);
    }
}

The mmapOperation method above contains the MappedFile warm-up logic. Why warm a file up, and how is it done?
mmap only establishes the mapping between the process's virtual addresses and physical memory; it does not load the pages into the page cache. A read or write that misses the page cache triggers a page fault and reloads data from disk, hurting read/write performance. To avoid page faults and to stop the operating system from scheduling the relevant pages out to swap space, RocketMQ warms the file up as follows.

//org.apache.rocketmq.store.MappedFile::warmMappedFile
public void warmMappedFile(FlushDiskType type, int pages) {
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        int flush = 0;
        //write a zero byte into every OS page of the 1G file so the OS actually allocates physical memory;
        //without touching the pages no physical memory would be allocated, and page faults would occur later during message writes
        for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
            byteBuffer.put(i, (byte) 0);
            // force flush when flush disk type is sync
            if (type == FlushDiskType.SYNC_FLUSH) {
                if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                    flush = i;
                    mappedByteBuffer.force();
                }
            }
 
            //prevent gc
            if (j % 1000 == 0) {
                Thread.sleep(0);
            }
        }
 
        //force flush when prepare load finished
        if (type == FlushDiskType.SYNC_FLUSH) {
            mappedByteBuffer.force();
        }
        ...
        this.mlock();
}
 
//org.apache.rocketmq.store.MappedFile::mlock
public void mlock() {
    final long beginTime = System.currentTimeMillis();
    final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
    Pointer pointer = new Pointer(address);
 
    //mlock syscall: pin this file's page cache so it cannot be swapped out
    int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
 
    //madvise syscall: advise the OS that this file will be accessed soon
    int ret2 = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
}

In summary, RocketMQ pre-creates the next file to cut file-creation latency and warms files up to avoid page faults during reads and writes.
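
Outside RocketMQ, the same mmap pattern can be reproduced with plain Java NIO. A minimal, self-contained sketch (file name and size are illustrative, not from the original text):

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class MmapDemo {
    public static void main(String[] args) throws Exception {
        int fileSize = 4 * 1024; // tiny file for the demo; a CommitLog file is 1 GB
        try (RandomAccessFile raf = new RandomAccessFile("mmap-demo.dat", "rw");
             FileChannel channel = raf.getChannel()) {
            // the same call MappedFile.init uses to map the file into memory
            MappedByteBuffer buf = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            buf.put("hello mmap".getBytes());
            // force() flushes the dirty mapped pages to disk, as the flush services do
            buf.force();
        }
    }
}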

MappedFile Write

Note that at this point the message has only been written into a ByteBuffer; it has not yet been persisted to disk.

//org.apache.rocketmq.store.MappedFile::appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
    //decide whether to write via the DirectBuffer (off-heap) or the MappedByteBuffer
    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
    ..
    byteBuffer.position(currentPos);
    AppendMessageResult result  = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBrokerInner) messageExt, putMessageContext);
    ..
    return result;
}
 
//org.apache.rocketmq.store.CommitLog::doAppend
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
    ...
    ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
    ...
    //this only writes the message into the buffer; it is not flushed to disk yet
    byteBuffer.put(preEncodeBuffer);
    msgInner.setEncodedBuff(null);
    ...
    return result;
}

ConsumeQueue and IndexFile Writes

When the MessageStore initializes, it starts the ReputMessageService background thread, which loops calling doReput to tell the ConsumeQueue and IndexFile to update. They can be updated asynchronously because the CommitLog stores the queue, Topic, and other data needed to rebuild them: even if the Broker crashes, it can reconstruct the ConsumeQueue and IndexFile from the CommitLog after a restart.

//org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService::run
public void run() {
    ...
    while (!this.isStopped()) {
        Thread.sleep(1);
         this.doReput();
    }
    ...
}
 
//org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService::doReput
private void doReput() {
    ...
    //fetch the newly stored message from the CommitLog
    DispatchRequest dispatchRequest =
        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
 
    if (dispatchRequest.isSuccess()) {
        if (size > 0) {
            //on a new message, dispatch to CommitLogDispatcherBuildConsumeQueue and
            //CommitLogDispatcherBuildIndex to build the ConsumeQueue and IndexFile entries
            DefaultMessageStore.this.doDispatch(dispatchRequest);
        }
    }
    ...
}

ConsumeQueue Persistence

//org.apache.rocketmq.store.ConsumeQueue::putMessagePositionInfo
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {
    ...
    // each entry is CQ_STORE_UNIT_SIZE (20) bytes: commitlog offset (8) + size (4) + tag hashcode (8)
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);
 
    // logical offset of this entry within the ConsumeQueue files
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
 
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {
        ...
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
}
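
By default each ConsumeQueue file holds 300,000 such entries (mappedFileSizeConsumeQueue = 300000 * CQ_STORE_UNIT_SIZE), so expectLogicOffset maps a logical queue offset directly to a file and an in-file position.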

IndexFile Write

A detailed description of the IndexFile layout can be found in the companion article on RocketMQ fundamentals (RocketMQ基础概念篇).

//org.apache.rocketmq.store.index.IndexFile::putKey
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    int keyHash = indexKeyHashMethod(key);
    int slotPos = keyHash % this.hashSlotNum;
    int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    ...
    //read the slot value from the Slot Table: the position of the most recent IndexItem for this hash
    int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
    ...
    int absIndexPos =
        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
            + this.indexHeader.getIndexCount() * indexSize;
 
    this.mappedByteBuffer.putInt(absIndexPos, keyHash);
    this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
    //store the position of the previous list-head IndexItem, forming a linked list
    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
    //update the hash slot in the Slot Table to point at this newest entry
    this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
 
    if (this.indexHeader.getIndexCount() <= 1) {
        this.indexHeader.setBeginPhyOffset(phyOffset);
        this.indexHeader.setBeginTimestamp(storeTimestamp);
    }
 
    if (invalidIndex == slotValue) {
        this.indexHeader.incHashSlotCount();
    }
    this.indexHeader.incIndexCount();
    this.indexHeader.setEndPhyOffset(phyOffset);
    this.indexHeader.setEndTimestamp(storeTimestamp);
 
    return true;
    ...
}
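
To sanity-check the offset arithmetic above: with the 40-byte header (IndexHeader.INDEX_HEADER_SIZE), 4-byte hash slots, and 20-byte index items (4 + 8 + 4 + 4 for keyHash, phyOffset, timeDiff, and the previous-item pointer), slot s lives at 40 + s * 4, and with the default 5,000,000 slots the n-th index item starts at 40 + 5,000,000 * 4 + n * 20.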

Message Flushing

(1) Synchronous flush: the Broker returns a success ACK to the Producer only after the message has actually been persisted to disk. This is a solid reliability guarantee for MQ messages but costs a lot of performance; it is mostly used by financial applications.
(2) Asynchronous flush: exploits the OS page cache; a success ACK is returned as soon as the message has been written into the page cache. Flushing is performed by a background thread, which lowers read/write latency and raises throughput. Asynchronous flush comes in two variants, with and without the off-heap buffer.
When the CommitLog submits a flush request, the Broker's configuration decides whether the flush is synchronous or asynchronous.

//org.apache.rocketmq.store.CommitLog::submitFlushRequest
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
    //synchronous flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                    this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            service.putRequest(request);
            return request.future();
        } else {
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    //asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else  {
            //asynchronous flush with the off-heap buffer enabled
            commitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}

GroupCommitService, FlushRealTimeService, and CommitRealTimeService are the three flush service threads (they share a common parent class); their roles are:
● GroupCommitService (GroupCommitService::run): synchronous flush. After the message reaches the page cache, the message-handling thread blocks until the flush result is available.
● FlushRealTimeService (FlushRealTimeService::run): asynchronous flush without the off-heap buffer. Once the message is in the page cache, the message-handling thread returns immediately and FlushRealTimeService flushes in the background.
● CommitRealTimeService (CommitRealTimeService::run): asynchronous flush with the off-heap buffer enabled. The message-handling thread returns as soon as the message is written into off-heap memory; CommitRealTimeService then commits it from off-heap memory into the page cache, and FlushRealTimeService flushes it to disk. Note that once a message has been committed to the page cache, it can already be read from the MappedByteBuffer. After a message lands in the off-heap writeBuffer, isAbleToCommit checks whether at least the minimum number of pages (4 by default) has accumulated; if so the batch is committed, otherwise the data stays in off-heap memory, which carries a risk of message loss on failure. Thanks to this batching, the pages being read and the pages being written stay several pages apart, which reduces page-cache read/write contention and achieves read/write separation.
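
Which of the three services handles a message is driven purely by broker configuration; for example, a sketch of the relevant broker.conf entries (values illustrative):

# broker.conf (illustrative values)
flushDiskType=ASYNC_FLUSH        # SYNC_FLUSH selects GroupCommitService instead
transientStorePoolEnable=true    # off-heap writes, handled by CommitRealTimeService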

Message Reading

Query by offset

Reading a message by offset means first finding its physical offset in the CommitLog from the ConsumeQueue, then reading the message body from the CommitLog file.

//org.apache.rocketmq.store.DefaultMessageStore::getMessage
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
    final int maxMsgNums,
    final MessageFilter messageFilter) {
    long nextBeginOffset = offset;
 
    GetMessageResult getResult = new GetMessageResult();
 
    final long maxOffsetPy = this.commitLog.getMaxOffset();
    //find the matching ConsumeQueue
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    ...
    //locate the ConsumeQueue MappedFile (index buffer) for this offset
    SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
    status = GetMessageStatus.NO_MATCHED_MESSAGE;
    long maxPhyOffsetPulling = 0;
 
    int i = 0;
    //upper bound on the ConsumeQueue bytes scanned per pull: at least 16000 bytes (800 entries)
    final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
    for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
        //physical offset in the CommitLog
        long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
        int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
        maxPhyOffsetPulling = offsetPy;
        ...
        //fetch the concrete message from the CommitLog by offset and size
        SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
        ...
        //add the message to the result set
        getResult.addMessage(selectResult);
        status = GetMessageStatus.FOUND;
    }
 
    //advance the offset for the next pull
    nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
 
    long diff = maxOffsetPy - maxPhyOffsetPulling;
    long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
        * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
    getResult.setSuggestPullingFromSlave(diff > memory);
    ...
    getResult.setStatus(status);
    getResult.setNextBeginOffset(nextBeginOffset);
    return getResult;
}

Query by key

Reading by key means using the topic and key to locate a record in the IndexFile, then reading the message body from the CommitLog at the offset stored in that record.

//org.apache.rocketmq.store.DefaultMessageStore::queryMessage
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
    QueryMessageResult queryMessageResult = new QueryMessageResult();
    long lastQueryMsgTime = end;
     
    for (int i = 0; i < 3; i++) {
        //get the CommitLog physical offsets of the messages recorded in the IndexFile
        QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
        ...
        for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
            long offset = queryOffsetResult.getPhyOffsets().get(m);
            ...
            MessageExt msg = this.lookMessageByOffset(offset);
            if (0 == m) {
                lastQueryMsgTime = msg.getStoreTimestamp();
            }
            ...
            //read the message content from the CommitLog file
            SelectMappedBufferResult result = this.commitLog.getData(offset, false);
            ...
            queryMessageResult.addMessage(result);
            ...
        }
    }
 
    return queryMessageResult;
}

Looking up the CommitLog physical offsets inside the IndexFile is implemented as follows:

//org.apache.rocketmq.store.index.IndexFile::selectPhyOffset
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {
    int keyHash = indexKeyHashMethod(key);
    int slotPos = keyHash % this.hashSlotNum;
    int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    //get the position of the first IndexItem sharing this key hash, i.e. the head of the linked list
    int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
     
    //walk the linked list of index items
    for (int nextIndexToRead = slotValue; ; ) {
        if (phyOffsets.size() >= maxNum) {
            break;
        }
 
        int absIndexPos =
            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                + nextIndexToRead * indexSize;
 
        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
 
        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
 
        if (timeDiff < 0) {
            break;
        }
 
        timeDiff *= 1000L;
 
        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
 
        //add matching entries to phyOffsets
        if (keyHash == keyHashRead && timeMatched) {
            phyOffsets.add(phyOffsetRead);
        }
 
        if (prevIndexRead <= invalidIndex
            || prevIndexRead > this.indexHeader.getIndexCount()
            || prevIndexRead == nextIndexToRead || timeRead < begin) {
            break;
        }
         
        //move on to the previous item in the list
        nextIndexToRead = prevIndexRead;
    }
    ...
}

Consumer

Startup

  1. this.checkConfig(): validate the consumer configuration, including the consumer group, the message model (clustering or broadcasting), the subscription data, and the message listener; if any of them is missing, an exception is thrown.
  2. copySubscription(): build the topic subscription data (SubscriptionData) and add it to the subscription info of the RebalanceImpl load-balancing component.
  3. getAndCreateMQClientInstance(): initialize the MQ client instance.
  4. offsetStore.load(): create and load the consume-progress store according to the message model, selectable via setMessageModel. BROADCASTING: every consumer in the group consumes each message once, consumers in the same group are independent of each other, and progress is stored locally. CLUSTERING (the default): each message is consumed exactly once within the group, and the consume progress is shared across the whole group and participates in load balancing.
switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    case CLUSTERING:
        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    default:
        break;
}
  5. consumeMessageService.start(): instantiate and start the consume service matching the listener type, concurrent or orderly (ConsumeMessageConcurrentlyService / ConsumeMessageOrderlyService).
  6. mQClientFactory.registerConsumer(): register the consumer with the MQClientInstance and start the MQClientInstance; if registration fails, the consume service is shut down.
  7. mQClientFactory.start(): finally start the remoting client, the scheduled tasks, the pull-message service, the rebalance service, and the push service, then switch the state to RUNNING (a minimal bootstrap sketch follows).
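
Putting the startup sequence together, a minimal push-consumer bootstrap looks like this (a sketch; the group, topic, and address are placeholders):

public class PushConsumerDemo {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setMessageModel(MessageModel.CLUSTERING); // or MessageModel.BROADCASTING
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // start() walks through checkConfig/copySubscription/offsetStore.load/... as described above
        consumer.start();
    }
}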

Consumption

Pull Mode: DefaultMQPullConsumer

In pull mode the application actively calls the consumer's pull methods to fetch messages from the Broker; the application keeps full control and can specify the consume offset. Pseudocode:

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("TestConsumer");
// set the NameServer address
consumer.setNamesrvAddr("localhost:9876");
// start the consumer instance
consumer.start();
// fetch all message queues of the topic, resolved from the NameServer
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("Test");
for (MessageQueue queue : mqs) {
    // fetch this queue's consume offset; fromStore: true reads it from the broker, false from local storage
    long offset = consumer.fetchConsumeOffset(queue, true);
    boolean hasMore = true;
    while (hasMore) {
        try {
            // 2nd argument: the tag filter under the topic
            // 3rd argument: the offset to start pulling from
            // 4th argument: the maximum number of messages per pull
            PullResult pullResult = consumer.pullBlockIfNotFound(queue, "*", offset, 32);
            // code omitted: consume pullResult.getMsgFoundList() and persist the offset
            offset = pullResult.getNextBeginOffset();
            hasMore = offset < pullResult.getMaxOffset();
        } catch (Exception e) {
            e.printStackTrace();
            hasMore = false;
        }
    }
}

Push Mode: DefaultMQPushConsumer

In this mode the Broker proactively pushes newly arrived data to the consumer, so latency is low; it is the generally recommended mode today. It is actually implemented on top of the pull mechanism: after the consumer starts, it launches the pull-message service pullMessageService and the rebalance service rebalanceService, whose threads keep driving consumption.

//org.apache.rocketmq.client.impl.factory.MQClientInstance::start (abridged)
case CREATE_JUST:
    //......
    // Start pull service
    this.pullMessageService.start();
    // Start rebalance service
    this.rebalanceService.start();
    //.......
    this.serviceState = ServiceState.RUNNING;
    break;
case RUNNING:

  public class PullMessageService extends ServiceThread {
      @Override
      public void run() {
          log.info(this.getServiceName() + " service started");
     
          while (!this.isStopped()) {
              try {
                  PullRequest pullRequest = this.pullRequestQueue.take();
                  if (pullRequest != null) {
                      this.pullMessage(pullRequest);
                  }
              } catch (InterruptedException e) {
              } catch (Exception e) {
                  log.error("Pull Message Service Run Method exception", e);
              }
          }
     
          log.info(this.getServiceName() + " service end");
    }
  }

  public class RebalanceService extends ServiceThread {
    //initialization code omitted
 
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
 
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            //perform rebalancing
            this.mqClientFactory.doRebalance();
        }
 
        log.info(this.getServiceName() + " service end");
    }
 
    @Override
    public String getServiceName() {
        return RebalanceService.class.getSimpleName();
    }
}

Inside pullMessage(), pullKernelImpl() is given a PullCallback that runs when the pull returns; it hands the messages to submitConsumeRequest(), and this thread callback is how the listener in push mode gets notified of new messages.

//pull callback
PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
 
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            // code omitted: consume-offset update
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);
                    }
                }
            }
}

// register a callback listener to process the messages pulled back from the broker
 consumer.registerMessageListener(new MessageListenerConcurrently() {
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
         System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
         // mark the messages as successfully consumed
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 });

Comparison

Push mode differs from pull mode in that, during rebalancing, PullRequests are put into the pullRequestQueue; the PullMessageService thread takes them out in real time, stores the pulled messages into the ProcessQueue, and notifies the listener through a thread callback, so the whole path from request dispatch to message receipt is real-time. In pull mode the consumer fetches the messages it wants by itself and must track the consume offset.
Which mode fits a given piece of business logic?
Common points:

  1. Under the hood the two are the same; push mode is implemented on top of pull mode.
  2. Pull mode requires the program to actively pull messages from the broker through the consumer, whereas push mode only requires registering a listener that receives messages in real time.
    Advantages:
  1. Push mode fetches messages with long-polling blocking, so latency is very low.
  2. Push mode lets RocketMQ handle the mechanics of fetching, which makes it simple and convenient to use.
  3. Pull mode can start from any consume offset and consume at whatever pace is needed, so it is very flexible.
    Disadvantages:
  1. In push mode, when consumers are far slower than producers, messages pile up on the consumer side.
  2. Pull mode has poor real-time behavior.
  3. The pull interval is hard to tune: too short produces many wasted pull RPCs that drag down overall MQ network performance; too long hurts latency.
    The consumer side also talks to the Broker over Netty, wrapping the pull request as RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE):

public class MQClientAPIImpl implements NameServerUpdateCallback {
    public PullResult pullMessage(final String addr, final PullMessageRequestHeader requestHeader, ..., final PullCallback pullCallback) {
        ...
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
        ...
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        ...
    }
}

//the RemotingClient sends the pull request with channel.writeAndFlush(request)
public abstract class NettyRemotingAbstract {
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) {
        ...
        try {
            ...
            channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
                ...
            });
            ...
        } 
            ...
    }
}

//The Broker receives the pull request and returns the matching messages.
//NettyServerHandler on the Broker receives the RemotingCommand with RequestCode.PULL_MESSAGE and dispatches it to PullMessageProcessor.processRequest.
public class PullMessageProcessor implements NettyRequestProcessor {
    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, ...) {
        ...
        messageStore.getMessageAsync(group, topic, queueId, requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter)
        .thenApply(result -> {
            ...
        })
        // write the response back to the client
        .thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result));
        ...
    }
}
//While handling the request, PullMessageProcessor.processRequest calls messageStore.getMessageAsync to look the messages up in the queue, then writes them back to the client
public class DefaultMessageStore implements MessageStore {
    @Override
    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, ...) {
        ...
        GetMessageResult getResult = new GetMessageResult();
        ...
        SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
        ...
        getResult.addMessage(selectResult, cqUnit.getQueueOffset(), cqUnit.getBatchNum());
        ...
        return getResult;
    }
}
//fetch the message from the CommitLog
public class CommitLog implements Swappable {
    public SelectMappedBufferResult getMessage(final long offset, final int size) {
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
        if (mappedFile != null) {
            int pos = (int) (offset % mappedFileSize);
            SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(pos, size);
            if (null != selectMappedBufferResult) {
                selectMappedBufferResult.setInCache(coldDataCheckService.isDataInPageCache(offset));
                return selectMappedBufferResult;
            }
        }
        return null;
    }
}

Consumer-Side Load Balancing

AllocateMessageQueueAveragely

Averaging algorithm: compute the average share and hand each consumer a contiguous block of queues. If the queues divide evenly, each consumer gets exactly the average number in order; otherwise the leftover queues are handed out one by one to the consumers at the front of the sorted list.



AllocateMessageQueueAveragelyByCircle

Circular averaging: arrange the consumers into a ring, then walk the queues in order, assigning one queue to each consumer following the ring order.


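The circular algorithm's core is compact; its allocate method boils down to the following abridged sketch (the check() guard present in the real class is elided):

//org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle::allocate (abridged sketch)
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
    List<MessageQueue> result = new ArrayList<>();
    int index = cidAll.indexOf(currentCID);
    // walk the queues; every cidAll.size()-th queue starting at index belongs to this consumer
    for (int i = index; i < mqAll.size(); i++) {
        if (i % cidAll.size() == index) {
            result.add(mqAll.get(i));
        }
    }
    return result;
}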

AllocateMessageQueueConsistentHash

Consistent hashing: place the consumers' hash values on a ring, compute each queue's hash, and move clockwise to assign the queue to the nearest consumer node.


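The strategy is pluggable on the consumer; for instance, switching a push consumer to consistent hashing is a one-line configuration (a sketch; the group and topic names are placeholders):

// Sketch: replace the default AllocateMessageQueueAveragely with consistent hashing
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_consumer_group");
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueConsistentHash());
consumer.subscribe("TopicTest", "*");
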
/*
   1. Initialization: AllocateMessageQueueAveragely is the default queue-allocation algorithm
 */
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
    public DefaultMQPushConsumer(final String consumerGroup) {
        this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
    }
}
/*
   2. A RebalanceService task is started to execute the allocation strategy
 */
public class MQClientInstance {
    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    ...
                    // Start rebalance service
                    this.rebalanceService.start();
                    ...
                default:
                    break;
            }
        }
    }
}
/*
   3. RebalanceImpl.rebalanceByTopic performs the concrete allocation logic
 */
public abstract class RebalanceImpl {
    private boolean rebalanceByTopic(final String topic, final boolean isOrder) {
        boolean balanced = true;
        switch (messageModel) {
            ...
            case CLUSTERING: {
                // get all queues of the topic
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // get all consumer IDs in the group
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                ...
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    // the strategy configured at initialization, AllocateMessageQueueAveragely by default
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        // invoke the concrete allocation algorithm
                        allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
                    } catch (Throwable e) {
                        log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);
                        return false;
                    }
                    ...
                }
                break;
            }
            default:
                break;
        }
        return balanced;
    }
}
/*
   4. The concrete implementation of the AllocateMessageQueueAveragely averaging algorithm
 */
public class AllocateMessageQueueAveragely extends AbstractAllocateMessageQueueStrategy {
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {

        List<MessageQueue> result = new ArrayList<>();
        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
            return result;
        }

        // position of the current consumer in the sorted consumer list
        int index = cidAll.indexOf(currentCID);
        // remainder queues that cannot be divided evenly
        int mod = mqAll.size() % cidAll.size();
        // consumers in front of the remainder boundary get one extra queue
        int averageSize =
                mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                        + 1 : mqAll.size() / cidAll.size());
        // first queue index assigned to this consumer
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}
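
A worked example: with 8 queues (q0..q7) and 3 consumers (c0, c1, c2), mod = 2, so c0 and c1 each get averageSize = 3 while c2 gets 2; the result is c0 -> q0..q2, c1 -> q3..q5, c2 -> q6..q7.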