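Overview
Setting up a RocketMQ cluster comes down to deploying Masters and Slaves. Depending on how many of each you run, there are several deployment modes:
- Single Master, no Slave: plain standalone mode. If the Master goes down, the messaging service is unavailable.
- Multiple Masters, no Slave: if one Master goes down the cluster keeps working, but because there is no Slave, any unconsumed messages on the failed Master cannot be consumed until that Master recovers.
- Multiple Masters, multiple Slaves: the most robust option. Even if a Master goes down, sending is unaffected because messages can go to another Master, and consuming is unaffected as well (ignoring extreme cases), because the Slave replicates the Master's messages and configuration.
Note: I read version 3.5.8, and so far I have not found any Master/Slave failover (role switching) in it.
The overall architecture is as follows: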
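- Producer:
  - Connects to the NameServer: fetches routing information, such as Broker addresses
  - Connects to the Brokers: the Producer sends messages to the Master, and only to the Master
- Master:
  - Accepts connections from Producers: Producers send messages only to the Master
  - Connects to the NameServer: exchanges routing information (this is, for example, how a Slave learns its Master's address)
  - Accepts connections from Consumers: Consumers pull messages from the Master
  - Connects to its Slaves: message replication
  - No connections between Masters: Masters are independent of one another
- Slave:
  - Connects to its Master: message replication
  - Connects to the NameServer: fetches routing information, such as its Master's address
  - Accepts connections from Consumers: Consumers can pull messages from the Slave
  - No connections between Slaves: as with Masters, they are independent of one another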
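Overall flow
1. By default the Master uses its listen port + 1 as the HA (replication) port
2. The Master pushes messages to the Slave; the Slave processes them and appends them to its CommitLog
3. The Slave reports the offset it has processed back to the Master at least every 5s, and also whenever its local CommitLog advances
4. The Master records the reported offset and raises a notification flag; in SYNC_MASTER mode this is how it decides whether a message has been replicated to the Slave
The whole flow, simplified:
Source code analysis
The analysis below follows the third mode (multiple Masters, multiple Slaves), because it covers everything the other two modes do.
Message replication
Slave
Start with the Slave. The Slave replicates the Master's messages mainly through the HAClient class. Its core fields are: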
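// Master address (IP:PORT)
private final AtomicReference<String> masterAddress = new AtomicReference<String>();
// Buffer used to report the Slave's max offset to the Master
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
// The offset the Slave last reported to the Master
private long currentReportedOffset = 0;
// Parse position in the read buffer, used to handle sticky/split packets
private int dispatchPostion = 0;
// Buffer for data received from the Master
private ByteBuffer byteBufferRead = ByteBuffer.allocate(ReadMaxBufferSize);
What each field does is covered in detail later. For now, note masterAddress: it holds the Master's address. A Master also creates an HAClient, but its masterAddress is empty, so the client does nothing there.
The core run method: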
public void run() {
while (!this.isStoped()) {
try {
if (this.connectMaster()) {// if masterAddress is set, connect and obtain a SocketChannel; a Master has no masterAddress, so it skips this branch
// Report the max physical offset first; this also serves as the periodic heartbeat
if (this.isTimeToReportOffset()) {// every 5s by default
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);// send the replication progress back to the Master
if (!result) {
this.closeMaster();
}
}
// Wait for a response
this.selector.select(1000);
// Receive data
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
// Report the max physical offset whenever the local CommitLog has advanced
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
// Check the Master's reverse heartbeat
long interval =
HAService.this.getDefaultMessageStore().getSystemClock().now()
- this.lastWriteTimestamp;
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
.getHaHousekeepingInterval()) {
this.closeMaster();
}
}
else {
this.waitForRunning(1000 * 5);
}
}
catch (Exception e) {
this.waitForRunning(1000 * 5);
}
}
}
processReadEvent is the usual NIO read path:
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);// read the data sent by the Master
if (readSize > 0) {
lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
readSizeZeroTimes = 0;
boolean result = this.dispatchReadRequest();
if (!result) {
return false;
}
}
else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
}
else {
return false;
}
}
catch (IOException e) {
return false;
}
}
return true;
}
It reads from the channel into the buffer and calls dispatchReadRequest to process what was read:
private boolean dispatchReadRequest() {
final int MSG_HEADER_SIZE = 8 + 4; // phyoffset + size
int readSocketPos = this.byteBufferRead.position();
while (true) {
int diff = this.byteBufferRead.position() - this.dispatchPostion;// dispatchPostion is where parsing stopped last time; diff is the number of bytes read but not yet parsed
if (diff >= MSG_HEADER_SIZE) {// a complete header is available
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
// Verify that the Master's and Slave's offsets agree
if (slavePhyOffset != 0) {
if (slavePhyOffset != masterPhyOffset) {
return false;
}
}
// Sticky/split packet handling
// If diff is less than MSG_HEADER_SIZE + bodySize, the frame is split; wait for the next read
if (diff >= (MSG_HEADER_SIZE + bodySize)) {
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPostion + MSG_HEADER_SIZE);
this.byteBufferRead.get(bodyData);
// Append the data sent by the Master to the CommitLog
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
this.byteBufferRead.position(readSocketPos);
this.dispatchPostion += MSG_HEADER_SIZE + bodySize;
if (!reportSlaveMaxOffsetPlus()) {// report the replication progress back to the Master
return false;
}
continue;
}
}
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}
break;
}
return true;
}
Two things happen here:
- Receive the messages pushed by the Master
- Append them to the CommitLog and tell the Master how far replication has progressed
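The framing logic in dispatchReadRequest can be isolated into a small sketch. HaFrameDecoder below is a hypothetical helper, not a RocketMQ class; it assumes the same wire layout as above (8-byte offset, 4-byte body size, then the body) and, to stay short, uses a fixed 1024-byte buffer without the reallocation step.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of RocketMQ): decodes [8-byte offset][4-byte size][body] frames.
class HaFrameDecoder {
    static final int HEADER_SIZE = 8 + 4; // physical offset + body size

    // Simplification: fixed capacity, no reallocation when the buffer fills up.
    private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    private int dispatchPosition = 0; // index of the first byte not yet parsed

    // Appends newly received bytes and returns the bodies of all frames that are now complete.
    List<byte[]> onBytes(byte[] chunk) {
        readBuffer.put(chunk);
        List<byte[]> bodies = new ArrayList<>();
        while (true) {
            int unparsed = readBuffer.position() - dispatchPosition;
            if (unparsed < HEADER_SIZE) {
                break; // the header itself is split: wait for more bytes
            }
            int bodySize = readBuffer.getInt(dispatchPosition + 8); // absolute read, position unchanged
            if (unparsed < HEADER_SIZE + bodySize) {
                break; // the body is split: wait for more bytes
            }
            byte[] body = new byte[bodySize];
            for (int i = 0; i < bodySize; i++) {
                body[i] = readBuffer.get(dispatchPosition + HEADER_SIZE + i);
            }
            bodies.add(body);
            dispatchPosition += HEADER_SIZE + bodySize; // one loop pass per complete frame
        }
        return bodies;
    }

    // Builds one frame, the way the Master side would lay it out.
    static byte[] frame(long offset, byte[] body) {
        ByteBuffer b = ByteBuffer.allocate(HEADER_SIZE + body.length);
        b.putLong(offset).putInt(body.length).put(body);
        return b.array();
    }
}
```

Feeding the decoder arbitrary slices of a byte stream yields each body exactly once, no matter where the slices cut through a header or body.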
Now the implementation of reportSlaveMaxOffsetPlus:
private boolean reportSlaveMaxOffsetPlus() {
boolean result = true;
// Report the max physical offset whenever the local CommitLog has advanced
long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
if (currentPhyOffset > this.currentReportedOffset) {
this.currentReportedOffset = currentPhyOffset;
result = this.reportSlaveMaxOffset(this.currentReportedOffset);// written back to the Master through the channel
if (!result) {
this.closeMaster();
log.error("HAClient, reportSlaveMaxOffset error, " + this.currentReportedOffset);
}
}
return result;
}
The logic is simple: read the CommitLog's max offset, and if it is larger than the last reported position, update currentReportedOffset and send it back to the Master.
Master
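The classes involved in replication on the Master side:
- HAService: entry point that starts the replication services
  - AcceptSocketService: accepts Slave connections and builds HAConnection instances
  - GroupTransferService: when BrokerRole is SYNC_MASTER, acts as an intermediary that raises a flag used to decide whether the Slave has finished replicating
- HAConnection: one instance per Slave, used to talk to that Slave
  - WriteSocketService: pushes messages to the Slave
  - ReadSocketService: reads the replication progress sent back by the Slave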
AcceptSocketService
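AcceptSocketService is built on raw NIO. The NIO plumbing is skipped here; only the core flow is covered. The run method is the heart of it: it accepts connections from Slaves and builds an HAConnection for each.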
public void run() {
while (!this.isStoped()) {
try {
this.selector.select(1000);
Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) {
for (SelectionKey k : selected) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
try {
// Each Slave connection gets its own HAConnection, which holds the corresponding channel
HAConnection conn = new HAConnection(HAService.this, sc);
conn.start();// starts both WriteSocketService and ReadSocketService
HAService.this.addConnection(conn);// keep the connection in connectionList
}
catch (Exception e) {
//....
}
}
}
else {//....
}
}
selected.clear();
}
}
catch (Exception e) {//....
}
}
}
ReadSocketService
ReadSocketService handles the progress reported by the Slave, plus related housekeeping. Again the core is the run method:
public void run() {
while (!this.isStoped()) {
try {
this.selector.select(1000);
boolean ok = this.processReadEvent();
if (!ok) {
HAConnection.log.error("processReadEvent error");
break;
}
// Check the heartbeat interval; drop the connection if it is exceeded
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now()
- this.lastReadTimestamp;
if (interval > HAConnection.this.haService.getDefaultMessageStore()
.getMessageStoreConfig().getHaHousekeepingInterval()) {
log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr
+ "] expired, " + interval);
break;
}
}
catch (Exception e) {
break;
}
}
// ....
}
The data is read in processReadEvent:
private boolean processReadEvent() {
// .... other code omitted
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
this.lastReadTimestamp =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
// Receive the offset reported by the Slave
if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPostion = pos;
// Update slaveAckOffset and slaveRequestOffset
HAConnection.this.slaveAckOffset = readOffset;
if (HAConnection.this.slaveRequestOffset < 0) {
HAConnection.this.slaveRequestOffset = readOffset;
}
// Notify the waiting front-end (sender) threads
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
}
else if (readSize == 0) {
// .... other code omitted
}
else {
return false;
}
}
catch (IOException e) {
return false;
}
}
return true;
}
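Two pieces of this code deserve a closer look:
1. (this.byteBufferRead.position() - this.processPostion) >= 8
2. int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPostion = pos;
How should these be read? The 8 is the byte length of an offset (a long).
- processPostion is the position processed so far, so the first check handles split packets: if fewer than 8 new bytes have arrived, the report is incomplete and nothing is processed until the next read.
- The second piece is easiest to follow with an example:
  - First read: this.byteBufferRead.position() - 0 is 3, which is less than 8, so it is ignored
  - Second read: this.byteBufferRead.position() is 10, so pos = 10 - 2 = 8 and readOffset = getLong(0), exactly the first long; processPostion is set to 8
  - Third read: this.byteBufferRead.position() is 21, and 21 - 8 >= 8, so pos = 21 - 5 = 16 and readOffset = getLong(8), exactly the second long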
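The worked example above can be replayed in a few lines. SlaveAckReader is a hypothetical class written for this illustration; it keeps only the position arithmetic from processReadEvent and assumes a fixed 1024-byte buffer.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not part of RocketMQ): extracts the latest complete 8-byte offset report.
class SlaveAckReader {
    private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    private int processPosition = 0; // end of the last complete long that was consumed
    private long lastAck = -1;       // -1 until the first complete report arrives

    // Appends received bytes and returns the most recent complete offset seen so far.
    long onBytes(byte[] chunk) {
        readBuffer.put(chunk);
        if (readBuffer.position() - processPosition >= 8) {
            // Round the write position down to a multiple of 8: the end of the last complete long.
            int pos = readBuffer.position() - (readBuffer.position() % 8);
            lastAck = readBuffer.getLong(pos - 8); // absolute read of that last complete long
            processPosition = pos;
        }
        return lastAck;
    }
}
```

Only the newest complete report is read; older ones in the same batch are skipped, which is fine because the reported offset only ever grows.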
The roles of slaveAckOffset and slaveRequestOffset:
slaveAckOffset is the offset most recently reported by the Slave
slaveRequestOffset is the first offset the Slave reported
notifyTransferSome is as follows:
public void notifyTransferSome(final long offset) {
for (long value = this.push2SlaveMaxOffset.get(); offset > value;) {
boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
if (ok) {
this.groupTransferService.notifyTransferSome();
break;
}
else {
value = this.push2SlaveMaxOffset.get();
}
}
}
It raises push2SlaveMaxOffset to the offset the Slave has replicated to (the value only ever moves forward), then notifies via GroupTransferService.
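The compare-and-set loop in notifyTransferSome is a general "advance to the maximum" idiom. The sketch below restates it on its own; MonotonicOffset and advanceTo are names invented for this example.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not part of RocketMQ): a value that concurrent callers can only move forward.
class MonotonicOffset {
    private final AtomicLong max = new AtomicLong(0);

    // Returns true if this call raised the stored value, false if offset was not larger.
    boolean advanceTo(long offset) {
        for (long current = max.get(); offset > current; current = max.get()) {
            if (max.compareAndSet(current, offset)) {
                return true; // we won the race; the caller may now notify waiters
            }
            // Another thread changed the value first: re-read and re-check.
        }
        return false;
    }

    long get() {
        return max.get();
    }
}
```

With several Slaves reporting concurrently, the stored value ends up as the largest offset any of them acknowledged, and only the call that actually raised it triggers a notification.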
WriteSocketService
First, a few fields:
private final int HEADER_SIZE = 8 + 4;// offset (8 bytes) + body size (4 bytes)
private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(HEADER_SIZE);
private long nextTransferFromWhere = -1;// the CommitLog offset to read the next batch from
private SelectMapedBufferResult selectMapedBufferResult;// the data selected from the CommitLog
private boolean lastWriteOver = true;// whether the previous write completed
private long lastWriteTimestamp = System.currentTimeMillis();
Start from the run method:
public void run() {
while (!this.isStoped()) {
try {
this.selector.select(1000);
if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}
// Compute nextTransferFromWhere
if (-1 == this.nextTransferFromWhere) {
if (0 == HAConnection.this.slaveRequestOffset) {
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMapedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
this.nextTransferFromWhere = masterOffset;
} else {
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
}
if (this.lastWriteOver) {
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(HEADER_SIZE);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
} else {
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
// Given an offset, read data from the CommitLog, much like a consumer pulling messages
SelectMapedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
int size = selectResult.getSize();
// Transfer at most 32K per batch
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;
selectResult.getByteBuffer().limit(size);// cap the buffer at 32K
this.selectMapedBufferResult = selectResult;
// Build the header; see the Slave's HAClient for how it is parsed
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(HEADER_SIZE);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();// transfer the data
} else {
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
} catch (Exception e) {
break;
}
}
// ....
}
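Two small calculations inside run are easy to miss: where the first transfer starts, and how large a batch may be. The sketch below pulls them out as pure functions. HaTransferMath and its method names are hypothetical; the 1 GB file size and 32K batch size used in the usage note are just the values assumed for the example.

```java
// Hypothetical helper (not part of RocketMQ): the offset arithmetic from WriteSocketService.run.
class HaTransferMath {
    // A Slave that reports offset 0 has no data yet, so replication starts at the beginning
    // of the Master's newest CommitLog file; otherwise it resumes from the Slave's own offset.
    static long initialTransferOffset(long slaveRequestOffset, long masterMaxOffset, long mappedFileSize) {
        if (slaveRequestOffset != 0) {
            return slaveRequestOffset;
        }
        long aligned = masterMaxOffset - (masterMaxOffset % mappedFileSize);
        return Math.max(aligned, 0);
    }

    // Each round sends at most maxBatch bytes, even if more data is available.
    static int batchSize(int availableBytes, int maxBatch) {
        return Math.min(availableBytes, maxBatch);
    }
}
```

For example, with 1 GB files and a Master at 2.5 GB, an empty Slave starts at the 2 GB boundary, so it does not copy the older files.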
The actual transfer is in transferData:
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
// Write the header
while (this.byteBufferHeader.hasRemaining()) {
int writeSize = this.socketChannel.write(this.byteBufferHeader);
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
}
else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
}
else {
throw new Exception("ha master write header error < 0");
}
}
if (null == this.selectMapedBufferResult) {
return !this.byteBufferHeader.hasRemaining();
}
writeSizeZeroTimes = 0;
// Write the body
if (!this.byteBufferHeader.hasRemaining()) {
while (this.selectMapedBufferResult.getByteBuffer().hasRemaining()) {
int writeSize = this.socketChannel.write(this.selectMapedBufferResult.getByteBuffer());
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
}
else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
}
else {
throw new Exception("ha master write body error < 0");
}
}
}
boolean result =
!this.byteBufferHeader.hasRemaining()
&& !this.selectMapedBufferResult.getByteBuffer().hasRemaining();
if (!this.selectMapedBufferResult.getByteBuffer().hasRemaining()) {
this.selectMapedBufferResult.release();
this.selectMapedBufferResult = null;
}
return result;
}
Straightforward: write the header and the body to the Slave through the channel, and return whether both buffers were fully written.
GroupTransferService
When ReadSocketService receives the offset sent back by the Slave, it calls GroupTransferService's notifyTransferSome. Here is what that method does:
public void notifyTransferSome() {
this.notifyTransferObject.wakeup();
}
public void wakeup() {
synchronized (this) {
if (!this.hasNotified) {
this.hasNotified = true;
this.notify();
}
}
}
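It just sets a flag and calls notify. Anyone familiar with RocketMQ's flush strategy will recognize this: it is the same pattern used for asynchronous flushing, set a flag and wake the waiter.
Next, what gets woken up? Since notifyTransferObject is woken via wakeup, something must be waiting on it, which leads to doWaitTransfer: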
private void doWaitTransfer() {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
for (int i = 0; !transferOK && i < 5; i++) {// retry up to 5 times, each time waiting for the Slave to report its progress
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
if (!transferOK) {
log.warn("transfer message to slave timeout, " + req.getNextOffset());
}
req.wakeupCustomer(transferOK);// release the waiting sender with the replication result
}
this.requestsRead.clear();
}
}
public void wakeupCustomer(final boolean flushOK) {
this.flushOK = flushOK;
this.countDownLatch.countDown();
}
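It first iterates over the request collection. What that collection holds is not clear yet; all we know is that each request carries an offset. transferOK is true when push2SlaveMaxOffset (the position the Slave has replicated to) is greater than or equal to that offset; if it is false, the loop keeps waiting. At the end the result is stored in flushOK and the CountDownLatch (CDL) is counted down.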
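The handshake between the sending thread and the transfer service can be sketched with a single latch. ReplicationWait is a hypothetical stand-in for the request object, written only for this illustration. Unlike the waitForFlush quoted in this article, it returns just the stored flag, which is a deliberate simplification.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a replication request: one waiter, one completer.
class ReplicationWait {
    private final long nextOffset; // CommitLog position right after the write
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean ok = false;

    ReplicationWait(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long nextOffset() {
        return nextOffset;
    }

    // The same comparison doWaitTransfer performs: has the Slave reached this request's offset?
    static boolean replicated(long push2SlaveMaxOffset, long nextOffset) {
        return push2SlaveMaxOffset >= nextOffset;
    }

    // Called by the transfer side once it knows the outcome.
    void complete(boolean transferOk) {
        this.ok = transferOk;
        latch.countDown();
    }

    // Called by the sending thread; returns false on timeout or interruption.
    boolean await(long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return ok;
    }
}
```

The sending thread creates the request, hands it to the transfer side, and blocks in await; the transfer side calls complete once the Slave's acknowledged offset reaches nextOffset or the retries run out.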
Now that doWaitTransfer is clear, two questions remain:
- What is GroupCommitRequest?
- Which thread is blocked on this CDL?
In GroupCommitRequest, the CDL is awaited in waitForFlush:
public boolean waitForFlush(long timeout) {
try {
boolean result = this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
return result || this.flushOK;
}
catch (InterruptedException e) {
e.printStackTrace();
return false;
}
}
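It blocks on the CDL and returns true if the latch was released within the timeout or flushOK is set. Tracing its callers leads to two call sites in CommitLog.putMessage.
First call site: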
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
// If the message's WAIT property is true
// Meaning: whether to wait until the server has stored the message before returning (this may mean waiting for the disk flush or for synchronous replication to another server)
if (msg.isWaitStoreMsgOK()) {
// isSlaveOK requires both of the following:
// the Slave is less than 256M behind the Master
// at least one Slave is connected
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
if (null == request) {
// The offset in GroupCommitRequest is the CommitLog position right after this write
request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
}
service.putRequest(request);// ends up in GroupTransferService.putRequest, i.e. the collection that doWaitTransfer iterates
service.getWaitNotifyObject().wakeupAll();
// This is the wait described above: block until replication completes
// In code terms: wait until the Slave's replicated offset >= result.getWroteOffset() + result.getWroteBytes()
boolean flushOK =
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig()
.getSyncFlushTimeout());
if (!flushOK) {// replication timed out
log.error("do sync transfer other node, wait return, but failed, topic: "
+ msg.getTopic() + " tags: " + msg.getTags() + " client address: "
+ msg.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
else {
// If SYNC_MASTER is configured but no Slave is connected,
// the second isSlaveOK condition fails, and every send from the producer reports this status
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
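The two conditions described in the comments on isSlaveOK fit in one expression. The sketch below is an illustration under that description, not a copy of the method: SlaveHealth, its parameter names, and the constant are assumptions made for the example.

```java
// Hypothetical helper (not part of RocketMQ): the two health conditions for synchronous replication.
class SlaveHealth {
    // 256 MB, the fall-behind threshold mentioned in the comments above.
    static final long FALL_BEHIND_MAX = 256L * 1024 * 1024;

    static boolean isSlaveOk(int connectionCount, long masterPutWhere,
                             long push2SlaveMaxOffset, long fallBehindMax) {
        boolean connected = connectionCount > 0; // at least one Slave is attached
        boolean closeEnough = (masterPutWhere - push2SlaveMaxOffset) < fallBehindMax;
        return connected && closeEnough;
    }
}
```

With no Slave attached the first condition fails immediately, which matches the SLAVE_NOT_AVAILABLE branch in the code above.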
The second call site is synchronous flush, with similar logic:
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (msg.isWaitStoreMsgOK()) {
request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK =
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig()
.getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: "
+ msg.getTags() + " client address: " + msg.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
}
else {
service.wakeup();
}
}
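With both call sites read, the whole flow is roughly clear, and the sequence diagram from the beginning can be filled in:
That concludes the Master/Slave interaction. Next, how the Consumer and Producer behave in a multi-Master, multi-Slave cluster.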
Producer
For the Producer, look at send in DefaultMQProducerImpl:
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());// 1. look up the topic's TopicPublishInfo
if (topicPublishInfo != null && topicPublishInfo.ok()) {
// ....
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;// only 1 attempt for non-SYNC modes
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (tmpmq != null) {
mq = tmpmq;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);// 3. send the message to the broker
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {// send(msg) without a callback passes SYNC
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch ....{
// ....
}
} else {
break;
}
} // end of for
// ....
}
First the topic's TopicPublishInfo is fetched; what matters is its queue list, which looks like this (assuming 2 queues per broker):
broker_a-queue-0
broker_a-queue-1
broker_b-queue-0
broker_b-queue-1
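That is, every queue in the cluster.
As anyone who knows the Producer's send path will recall, the queues are then used round-robin. Suppose broker_a is down and a send fails: on the retry lastBrokerName is non-null, and queue selection skips broker_a's queues.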
#TopicPublishInfo.selectOneMessageQueue(String)
//....
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
//....
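A minimal round-robin selector that skips the broker that just failed might look like the following. QueueSelector and its nested Queue type are hypothetical; in particular, falling back to plain round-robin when every queue belongs to the failed broker is an assumption of this sketch, not something the snippet above shows.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of RocketMQ): round-robin over queues, avoiding one broker.
class QueueSelector {
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<Queue> queues;
    private final AtomicInteger index = new AtomicInteger(0);

    QueueSelector(List<Queue> queues) {
        this.queues = queues;
    }

    // lastBrokerName is null on the first attempt, and the failed broker's name on a retry.
    Queue selectOne(String lastBrokerName) {
        for (int i = 0; i < queues.size(); i++) {
            Queue q = next();
            if (lastBrokerName == null || !q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        // Assumption: if every queue is on the failed broker, use it anyway.
        return next();
    }

    private Queue next() {
        // floorMod keeps the index valid even after the counter overflows to negative values.
        return queues.get(Math.floorMod(index.getAndIncrement(), queues.size()));
    }
}
```

With the four queues listed earlier, a first send picks broker_a-queue-0; if that send fails, the retry passes "broker_a" and lands on a broker_b queue.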
Consumer
First, where the pull request is issued:
#PullAPIWrapper.pullKernelImpl(MessageQueue, String, long, long, int, int, long, long, long, CommunicationMode, PullCallback)
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
public FindBrokerResult findBrokerAddressInSubscribe(//
final String brokerName, //
final long brokerId, //
final boolean onlyThisBroker) {
String brokerAddr = null;
boolean slave = false;
boolean found = false;
HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
if (map != null && !map.isEmpty()) {
brokerAddr = map.get(brokerId);
slave = (brokerId != MixAll.MASTER_ID);
found = (brokerAddr != null);
if (!found && !onlyThisBroker) {
Entry<Long, String> entry = map.entrySet().iterator().next();
brokerAddr = entry.getValue();
slave = (entry.getKey() != MixAll.MASTER_ID);
found = true;
}
}
if (found) {
return new FindBrokerResult(brokerAddr, slave);
}
return null;
}
public long recalculatePullFromWhichNode(final MessageQueue mq) {
if (this.isConnectBrokerByUser()) {// false by default
return this.defaultBrokerId;
}
// Look up the suggested brokerId for this queue
AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
if (suggest != null) {
return suggest.get();
}
// If there is none, return the Master's id, i.e. 0
return MixAll.MASTER_ID;
}
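findBrokerAddressInSubscribe first looks up a map in brokerAddrTable by brokerName. For a Master with one Slave, the map has two entries, keyed by the Master's id (0) and the Slave's id. brokerId is 0 by default. If the Master is down, no brokerAddr is found for id 0, so the method falls back to the first entry left in the map, which is the Slave's address.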
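The fallback is easy to see with a plain map. BrokerLookup below is a hypothetical reduction of the method to its lookup logic; the addresses in the usage note are made up.

```java
import java.util.Map;

// Hypothetical helper (not part of RocketMQ): address lookup with fallback to any remaining broker.
class BrokerLookup {
    static final long MASTER_ID = 0L;

    // Returns the address for brokerId, or any other address if allowed, or null if none exists.
    static String find(Map<Long, String> addrsById, long brokerId, boolean onlyThisBroker) {
        if (addrsById == null || addrsById.isEmpty()) {
            return null;
        }
        String addr = addrsById.get(brokerId);
        if (addr == null && !onlyThisBroker) {
            // The requested broker is gone: take whichever entry is still registered.
            addr = addrsById.values().iterator().next();
        }
        return addr;
    }
}
```

Once the Master's entry has been removed from the map, a lookup for id 0 returns the Slave's address, unless onlyThisBroker is set, in which case it returns null.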
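This raises two questions:
- When the Master is down, how does the Consumer notice? (In code terms: why does looking up id 0 in the map return nothing?)
- How is the "suggested brokerId to pull from" (suggest) determined?
First question:
Since the Master's entry is missing from the map, a reasonable guess is that once the Master is down the client can no longer get that Broker from the NameServer and removes it from the map. To see how, find where entries are removed: the cleanOfflineBroker method (it runs in a scheduled task that is started when the client starts).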
#MQClientInstance.cleanOfflineBroker()
Iterator<Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();
while (itBrokerTable.hasNext()) {
Entry<String, HashMap<Long, String>> entry = itBrokerTable.next();
// The master and slaves registered under one brokerName
String brokerName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue();
HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>();
cloneAddrTable.putAll(oneTable);
Iterator<Entry<Long, String>> it = cloneAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> ee = it.next();
String addr = ee.getValue();
if (!this.isBrokerAddrExistInTopicRouteTable(addr)) {// check against topicRouteTable whether the address is still valid
it.remove();
}
}
}
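It walks brokerAddrTable and removes addresses that are no longer valid. topicRouteTable holds TopicRouteData, i.e. the queues and brokers for a topic; it is also refreshed by a scheduled task in which the client pulls each topic's broker information from the NameServer. If a Broker is down (Brokers register with the NameServer periodically, much like a heartbeat, so a dead Broker stops registering), the client naturally no longer receives that Broker's information.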
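The pruning step can be sketched as a pure function over two collections. OfflineBrokerCleaner is hypothetical, and the set of live addresses stands in for whatever the route table currently contains. Dropping a brokerName whose addresses are all gone is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not part of RocketMQ): drops broker addresses that are no longer routable.
class OfflineBrokerCleaner {
    // Returns a new table containing only addresses present in liveAddrs; the input is not modified.
    static Map<String, Map<Long, String>> clean(Map<String, Map<Long, String>> brokerAddrTable,
                                                Set<String> liveAddrs) {
        Map<String, Map<Long, String>> updated = new HashMap<>();
        for (Map.Entry<String, Map<Long, String>> entry : brokerAddrTable.entrySet()) {
            // Work on a copy so readers of the original map never see a half-pruned state.
            Map<Long, String> copy = new HashMap<>(entry.getValue());
            copy.values().removeIf(addr -> !liveAddrs.contains(addr));
            if (!copy.isEmpty()) {
                updated.put(entry.getKey(), copy); // keep the brokerName only if something survives
            }
        }
        return updated;
    }
}
```

After a Master disappears from the route data, its brokerName keeps only the Slave entry, which is exactly the state in which a lookup for id 0 finds nothing.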
Second question:
Look at where pullFromWhichNodeTable is updated, and then at who calls that update method:
#PullAPIWrapper.processPullResult(MessageQueue, PullResult, SubscriptionData)
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
//....
return pullResult;
}
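This is the callback invoked when the Consumer pulls messages, so the value in pullFromWhichNodeTable comes from the Broker. To see when the Broker sets it, start from PullMessageProcessor, the class that handles pull requests on the Broker, and find where the value is set: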
// ....
// If getMessageResult suggests pulling from a Slave, set the Slave's id; otherwise set the Master's id, i.e. 0
// This is the slow-consumption case
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
break;
case SLAVE:
if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
break;
}
if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
// Consuming too slowly
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
}
// consume ok
else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
}
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
// ....
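In short, the Broker suggests pulling from a Slave only when consumption is too slow. How "too slow" is decided is in a few lines of DefaultMessageStore.getMessage:
final long maxOffsetPy = this.commitLog.getMaxOffset();// max offset written to the CommitLog so far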
// ....
int i = 0;
for (; i < bufferConsumeQueue.getSize() && i < MaxFilterMessageCount; i += ConsumeQueue.CQStoreUnitSize) {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
maxPhyOffsetPulling = offsetPy;// physical offset of the message currently being pulled
// ....
}
// ....
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TotalPhysicalMemorySize
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
// i.e. if the gap between the offset being pulled and the max offset exceeds 40% of physical memory, suggest pulling from a Slave
getResult.setSuggestPullingFromSlave(diff > memory);
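To make the threshold concrete, here is the same comparison as a standalone function. SlowConsumerCheck is a hypothetical name, and the 10 GB machine in the usage note is just an example figure.

```java
// Hypothetical helper (not part of RocketMQ): the "consumer is too far behind" test.
class SlowConsumerCheck {
    static boolean suggestPullFromSlave(long maxOffsetPy, long maxPhyOffsetPulling,
                                        long totalPhysicalMemory, int ratioPercent) {
        // How many bytes of CommitLog lie between the consumer and the newest message.
        long diff = maxOffsetPy - maxPhyOffsetPulling;
        // Data older than this is assumed to have dropped out of the page cache.
        long memory = (long) (totalPhysicalMemory * (ratioPercent / 100.0));
        return diff > memory;
    }
}
```

On a machine with 10 GB of memory and a ratio of 40, a consumer that is 5 GB behind is redirected to a Slave, while one that is 3 GB behind keeps reading from the Master.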
Configuration synchronization
The Slave periodically pulls configuration and offsets from the Master:
# SlaveSynchronize.syncAll
public void syncAll() {
this.syncTopicConfig();// pull topic configuration from the Master
this.syncConsumerOffset();// pull consumer offsets from the Master
this.syncDelayOffset();// pull delay offsets from the Master
this.syncSubscriptionGroupConfig();// pull subscription group configuration from the Master
}