说明
本节讲解批量消息(MessageExtBatch)的编码器,以及上一节没讲完了批量消息的doAppend方法
MessageExtBatchEncoder
属性
// Store the message content
private final ByteBuffer msgBatchMemory;//存储批量消息内容的buffer
// The maximum length of the message
private final int maxMessageSize;//最大阈值,批量消息长度不能超过该值
private final ByteBuffer hostHolder = ByteBuffer.allocate(8);//存储host的byteBuffer
函数
构造函数
//size默认4M
MessageExtBatchEncoder(final int size) {
this.msgBatchMemory = ByteBuffer.allocateDirect(size);
this.maxMessageSize = size;
}
resetByteBuffer
byteBuffer回到初始位置,再设置limit,使得可以写
private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
byteBuffer.flip();
byteBuffer.limit(limit);
}
encode
核心方法,把messageExtBatch放入msgBatchMemory
public ByteBuffer encode(final MessageExtBatch messageExtBatch) {
msgBatchMemory.clear(); //not thread-safe
int totalMsgLen = 0;
ByteBuffer messagesByteBuff = messageExtBatch.wrap();//MessageExtBatch包起来,得到ByteBuffer(包含所有head和body等信息)
while (messagesByteBuff.hasRemaining()) {//依次便利其中的每一条msg
// 1 TOTALSIZE
messagesByteBuff.getInt();
// 2 MAGICCODE
messagesByteBuff.getInt();
// 3 BODYCRC
messagesByteBuff.getInt();
// 4 FLAG
int flag = messagesByteBuff.getInt();
// 5 BODY
int bodyLen = messagesByteBuff.getInt();
int bodyPos = messagesByteBuff.position();
int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen);
messagesByteBuff.position(bodyPos + bodyLen);
// 6 properties
short propertiesLen = messagesByteBuff.getShort();
int propertiesPos = messagesByteBuff.position();
messagesByteBuff.position(propertiesPos + propertiesLen);
final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
final int msgLen = calMsgLength(bodyLen, topicLength, propertiesLen);//计算消息程度
// Exceeds the maximum message,超过最大阈值
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
+ ", maxMessageSize: " + this.maxMessageSize);
throw new RuntimeException("message size exceeded");
}
totalMsgLen += msgLen;
// Determines whether there is sufficient free space
if (totalMsgLen > maxMessageSize) {
throw new RuntimeException("message size exceeded");
}
// 1 TOTALSIZE
this.msgBatchMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgBatchMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgBatchMemory.putInt(bodyCrc);
// 4 QUEUEID
this.msgBatchMemory.putInt(messageExtBatch.getQueueId());
// 5 FLAG
this.msgBatchMemory.putInt(flag);
// 6 QUEUEOFFSET
this.msgBatchMemory.putLong(0);//这里还不知道queueOffset是多少,先存入0,后面doAppend时候会写
// 7 PHYSICALOFFSET
this.msgBatchMemory.putLong(0);//这里还不知道PHYSICALOFFSET是多少,先存入0,后面doAppend时候会写
// 8 SYSFLAG
this.msgBatchMemory.putInt(messageExtBatch.getSysFlag());
// 9 BORNTIMESTAMP
this.msgBatchMemory.putLong(messageExtBatch.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(hostHolder, 8);
this.msgBatchMemory.put(messageExtBatch.getBornHostBytes(hostHolder));
// 11 STORETIMESTAMP
this.msgBatchMemory.putLong(messageExtBatch.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(hostHolder, 8);
this.msgBatchMemory.put(messageExtBatch.getStoreHostBytes(hostHolder));
// 13 RECONSUMETIMES
this.msgBatchMemory.putInt(messageExtBatch.getReconsumeTimes());
// 14 Prepared Transaction Offset, batch does not support transaction
this.msgBatchMemory.putLong(0);
// 15 BODY
this.msgBatchMemory.putInt(bodyLen);
if (bodyLen > 0)
this.msgBatchMemory.put(messagesByteBuff.array(), bodyPos, bodyLen);
// 16 TOPIC
this.msgBatchMemory.put((byte) topicLength);
this.msgBatchMemory.put(topicData);
// 17 PROPERTIES
this.msgBatchMemory.putShort(propertiesLen);
if (propertiesLen > 0)
this.msgBatchMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
}
msgBatchMemory.flip();//切换到读模式
return msgBatchMemory;
}
注意这里并没有合理设置QUEUEOFFSET和PHYSICALOFFSET,临时先放的0
批量消息的doAppend
//写批量消息MessageExtBatch到commitLog返回结果
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBatch messageExtBatch) {
byteBuffer.mark();
//physical offset
long wroteOffset = fileFromOffset + byteBuffer.position();//物理起始位置
// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(messageExtBatch.getTopic());
keyBuilder.append('-');
keyBuilder.append(messageExtBatch.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
long beginQueueOffset = queueOffset;
int totalMsgLen = 0;
int msgNum = 0;
msgIdBuilder.setLength(0);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();
this.resetByteBuffer(hostHolder, 8);//hostHolder准备写入数据
ByteBuffer storeHostBytes = messageExtBatch.getStoreHostBytes(hostHolder);
messagesByteBuff.mark();
while (messagesByteBuff.hasRemaining()) {
// 1 TOTALSIZE
final int msgPos = messagesByteBuff.position();
final int msgLen = messagesByteBuff.getInt();//每个消息存储的开头都是totalsize,参见calMsgLength
final int bodyLen = msgLen - 40; //only for log, just estimate it
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
totalMsgLen += msgLen;
// Determines whether there is sufficient free space
if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {//空间不够了,需要用的空间 超过了 剩余空间
this.resetByteBuffer(this.msgStoreItemMemory, 8);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
//ignore previous read
messagesByteBuff.reset();
// Here the length of the specially set maxBlank
byteBuffer.reset(); //ignore the previous appended messages
byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(),
beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
//move to add queue offset and commitlog offset
messagesByteBuff.position(msgPos + 20);//切换到该写入queue offset 以及 commitlog offset的位置
messagesByteBuff.putLong(queueOffset);
messagesByteBuff.putLong(wroteOffset + totalMsgLen - msgLen);
storeHostBytes.rewind();//准备重新读
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, storeHostBytes, wroteOffset + totalMsgLen - msgLen);
if (msgIdBuilder.length() > 0) {
msgIdBuilder.append(',').append(msgId);
} else {
msgIdBuilder.append(msgId);
}
queueOffset++;
msgNum++;
messagesByteBuff.position(msgPos + msgLen);
}
messagesByteBuff.position(0);
messagesByteBuff.limit(totalMsgLen);
byteBuffer.put(messagesByteBuff);
messageExtBatch.setEncodedBuff(null);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdBuilder.toString(),
messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
result.setMsgNum(msgNum);
CommitLog.this.topicQueueTable.put(key, queueOffset);
return result;
}
注意,这里byteBuffer只用设置queue offset以及commitlog offset就好了,其他字段都已经在encoder设置好了
思考
批量消息的doAppend和单个消息doAppend有什么不同
批量消息,遍历时,只有设置QueueOffset以及CommitLogOffSet了,其他比如topic,sysFlag什么的,在encode中就设置好了
问题
调用方是如何设置body的
在SendMessageProcessor#sendBatchMessage
执行了
messageExtBatch.setBody(request.getBody());
这里body应该是包含了所有的header和body信息的,暂时没有深究request到底是怎么样的
MessageExtBatch和MessageExtBrokerInner区别是什么
感觉上批量消息 就是 消息的列表一样
但是doAppend上,单个消息是要判断事务的,而批量消息没有管
另外两者的函数略有不同
暂时不清楚两者的区别,为什么不直接用批量消息不直接用 List<MessageExtBrokerInner>