rocketmq-1
rocketmq的存储文件:
- CommitLog
- ConsumeQueue 消费队列文件
CommitLog
将CommitLog中存储的内容解析出来具有如下字段:
- 主题
- 消息
- 队列
- 存储地址
CommitLog 文件保存了所有主题的消息,但我们消费时,更多的是订阅某一个主题进行消费。RocketMQ是怎么样进行高效的检索消息的呢 ?
MessageQueue
Topic有多少个队列,就有多少个MessageQueue。MesageQueue一般均匀分布在broker集群上,一个broker会对应一个或多个MessageQueue。
当向topic发送消息时,会通过某种策略来选择一个MessageQueue,默认是轮询策略(顺序消费和推送策略有关)。而在故障延迟的情况下,还会有一套复杂的推送策略。
ConsumerQueue
ConsumerQueue的存储文件,是为了高效检索主题消息的。
ConsumerQueue的文件目录结构:
-consumequeue文件夹
--topic文件夹
---MessageQueue队列ID命名文件夹
----consumequeue存储文件
根据以上文件目录结构,我们可以得出:
- 先通过主题名称,可以定位到具体的文件夹;
- 然后根据消息队列ID找到具体的文件;
- 最后根据文件内容,找到具体的消息。
解析ConsumerQueue的存储文件
消费消息
存储内容:
- 消息长度
- 消息偏移量
消息偏移量=消息长度*队列长度
根据ConsumerQueue文件中的消息长度和消息偏移量,可以在CommitLog中找到对应的消息。
通过MessageId查询
rocketmq在消费消息时时按照消息偏移量来定位消息,除此之外,rocketmq还有几种查找消息的方式:
- 通过Message Key查询,消息发送前由客户端生成
- 通过Unique Key查询,消息发送前由客户端生成
- 通过Message Id查询,Broker端存储消息的时候生成
Message Id
总共 16 字节,包含消息存储主机地址和在CommitLog文件中的偏移量offset。
/**
 * 创建消息ID
 * @param input     
 * @param addr      Broker服务器地址
 * @param offset    正在存储的消息,在Commitlog中的偏移量
 * @return
 */
public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
    input.flip();
    int msgIDLength = addr.limit() == 8 ? 16 : 28;
    input.limit(msgIDLength);
    input.put(addr);
    input.putLong(offset);
    return UtilAll.bytes2string(input.array());
}
Index
由于Message Id自身包含了消息存储主机地址和在CommitLog文件中的偏移量offset,所以查找消息相对来说比较容易。
为了能通过Message Key和Unique Key也能查找消息,RocketMQ引入Hash索引机制,为消息建立索引,它的键就是Message Key 和 Unique Key。