Broker是如何存储消息的
流程图
代码解释
写入CommitLog
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery 延时等级 > 0,替换原有的消费主题为 SCHEDULE_TOPIC_XXXX,队列id 为 延时的等级-1
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
msg.setBornHostV6Flag();
}
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
msg.setStoreHostAddressV6Flag();
}
PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
if (encodeResult != null) {
return CompletableFuture.completedFuture(encodeResult);
}
msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
// 申请 putMessageLock 锁 (将消息存储到CommitLog文件中是串行的)
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
// 获取当前可以写入的CommitLog文件
// CommitLog 存储地址:${ROCKET_HOME}/store/commitlog 文件默认大小1g。一个文件写满后就再创建另一个文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
// ${ROCKET_HOME}/store/commitlog 目录下没有任何文件
if (null == mappedFile || mappedFile.isFull()) {
// 以偏移量为0 创建commitLog文件
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
default:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
// 处理完追加逻辑就会释放锁
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
// 刷盘
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// 此处会处理主从同步的结果(HA)
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
}
}
return putMessageResult;
});
}
从代码中,我们可以看到写入CommitLog这个操作主要做了以下几件事情
- 设置存储的消息的基本信息
- 如果消息是延时消息,会将原有的消息topic替换为 SCHEDULE_TOPIC_XXXX,队列id为延迟等级
- 申请putMessageLock(该操作确保了只会有一个线程去对CommitLog进行修改)
- 如果CommitLog不存在或者已经写满,需要创建新的CommitLog
- 往MappedFile中追加消息(追加消息的时候才会生成消息唯一ID)
- 如果文件剩余空间不足,会创建新的文件 (消息长度 + END_FILE_MIN_BLANK_LENGTH > CommitLog文件空闲空间,返回END_OF_FILE)
- 其他的一些异常
- 追加完消息,释放锁
那么,这边来解释下CommitLog和MappedFile的关系
CommitLog和MappedFile的关系
CommitLog里面有什么
CommitLog里面记录了消息的完整内容,我们在读取消息的时候,首先先通过前4个字节记录了当前消息的实际长度,然后再往后读对应的长度,就可以将消息完全读取出来
CommitLog文件在磁盘中的存储路径 ${ROCKET_HOME}/store/commitlog/
CommitLog和MappedFile的关系
RocketMQ采用内存映射文件的方式来提高IO访问性能,无论是CommitLog,ConsumeQueue还是IndexFile,单个文件都被设置为固定长度。然后RocketMQ使用MappedFile和MappedFileQueue来封装存储文件。具体对MappedFile和MappedFileQueue的概念我会在后面在具体介绍
CommitLog与ConsumeQueue,IndexFile的关系
既然要探讨ConsumeQueue和IndexFile与CommitLog之间的关系,不如直接来讲一讲这两个文件是干什么用的,从他们的用处中我们也就能够得知为什么在已经有了CommitLog的情况下,RocketMQ还需要ConsumeQueue和IndexFile这两种文件
ConsumeQueue
首先,我们来看看ConsumeQueue的结构
ConsumeQueue的结构
从图片中,我们不难看出,RocketMQ对ConsumeQueue的层级目录为
ConsumeQueue - 具体的Topic - 具体的某一个队列 - ConsumeQueue的单元 (记录了消息在CommitLog中的偏移量,消息的长度,消息的tags)
为什么需要CommitLog和ConsumeQueue的映射关系
那么为什么RocketMQ要设计这样的映射关系呢?主要要从两个方面来考虑,同时这两个方面最终的目的也是为了保证RocketMQ的性能
- 写入消息:如果我们没有CommitLog,直接就用一个ConsumeQueue,那么在Producer生产消息给Broker的时候,不同的Topic和队列的消息我们要写到不同的ConsumeQueue文件中。那么就会存在随机写的问题,这样写入消息的效率就会变的非常的低
- 读取消息:如果为了保证消息的写入效率,我们将消息存放在CommitLog中,但是,在消费者拉取消息的时候,它肯定是要拉取自己感兴趣的Topic的消息。此时,我们去CommitLog中寻找对应的Topic和对应MessageQueue下对应偏移量的消息,挨个去遍历CommitLog一看就是一个效率非常低的操作。此时我们使用ConsumeQueue就可以快速定位到消息处于哪个CommitLog,他对应的偏移量是多少。这样读取消息的效率就会变得非常的高
ConsumeQueue的tag hashcode
这边补充一点,关于ConsumeQueue中的tag hashcode。
在RocketMQ中,我们不仅可以监听我们感兴趣的Topic消息,同时我们还可以只监听Topic下部分的消息。这个操作就是通过消息的tag来实现的(一种实现方式,另外还有类过滤和SQL92过滤的方式)【RocketMQ服务端过滤根据tag的hashcode】
在消费者拉取消息的时候,不仅消费者自己会对消息进行过滤,Broker也会根据消费者的情况对消息进行一次预过滤。此处我们就不展开讲述,具体可以消息消费 Consumer消费消息 的文章
IndexFile
IndexFile的结构
他的目的是为了方便我们根据消息的key快速检索消息,快速检索主要是依靠文件头部的Hash槽。
哈希冲突的解决
既然说到了hash,那么就一定会存在哈希冲突的情况。那么,IndexFile是如何解决哈希冲突的呢?
在我们写IndexFile的时候,假如IndexFile还有足够的空间,那么我们就会对当前这条消息的消息key进行一次哈希计算,假如对应的hash槽中没有记录对应的index条目偏移量,那么直接将偏移量记录到hash槽中。否则这是在Index条目中的pre index no记录原先的偏移量,然后再将这个条目的偏移量记录到hash槽中
对于为什么要设计CommitLog,ConsumeQueue,IndexFile的总结
因为消费者消费消息的时候,是根据具体的Topic去进行消费的。所以为了加快消费者消费的速度,需要根据具体的Topic去存储消息(ConsumeQueue)。但是如果直接把消息存储在ConsumeQueue,会导致生产者发送的消息Topic不同,出现大量随机写的情况。所以为了提高消费者和生产者的速度。相同的,为了加快对消息key检索消息的速度,所以添加了IndexFile
RocketMQ首先先将生产者的消息写入到CommitLog中(顺序写),然后在通过一个异步线程将消息基本信息(偏移量等)写入到ConsumeQueue和IndexFile中
class ReputMessageService extends ServiceThread {
// ...省略其他代码
@Override
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 每执行一次任务推送休息1毫秒,就继续尝试推送消息到消息消费队列和索引文件
Thread.sleep(1);
// 消息消费转发核心方法
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
// ...省略其他代码
}
// 最终调用到该方法
public void doDispatch(DispatchRequest req) {
// CommitLogDispatcher 有三个实现
// 其中就有我们用到的 CommitLogDispatcherBuildConsumeQueue 写ConsumeQueue
// CommitLogDispatcherBuildIndex 写IndexFile
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
}
}
MappedFile和MappedFileQueue
MappedFile
为什么需要MappedFile
RocketMQ底层使用 mmap + write (零拷贝)的方式减少用户态和内存态的切换次数
具体内容参考 mmap解释 这篇文章
这里只简单介绍一下:
加入没有mmap,我们读取文件需要经历两次数据拷贝,一次是从磁盘拷贝到page cache中,一次从page cache拷贝到用户空间内存。
而有了mmap,就能够减少page cache到用户空间内存的拷贝,mmap对page cache和用户空间虚拟地址进行了直接的映射,操作虚拟地址就等同于操作page cache
MappedFileQueue
MappedFileQueue是MappedFile的管理容器,它是对存储目录的封装
比如说在CommitLog文件存储目录下有好多个CommitLog文件,对应在内存中也会有好多个MappedFile对象,而MappedFileQueue就是来管理这些MappedFile对象的
MappedFile刷盘操作
消息刷盘有两种:
- 同步刷盘:只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。
- 异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。
RocketMQ定期删除已经消费的消息文件
另外,RocketMQ会定时将已经消费的消息从存储文件中删除,以CommitLog为例,第一个CommitLog文件的偏移量不一定 00000000000000000000
主干逻辑流程图
对应源码和注释
// 默认每个文件过期时间为72小时,通过Broker配置文件中设置fileReservedTime来改变
// 如果非当前写文件在一定时间间隔内没有再次被更新,则会被认为是过期文件,可以被删除(RocketMQ不会关注这个文件上的消息是否被全部消费)
private void deleteExpiredFiles() {
int deleteCount = 0;
// 文件保留时间 72小时
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
// 删除物理文件的间隔(在一次清除过程中,可能需要删除的文件不止一个,该值指定两次删除文件的时间间隔)
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
// 在清除过期文件时,如果该文件被其他线程所占用(引用次数大于0,比如读取消息),此时会阻止此次删除任务
// 同时在第一次试图删除该文件时记录当前时间戳 destroyMapedFileIntervalForcibly 表示第一次拒绝删除之后能保留的最大时间
// 在此时间内,同样可以被拒绝删除,同时会将引用减少1000个,超过该时间间隔后,文件将被强制删除
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
// 指定删除文件的时间点
boolean timeup = this.isTimeToDelete();
// 磁盘空间满了
boolean spacefull = this.isSpaceToDelete();
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
if (timeup || spacefull || manualDelete) {
if (manualDelete)
this.manualDeleteFileSeveralTimes--;
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
fileReservedTime,
timeup,
spacefull,
manualDeleteFileSeveralTimes,
cleanAtOnce);
fileReservedTime *= 60 * 60 * 1000;
// 清除过期文件
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMapedFileIntervalForcibly, cleanAtOnce);
if (deleteCount > 0) {
} else if (spacefull) {
log.warn("disk space will be full soon, but delete file failed.");
}
}
}
public int deleteExpiredFileByTime(final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately) {
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
return 0;
// 从倒数第二个文件开始
int mfsLength = mfs.length - 1;
int deleteCount = 0;
// 通过MappedFile,没有磁盘i/o的情况
List<MappedFile> files = new ArrayList<MappedFile>();
if (null != mfs) {
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
// 文件最大存活时间=文件的最后一次更新时间+文件存活时间(默认72小时)
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
// 当前时间>=最大存活时间 或 需要立即删除
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
// 删除文件(将该文件加入到待删除文件列表中,然后统一执行File#delete方法将文件从物理磁盘中删除)
if (mappedFile.destroy(intervalForcibly)) {
files.add(mappedFile);
deleteCount++;
if (files.size() >= DELETE_FILES_BATCH_MAX) {
break;
}
if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
try {
Thread.sleep(deleteFilesInterval);
} catch (InterruptedException e) {
}
}
} else {
break;
}
} else {
//avoid deleting files in the middle
break;
}
}
}
deleteExpiredFile(files);
return deleteCount;
}