rocketmq-1

rocketmq-1

rocketmq的存储文件:

  1. CommitLog
  2. ConsumeQueue 消费队列文件

CommitLog

将CommitLog中存储的内容解析出来具有如下字段:

  1. 主题
  2. 消息
  3. 队列
  4. 存储地址

CommitLog 文件保存了所有主题的消息,但我们消费时,更多的是订阅某一个主题进行消费。RocketMQ是怎么样进行高效的检索消息的呢 ?

MessageQueue

Topic有多少个队列,就有多少个MessageQueue。MesageQueue一般均匀分布在broker集群上,一个broker会对应一个或多个MessageQueue。

当向topic发送消息时,会通过某种策略来选择一个MessageQueue,默认是轮询策略(顺序消费和推送策略有关)。而在故障延迟的情况下,还会有一套复杂的推送策略。

ConsumerQueue

ConsumerQueue的存储文件,是为了高效检索主题消息的。

ConsumerQueue的文件目录结构:

-consumequeue文件夹
--topic文件夹
---MessageQueue队列ID命名文件夹
----consumequeue存储文件

根据以上文件目录结构,我们可以得出:

  1. 先通过主题名称,可以定位到具体的文件夹;
  2. 然后根据消息队列ID找到具体的文件;
  3. 最后根据文件内容,找到具体的消息。

解析ConsumerQueue的存储文件

消费消息

存储内容:

  1. 消息长度
  2. 消息偏移量

消息偏移量=消息长度*队列长度

根据ConsumerQueue文件中的消息长度和消息偏移量,可以在CommitLog中找到对应的消息。

通过MessageId查询

rocketmq在消费消息时时按照消息偏移量来定位消息,除此之外,rocketmq还有几种查找消息的方式:

  1. 通过Message Key查询,消息发送前由客户端生成
  2. 通过Unique Key查询,消息发送前由客户端生成
  3. 通过Message Id查询,Broker端存储消息的时候生成

Message Id

总共 16 字节,包含消息存储主机地址和在CommitLog文件中的偏移量offset。

/**
 * 创建消息ID
 * @param input     
 * @param addr      Broker服务器地址
 * @param offset    正在存储的消息,在Commitlog中的偏移量
 * @return
 */
public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
    input.flip();
    int msgIDLength = addr.limit() == 8 ? 16 : 28;
    input.limit(msgIDLength);
    input.put(addr);
    input.putLong(offset);
    return UtilAll.bytes2string(input.array());
}

Index

由于Message Id自身包含了消息存储主机地址和在CommitLog文件中的偏移量offset,所以查找消息相对来说比较容易。

为了能通过Message Key和Unique Key也能查找消息,RocketMQ引入Hash索引机制,为消息建立索引,它的键就是Message Key 和 Unique Key。

index索引文件结构

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容