概览
RocketMQ的消息存储主要是在${ROCKETMQ_HOME}/store文件夹下,message消息主要存储在commitlog文件夹下,RocketMQ消息存储和索引是分开隔离的,已Topic为主题的消息索引存储在consumequeue文件夹下,通过MessageQueue映射为ConsumeQueue的文件就存储在这个文件夹下,然后index主要是以消息key和offset的对应关系,以类似HashMap的方式存储,方便消息查询。
本片文章主要介绍消息存储组织结构、Message是如何快速存储都MappedFile文件中的。MappedFile文件就是一个个以首条消息的offset为名称的存储文件,如上图commitlog文件夹下展示的00000000000000000000、00000000001073741824等,每一个mappedFile文件的大小约为102410241024=1G。
DefaultMessageStore
DefaultMessageStore是消息相关操作的主要服务,包括消息的存储、查询、定时清除等等。这里主要介绍其中消息存储相关的事物,包括是否开启TransientStorePool临时消息存储池,一次创建2个MappedFile文件的AllocateMappedFileService消息存储预创建服务,还有历史存储文件mappedFile加载加载到直接内存MappedByteBuffer和对应的mmap文件映射等。
# org.apache.rocketmq.store.DefaultMessageStore
// MappedFile 分配服务
private final AllocateMappedFileService allocateMappedFileService;
// 是否开启
// 消息临时存储
private final TransientStorePool transientStorePool;
this.transientStorePool = new TransientStorePool(messageStoreConfig);
// 根据是否开启 transientStorePoolEnable,存在两种初始化情况。
// transientStorePoolEnable 为 true 表示内容先存储在堆外内存(直接内存),然后通过 Commit 线程将数据提交到FileChannel中,再通过 Flush 线程将内存映射 Buffer 中的数据持久化到磁盘中。
if (messageStoreConfig.isTransientStorePoolEnable()) {
this.transientStorePool.init();
}
//加载历史mappedFile文件,进行便于文件查询和消费
// load Commit Log
result = result && this.commitLog.load();
TransientStorePool
TransientStorePool是短暂的消息存储池。这里先进行简单介绍,具体作用到应用的时候详细介绍。这里直接开辟默认5个1G的直接内存ByteBuffer,用来临时存储消息。它还引入了内存锁的机制,避免直接内存的数据被替换到系统中的Swap分区中,提高系统存储性能,使RocketMQ消息低延迟、高吞吐量。
public class TransientStorePool {
// availableBuffers 个数,可通过在broker中配置文件中设置 transientStorePool,默认值为 5
private final int poolSize;
// 每个 ByteBuffer 大小,默认为 mappedFileSizeCommitLog,表明 TransientStorePool 为 commitlog 文件服务
private final int fileSize;
// 直接内存,ByteBuffer 容器,双端队列
private final Deque<ByteBuffer> availableBuffers;
/**
* 创建默认的堆外内存
* It's a heavy init method.
*/
public void init() {
for (int i = 0; i < poolSize; i++) {
// 利用 NIO 直接直接分配,堆外内存(直接内存),在系统中的内存,非 JVM 内存
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
// 内存地址
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
// 内存锁定
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
availableBuffers.offer(byteBuffer);
}
}
}
CommitLog
CommitLog主要有消息的刷盘的存储服务、消息的刷盘服务,存储消息的回调等等,这里主要介绍MappedFileQueue,它是对${ROCKET_HOME}/store/commitlog目录的封装,主要用来管理多个MappedFile。
public class CommitLog {
// 映射文件队列,ROCKETMQ_HOME/commitlog 文件夹下的文件对应
protected final MappedFileQueue mappedFileQueue;
// 默认消息存储服务
protected final DefaultMessageStore defaultMessageStore;
// commitLog 刷盘操作
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
// 定时将 transientStorePoll 中的直接内存 ByteBuffer,提交条内存映射 MappedByteBuffer 中
private final FlushCommitLogService commitLogService;
// 存储消息到 mappedFile 的回调映射
private final AppendMessageCallback appendMessageCallback;
// 消息解码服务线程
private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
// topic-queue-id,offset;消息的key,和在 commitlog 中的 offset,方便消息存储时的索引
protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
public CommitLog(final DefaultMessageStore defaultMessageStore) {
// 在这里组织 commitlog 的对应的 MappedFile 文件,然后进行相应的文件操作,文件映射,刷线到磁盘文件
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
this.defaultMessageStore = defaultMessageStore;
// 异步、同步刷盘服务初始化
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// 同步刷盘服务为 GroupCommitService
this.flushCommitLogService = new GroupCommitService();
} else {
// 异步刷盘服务为 FlushRealTimeService
this.flushCommitLogService = new FlushRealTimeService();
}
// 定时将 transientStorePoll 中的直接内存 ByteBuffer,提交条内存映射 MappedByteBuffer 中
this.commitLogService = new CommitRealTimeService();
// 存储消息到 mappedFile 的回调映射
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
@Override
protected MessageExtBatchEncoder initialValue() {
return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
// putMessage 到 mappedFile 时是否使用可重入锁,默认使用自旋锁
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
}
}
MappedFileQueue
MappedFile 的存储集合和管理器,是对 ${ROCKET_HOME}/store/commitlog 文件夹的封装。主要用来管理MappedFile文件,包括消息的查询、提交、落盘的刷新,历史MappedFile文件的预热加载和直接内存映射mmap操作,过期文件的删除、追加消息的最后一个MappedFile文件的获取和创建等。
public class MappedFileQueue {
// 存储路径${ROCKET_HOME}/store/commitlog,该目录下会存在多个内存映射文件
private final String storePath;
// 单个文件的存储大小
private final int mappedFileSize;
// mappedFile 文件集合
// 一个线程安全的 ArrayList 的变种,通过可 reentrantLock 可重入锁实现数组的新建和数组旧有内容的 copy 到新建的数组,然后返回新建的数组
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
// 创建 MappedFile 服务类
private final AllocateMappedFileService allocateMappedFileService;
// 当前刷盘指针,表示该指针之前的所有数据全部持久化到磁盘
// MappedFile 中的 MappedByteBuffer 中数据写入磁盘的指针,该指针之前的所有数据全部持久化到磁盘
private long flushedWhere = 0;
// Java 应用程序态数据要写入nio内存映射的ByteBuffer的提交了位置的指针
// commitWhere 只有开启 transientStorePool 的前提下才有作用;
// commitWhere 代表着 transientStorePool 中直接内存 ByteBuffer 需要提交数据到 MappedByteBuffer 直接内存的,位置为已经提交了数据的位置。下次要提交的开始位置,上次提交的结尾位置。
private long committedWhere = 0;
/**
* 项目启动,加载 commitlog 文件夹下对应的文件
* @return
*/
public boolean load() {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
// ascending order
// 根据文件名(offset)排序
Arrays.sort(files);
for (File file : files) {
// 如果物理文件大小 != mappedFileSize,说明文件被破坏了,返回false
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, please check it manually");
return false;
}
try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
// 更新 mappedFile 文件指针
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
// 加入映射文件集合
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
}
return true;
}
/**
* 获取最后存储消息的映射mappedFile
*
* @param startOffset mappedFile 开始写文件的offset
* @param needCreate 是否需要创建新的 mappedFile 文件
* @return
*/
//
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
// 创建映射文件的起始偏移量,也就是即将的mappedfile文件名称
long createOffset = -1;
// 获取最后一个映射文件,如果为null或者写满则会执行创建逻辑
MappedFile mappedFileLast = getLastMappedFile();
// mappedFileLast == null,表示需要创建新的 mappedFile 文件,创建新文件的offset值;
if (mappedFileLast == null) {
// 计算将要创建的映射文件的起始偏移量
// 如果startOffset<=mappedFileSize则起始偏移量为0
// 如果startOffset>mappedFileSize则起始偏移量为是mappedFileSize的倍数
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
// 映射文件满了,创建新的映射文件
if (mappedFileLast != null && mappedFileLast.isFull()) {
// 创建的映射文件的偏移量等于最后一个映射文件的起始偏移量 + 映射文件的大小(commitlog文件大小)
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
// 创建新的映射文件
if (createOffset != -1 && needCreate) {
// 构造commitlog 文件名称
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
// nextNextFilePath,预先创建下一个 mappedFile 文件,通过 allocateMappedFileService 服务,一起创建两个文件,预先创建下一个文件
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
// 优先通过allocateMappedFileService中方式构建映射文件,预分配方式,性能高
// 如果上述方式失败则通过new创建映射文件
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
if (mappedFile != null) {
// 是否是 MappedFileQueue 队列中第一个文件
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
return mappedFileLast;
}
}
MappedFile
CommitLog、MappedFileQueue、MappedFile的关系如图:
MappedFile是RocketMQ消息存储的终极Boss,重中之重。涉及MapedFile的预创建和映射、历史数据MappedFile的磁盘文件预热。MappedByteBuffer是通过NIO方式创建的内存映射对象。ByteBuffer writeBuffer是直接内存从TransientStorePool中借来的,他们两个是在内存中用来存放消息的,其中区别下面详细介绍。这里先从CommitLog文件存放消息说起。
public class MappedFile extends ReferenceResource {
// 当前JVM实例中 MappedFile 虚拟内存
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// 当前JVM实例中MappedFile对象个数
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// 即将写入消息的mappedFile 的位置
// 当前 MappedFile 文件的写指针,从 0 开始(内存映射文件的写指针)
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// 当前文件的提交到 MappedBuffer的指针,如果开启 transientStorePoolEnable,则数据会存储在 TransientStorePool 中,然后提交到内存映射 ByteBuffer 中,再刷写到磁盘
protected final AtomicInteger committedPosition = new AtomicInteger(0);
// 刷写到磁盘指针,该指针之前的数据持久化到磁盘中
private final AtomicInteger flushedPosition = new AtomicInteger(0);
// 文件大小
protected int fileSize;
// 文件通道
protected FileChannel fileChannel;
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
// 堆外内存 ByteBuffer,如果不为空,数据首先将存储在该 Buffer 中,然后提交到 MappedFile 对应的内存映射文件 Buffer。
// transientStorePoolEnable 为true时不为空。
protected ByteBuffer writeBuffer = null;
// 堆内存池,transientStorePoolEnable 为true 时启用
protected TransientStorePool transientStorePool = null;
// 文件名称
private String fileName;
// mappedFile 文件的开始偏移量地址
private long fileFromOffset;
// 物理文件
private File file;
// NIO 物理文件对应的内存映射Buffer
private MappedByteBuffer mappedByteBuffer;
// 文件最后一次内容写入的时间
private volatile long storeTimestamp = 0;
// 是否是 MappedFileQueue 队列中第一个文件
private boolean firstCreateInQueue = false;
}
CommitLog#putMessage方法是来存放消息的,存放消息到系统内存映射中,并没有落入磁盘中,等待同步刷盘、或者异步刷盘,然后是消息存储的高可用。
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
// 消息存储时间
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
// 循环冗余校验码,检测数据在网络中传输是否发生变化
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
// 存储服务统计功能服务
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
// topic
String topic = msg.getTopic();
int queueId = msg.getQueueId();
// 事务回滚消息标志
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// 如果是非事务消息,或者事务消息的 commit 操作;进而判断是不是延迟消息,存储到特殊的延迟消息队列;然后事务消息存储也进行了同样的消息 topic 的转换,从而实现了消息的事务;事务消息非提交阶段,先进行另一个 topic 的储存,如果事务提交了,才进行,存储到消息的真正的topic 中去。
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
// 如果是延迟级别消息
if (msg.getDelayTimeLevel() > 0) {
// 设置消息延迟级别
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 延迟消息topic
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// 延迟消息消息队列Id
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
// 将真实的 topic 放入 message 属性中
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 替换为延迟消息topic
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
// 消息诞生地址 ipv6 设置
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
msg.setBornHostV6Flag();
}
// 消息存储地址 ipv6 设置
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
msg.setStoreHostAddressV6Flag();
}
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
// 最后一个 消息 存储 commitlog 消息映射文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 自旋锁 或 可重入锁,上锁;消息写入 commitlog 的映射文件是串行的
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
// 开始上锁时间
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
// 消息存储时间,确定消息全局有序
msg.setStoreTimestamp(beginLockTimestamp);
//mappedFile==null标识CommitLog文件还未创建,第一次存消息则创建CommitLog文件
//mappedFile.isFull()表示mappedFile文件已满,需要重新创建CommitLog文件
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise;新文件,造成脏数据
}
// mappedFile==null说明创建CommitLog文件失败抛出异常,创建失败可能是磁盘空间不足或者权限不够
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
// 追加消息到 mappedFile 文件中
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
// 针对一条消息足够长,然后 mappedFile 文件不够存储,需要创建新的 mappedFile 进行消息存放。
case END_OF_FILE:
// 上一个 mappedFile 暂存文件,需要解锁这个 mappedFile
unlockMappedFile = mappedFile;
// broker 重新开辟,新的 commitlog 文件
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
// 开辟成功,再将消息写入
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
// 存储消息花费时间
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
// 最后释放存储消息的锁
putMessageLock.unlock();
}
// 存储消息花费时间 > 500
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
// 上一个有空闲空间,但不够存储消息的 mappedFile 文件,
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
// 解锁 mappedFile 的内存锁定
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
// topic 下存放消息次数
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
// topic 下存放消息字节数
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
// handle 硬盘刷新
handleDiskFlush(result, putMessageResult, msg);
// handle 高可用
handleHA(result, putMessageResult, msg);
// 返回存储消息的结果
return putMessageResult;
}
PutMessage重要步骤
- 获取上次最后一个写入消息的存储文件MappedFile,MappedFile文件的获取在后面会详细接受。
// 最后一个 消息 存储 commitlog 消息映射文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
- 向MappedFile文件追加消息,如果返回END_OF_FILE代表这个整备追加消息的MappedFile文件写满了,不够存储本条消息,然后再去获取这最后下一个MappedFile文件。
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
// 针对一条消息足够长,然后 mappedFile 文件不够存储,需要创建新的 mappedFile 进行消息存放。
case END_OF_FILE:
// 上一个 mappedFile 暂存文件,需要解锁这个 mappedFile
unlockMappedFile = mappedFile;
// broker 重新开辟,新的 commitlog 文件
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
// 开辟成功,再将消息写入
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
- 写MappedFile文件是会被mlock内存锁定,防止被交换到Swap分区中,写满的MappedFile文件进行锁定解除。
// 上一个有空闲空间,但不够存储消息的 mappedFile 文件,
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
// 解锁 mappedFile 的内存锁定
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
- 写入内存的消息进行刷盘,然后是HA消息存储的高可用,Broker存储消息的复制,这两部分内容也很重要,下次在介绍,在本章不是重点内容。
// handle 硬盘刷新
handleDiskFlush(result, putMessageResult, msg);
// handle 高可用
handleHA(result, putMessageResult, msg);
最后一个MappedFile的获取
这是MappedFile设计的经典,现在重点介绍。创建MappedFile对象有两种方式。
第一种:通过构造方法,new MappedFile()一个对象。然后进行MapepdFile对象MappedByteBuffer的内存映射。
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
public MappedFile(final String fileName, final int fileSize) throws IOException {
init(fileName, fileSize);
}
/**
* MappedFile 初始化,并做好 mappedFile 和 mappedByteBuffer 的NIO 直接内存映射关系
*
* @param fileName 物理文件路径
* @param fileSize mappedFileSize 文件大小
* @throws IOException
*/
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
// 物理文件名称
this.file = new File(fileName);
// 文件开始位置
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
// 确保文件路径存在,不存在,进行路径文件创建
ensureDirOK(this.file.getParent());
try {
// 通过 RandomAccessFile 创建读写文件通道,并将文件内容使用NIO 的内存映射 Buffer 将文件映射到内存中
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
// 物理文件对应的内存映射Buffer
// 通过 NIO 文件通道和mappedFileSize 大小,创建内存映射文件 mappedByteBuffer
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
// 当前JVM实例中 MappedFile 虚拟内存
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
// 当前JVM实例中MappedFile对象个数
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("Failed to create file " + this.fileName, e);
throw e;
} catch (IOException e) {
log.error("Failed to map file " + this.fileName, e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
第二种:预创建MappedFile,通过allocateMappedFileService服务一次创建两个MappedFile对象。
// 优先通过allocateMappedFileService中方式构建映射文件,预分配方式,性能高
// 如果上述方式失败则通过new创建映射文件
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
AllocateMappedFileService
AllocateMappedFileService是预创建MappedFile文件的服务,通过一次构造两个创建MappedFile的AllocateRequest然后放入队列requestQueue中,通过CountDownLatch线程同步协调器等待mmapOperation()方法,创建MappedFile对象,并返回。RocketMQ中预分配MappedFile的设计非常巧妙,下次获取时候直接返回就可以不用等待MappedFile创建分配所产生的时间延迟。
CountDownLatch协调两个线程之间的通信
/**
* 预先创建 MappedFile 文件,只是先创建2个创建mappedFile 文件的请求,放入队列中,具体 mappedFile 文件的创建和文件内存直接映射由 mmapOperation() 方法来实现。
* @param nextFilePath 创建 mappedFile 文件的全路径名称
* @param nextNextFilePath 创建下一个 mappedFile 文件的全路径名称
* @param fileSize 文件大小
* @return
*/
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
// 默认提交两个请求
int canSubmitRequests = 2;
// 是否启用 transientStorePool
if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
// SLAVE 节点中 transientByteBuffer 即使没有足够的 ByteBuffer,也不支持快速失败
// 启动快速失败策略时,计算TransientStorePool中剩余的buffer数量减去requestQueue中待分配的数量后,剩余的buffer数量
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
// 可用的 ByteBuffer - requestQueue,还剩余可用的 ByteBuffer 数量
canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
}
}
// 创建分配请求
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
// 判断requestTable中是否存在该路径的分配请求,如果存在则说明该请求已经在排队中
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
//该路径没有在排队
if (nextPutOK) {
// byteBuffer 数量不够,则快速失败
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
this.requestTable.remove(nextFilePath);
return null;
}
// 数量充足的话,将指定的元素插入到此优先级队列中
boolean offerOK = this.requestQueue.offer(nextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
// 请求数量 -1
canSubmitRequests--;
}
// 下下个请求的处理
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
if (nextNextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
this.requestTable.remove(nextNextFilePath);
} else {
boolean offerOK = this.requestQueue.offer(nextNextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
}
}
// 报错,日志
if (hasException) {
log.warn(this.getServiceName() + " service has exception. so return null");
return null;
}
// 下一个分配请求,获取当前请求,然后通过线程协调器CountDownLatch,协调另一个线程进行完mmpOperation操作后,返回创建好的MappedFile文件
AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
// 默认等待5s,等待 mmapOperation 操作创建 mappedFile
// 调用此方法的线程会被阻塞,直到 CountDownLatch 的 count 为 0;等到 mmapOperation() finally countDownLatch 为 0
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
if (!waitOK) {
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {
// 成功从 requestTable 中移除请求,并返回 mappedFile 文件
this.requestTable.remove(nextFilePath);
return result.getMappedFile();
}
} else {
log.error("find preallocate mmap failed, this never happen");
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
return null;
}
具体创建MappedFile对象
AllocateMappedFileService服务开启了一个线程,不停地从创建MappedFile对象的请求队列requestQueue中获取AllocateRequest,并实时创建MappedFile对象,并通过CountDownLatch通知putRequestAndReturnMappedFile() 方法已经创建了MappedFile对象,从而获取返回。
/**
* 开始 mappedFile 文件分配服务,从 requestQueue 中获取创建 mappedFile 的文件请求
*/
public void run() {
log.info(this.getServiceName() + " service started");
// 除非停止,否则一直在进行 mmap 映射操作
while (!this.isStopped() && this.mmapOperation()) {
}
log.info(this.getServiceName() + " service end");
}
MmapOperation具体创建MappedFile对象
在这里创建MappedFile对象,也有两种情况,区别在于是否启用TransientStorePool消息暂存池,它里面有默认5个1G的直接内存,可以通过直接内存赋值给MappedFile的writerBuffer对象,省去了开辟内存的时间;还有一种是通过MappedFile的NIO创建的MappedByteBuffer直接内存映射来存储消息,需要进行文件的map映射操作,开辟内存空间。这两种方式的对比会在下面介绍。
第一种:不启用transientStorePool对象,通过构造方法创建。
第二种:通过ServerLoader.load的方式创建,如果失败了,再去尝试构造方法的方式。
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
// 为每一个 mappedFile 文件,进行init中的mmap 映射操作
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
// spi 加载失败,使用构造方法创建 mappedFile
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
// 不开启 transientStorePool,直接内存映射
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
MappedFile文件预热
通过 mmap 建立内存映射仅是将文件磁盘地址和虚拟地址通过映射对应起来,此时物理内存并没有填充磁盘文件内容。
当实际发生文件读写时,产生缺页中断并陷入内核,然后才会将磁盘文件内容读取至物理内存。针对上述场景,RocketMQ 设计了 MappedFile 预热机制。
当 RocketMQ 开启 MappedFile 内存预热(warmMapedFileEnable),且 MappedFile 文件映射空间大小大于等于 mapedFileSizeCommitLog(1 GB) 时,调用 warmMappedFile 方法对 MappedFile 进行预热。
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
.getMappedFileSizeCommitLog()
&&
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
// 对 mappedFile 进行预热
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}
MappedFile创建后,需要对 MappedFile 文件进行预热,将内存和磁盘映射起来,然后每页写入占位数据0,然后将这些0数据,刷新到磁盘,进行磁盘预热。
当调用Mmap进行内存映射后,OS只是建立了虚拟内存地址至物理地址的映射表,而实际并没有加载任何文件至内存中。
程序要访问数据时,OS会检查该部分的分页是否已经在内存中,如果不在,则发出一次 缺页中断。X86的Linux中一个标准页面大小是4KB,
那么1G的CommitLog需要发生 1024KB/4KB=256次 缺页中断,才能使得对应的数据完全加载至物理内存中。
为什么每个页都需要写入数据呢?
RocketMQ在创建并分配MappedFile的过程中预先写入了一些随机值到Mmap映射出的内存空间里。原因在于:
仅分配内存并进行mlock系统调用后并不会为程序完全锁定这些分配的内存,原因在于其中的分页可能是写时复制的。因此,就有必要对每个内存页面中写入一个假的值。
锁定的内存可能是写时复制的,这个时候,这个内存空间可能会改变。这个时候,写入假的临时值,这样就可以针对每一个内存分页的写入操做会强制 Linux 为当前进程分配一个独立、私有的内存页。
写时复制
写时复制:子进程依赖使用父进程开创的物理空间。
内核只为新生成的子进程创建虚拟空间结构,它们来复制于父进程的虚拟究竟结构,但是不为这些段分配物理内存,它们共享父进程的物理空间,当父子进程中有更改相应段的行为发生时,再为子进程相应的段分配物理空间。
https:www.cnblogs.com/biyeymyhjob/archive/2012/07/20/2601655.html
为了避免OS检查分页是否在内存中的过程出现大量缺页中断,RocketMQ在做Mmap内存映射的同时进行了madvise系统调用,
目的是使OS做一次内存映射后,使对应的文件数据尽可能多的预加载至内存中,降低缺页中断次数,从而达到内存预热的效果。
RocketMQ通过map+madvise映射后预热机制,将磁盘中的数据尽可能多的加载到PageCache中,保证后续对ConsumeQueue和CommitLog的读取过程中,能够尽可能从内存中读取数据,提升读写性能。
public void warmMappedFile(FlushDiskType type, int pages) {
long beginTime = System.currentTimeMillis();
// 创建一个新的字节缓冲区,其内容是此缓冲区内容的共享子序列
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
int flush = 0;
long time = System.currentTimeMillis();
// warmMappedFile 每间隔 OS_PAGE_SIZE 向 mappedByteBuffer 写入一个 0,此时对应页恰好产生一个缺页中断,操作系统为对应页分配物理内存
for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
byteBuffer.put(i, (byte) 0);
// 刷盘方式是同步策略时,进行刷盘操作
// 每修改 pages 个分页刷一次盘,相当于 4096 * 4k = 16M,每 16 M刷一次盘,1G 文件 1024M/16M = 64 次
// force flush when flush disk type is sync
// 如果刷盘策略为同步刷盘,需要对每个页进行刷盘
if (type == FlushDiskType.SYNC_FLUSH) {
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
flush = i;
mappedByteBuffer.force();
}
}
// prevent gc
// 防止垃圾回收
if (j % 1000 == 0) {
log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
time = System.currentTimeMillis();
try {
Thread.sleep(0);
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}
}
// force flush when prepare load finished
// 前面对每个页,写入了数据(0 占位用,防止被内存交互),进行了刷盘,然后这个操作是对所有的内存进行刷盘。
if (type == FlushDiskType.SYNC_FLUSH) {
log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
this.getFileName(), System.currentTimeMillis() - beginTime);
// 刷盘,强制将此缓冲区内容的任何更改写入包含映射文件的存储设备
mappedByteBuffer.force();
}
log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
System.currentTimeMillis() - beginTime);
//通过 JNA 调用 mlock 方法锁定 mappedByteBuffer 对应的物理内存,阻止操作系统将相关的内存页调度到交换空间(swap space),以此提升后续在访问 MappedFile 时的读写性能。
this.mlock();
}
通过 JNA 调用 mlock 方法锁定 mappedByteBuffer 对应的物理内存,阻止操作系统将相关的内存页调度到交换空间(swap space),以此提升后续在访问 MappedFile 时的读写性能。
public void mlock() {
final long beginTime = System.currentTimeMillis();
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
Pointer pointer = new Pointer(address);
{
int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
// RocketMQ的做法是,在做Mmap内存映射的同时进行madvise系统调用,目的是使OS做一次内存映射后对应的文件数据尽可能多的预加载至内存中,从而达到内存预热的效果。
{
int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
}
MappedByteBuffer VS WriterBuffer
MappedByteBuffer和WriterBuffer都是MappedFile对象的成员属性,都是用来存放消息的,只有开启了TransientStorePool,才会向writerBuffer直接内存写入消息,然后commit消息到FileChannle中,然后再flush到磁盘。否则就是存储在NIO创建的MappedByteBuffer直接内存中,然后刷新到磁盘。
刷盘是否开启TransientStorePool的区别
不开启TransientStorePool:
MappedByteBuffer是直接内存,它暂时存储了message消息,MappedFile.mapp()方法做好MappedByteBuffer对象直接内存和落盘文件的映射关系,然后flush()方法执行MappedByteBuffer.force():强制将ByteBuffer中的任何内容的改变写入到磁盘文件,读写都经过Page Cache。
开启TransientStorePool:
TransientStorePool会通过ByteBuffer.allocateDirect调用直接申请对外内存writerBuffer,消息数据在写入内存的时候是写入预申请的内存中。MappedFile的writerBuffer为直接开辟的内存,然后MappedFile的初始化操作,做好FileChannel和磁盘文件的映射,commit()方法实质是执行fileChannel.write(writerBuffer),将writerBuffer的数据写入到FileChannel映射的磁盘文件,flush操作执行FileChannel.force():将映射文件中的数据强制刷新到磁盘。写入的时候不经过PageCache,因此在消息写入操作上会更快,因此能更少的占用CommitLog.putMessageLock锁,从而能够提升消息处理量。使用TransientStorePool方案的缺陷主要在于在异常崩溃的情况下回丢失更多的消息。
TransientStorePool的作用
TransientStorePool 相当于在内存层面做了读写分离,写走内存磁盘,读走pagecache,同时最大程度消除了page cache的锁竞争,降低了毛刺。它还使用了锁机制,避免直接内存被交换到swap分区。
日常FileChannel的写操作会经过Page Cache,但是TransientStorePool开辟了直接内存WriterBuffer,WriterBuffer只负责写入,也是通过FileChannel写入磁盘,读操作由单独的MappedByteBuffer负责,这样实现了读写分离。
参考:https://github.com/apache/rocketmq/issues/2466
FileChannel.force VS MappedByteBuffer.force区别
This method is only guaranteed to force changes that were made to this channel's file via the methods defined in this class. It may or may not force changes that were made by modifying the content of a{@link MappedByteBuffer <i>mapped byte buffer</i>} obtained by invoking the {@link #map map} method. Invoking the {@link MappedByteBuffer#force force} method of the mapped byte buffer will force changes made to the buffer's content to be written.
FileChannel和MappedByteBuffer都是NIO模块的类,ByteBuffer直接内存映射到磁盘文件通过FileChannel。
FileChannel.force()只会将FileChannel类中方法使FileChannel发生改变的内容强制刷新到存储设备文件中。
MappedByteBuffer.force()会将Map类中方法使ByteBuffer发生改变的内容强制刷新到存储设备文件中。
Mmap的写入操作是:Mmap的MappedByteBuffer映射直接内存,直接内存映射文件,然后文件会对应Page Cache,也就是 MmapedByteBuffer的直接内存可能是Page Cache的东西,然后通过写Page Cache,然后再写入磁盘。
FileChannle:是写直接内存,这个效率比较高,然后直接内存满了,在落盘的时候,再去经过Page Cache,落入磁盘。WriterBuffer的写入方式实际也就是FileChannel的写入方式,Mmap在写入4k一下的文件比较快,然后FileChannel写入文件大于4k时,比Mmap方式的要快,可能是因为PageCache 是4k,然后写着就可能去落盘了。而FileChannel 是写满了直接内存,才去经过PageCache,这样写入直接内存的效率更高,然后再经过Page Cache,当大于4k的时候,大于Page Cache的内存的时候,就是FileChannel快了。大概因为FileChannel是基于Block(块)的。
Mmap VS FileChannle参考https://juejin.cn/post/6844903842472001550