引导
前面介绍了RocketMQ的CommitLog文件相关的类分析CommitLog物理日志相关的CommitLog
类。其中有介绍到消息刷盘时高可用对应的handleHA
方法,handleHA
方法中如果配置的服务器的角色为SYNC_MASTER
(从master同步),就会等待主从之间消息同步的进度达到设定的值之后才正常返回,如果超时则返回同步超时
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
//如果设置的主从之间是同步更新
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
// 检查slave同步的位置是否小于 最大容忍的同步落后偏移量参数 haSlaveFallbehindMax,如果是的则进行主从同步刷盘
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
//countDownLatch.await 同步等待刷新,除非等待超时
boolean flushOK =
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
//设置从服务不可用的状态
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
}
这段代码的主要逻辑如下:
- 如果服务器的角色设置为
SYNC_MASTER
,则进行下一步,否则直接跳过主从同步 - 获取
HAService
对象,检查消息是否本地存储完毕,如果没有则结束,否则进入下一步 - 检查slave同步的位置是否小于 最大容忍的同步落后偏移量参数
haSlaveFallbehindMax
,如果是的则进行主从同步刷盘。如果没有则返回slave不可用的状态 - 将消息落盘的最大物理偏移量也就是CommitLog上的偏移量作为参数构建一个
GroupCommitRequest
对象,然后提交到HAService
- 最多等待
syncFlushTimeout
长的时间,默认为5秒。在5秒内获取结果,然后根据结果判断是否返回超时
同步流程
上面那段代码比较简单,因为主从的逻辑全部交给了HAService
和HAConnection
两个类处理了。这里先简单介绍一下整个同步的流程(同步模式)
这个题可能不好理解,等源码逻辑分析完之后再看可能会清楚点。
高可用服务HAService
HAService
是在RocketMQ的Broker启动的时候就会创建的,而创建的点在DefaultMessageStore
这个消息存储相关的综合类中,在这个类的构造器中会创建HAService
无论当前的Broker是什么角色。这个类后续会有文章分析
这里需要说明的是Broker中的Master和Slaver两个角色,代码都是一样的,只不过是在实际执行的时候,走的分支不一样
内部属性
在HAService
中有几个比较重要的属性,这里需要简单的介绍一下
参数 | 说明 |
---|---|
connectionList | 连接到master的slave连接列表,用于管理连接 |
acceptSocketService | 用于接收连接用的服务,只监听OP_ACCEPT事件,监听到连接事件时候,创建HAConnection来处理读写请求事件 |
waitNotifyObject | 一个消费等待模型类,用于处理高可用线程和CommitLog的刷盘线程交互 |
push2SlaveMaxOffset | master同步到slave的偏移量 |
groupTransferService | 主从同步的检测服务,用于检查是否同步完成 |
haClient | 高可用的服务,slave用来跟master建立连接,像master汇报偏移量和拉取消息 |
/**
* 连接到本机的数量
*/
private final AtomicInteger connectionCount = new AtomicInteger(0);
/**
* 连接列表,用于管理连接
*/
private final List<HAConnection> connectionList = new LinkedList<>();
/**
* 接受和监听slave的连接服务
*/
private final AcceptSocketService acceptSocketService;
private final DefaultMessageStore defaultMessageStore;
/**
* 一个服务消费者线程模型的对象,用于跟CommitLog的刷盘线程交互
*/
private final WaitNotifyObject waitNotifyObject = new WaitNotifyObject();
/**
* master跟slave 消息同步的位移量
*/
private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
/**
* 主从信息同步的服务
*/
private final GroupTransferService groupTransferService;
/**
* 操作主从的类
*/
private final HAClient haClient;
构造函数
HAService
只有一个构造器。逻辑也比较简单,创建一个AcceptSocketService
开放一个端口为 10912
的端口用于slave来简历连接,同时启动主从信息同步的任务groupTransferService
用于接收CommitLog
在高可用刷盘时提交任务
public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
this.defaultMessageStore = defaultMessageStore;
//创建,接受连接的服务, 开放的端口号为10912
this.acceptSocketService =
new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
//创建主从信息同步的线程
this.groupTransferService = new GroupTransferService();
this.haClient = new HAClient();
}
内部类分析
HAService
在创建之后,会在DefaultMessageStore
中调用其start
方法,这个方法会启动其内部的几个内部类,用来主从同步
public void start() throws Exception {
//接受连接的服务,开启端口,设置监听的事件
this.acceptSocketService.beginAccept();
//开启服务不断检查是否有连接
this.acceptSocketService.start();
//开启groupTransferService,接受CommitLog的主从同步请求
this.groupTransferService.start();
//开启haClient,用于slave来建立与Master连接和同步
this.haClient.start();
}
接下来对这几个内部类进行分析
用于接受Slave连接的AcceptSocketService
AcceptSocketService
这个类在Broker的Master和Slaver两个角色启动时都会创建,只不过区别是Slaver开启端口之后,并不会有别的Broker与其建立连接。因为只有在Broker的角色是Slave的时候才会指定要连接的Master地址。这个逻辑,在Broker启动的时候BrokerController
类中运行的。
public void beginAccept() throws Exception {
//创建ServerSocketChannel
this.serverSocketChannel = ServerSocketChannel.open();
//创建selector
this.selector = RemotingUtil.openSelector();
//设置SO_REUSEADDR https://blog.csdn.net/u010144805/article/details/78579528
this.serverSocketChannel.socket().setReuseAddress(true);
//设置绑定的地址
this.serverSocketChannel.socket().bind(this.socketAddressListen);
//设置为非阻塞模式
this.serverSocketChannel.configureBlocking(false);
//注册监听事件为 连接事件
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
beginAccept
方法就是开启Socket,绑定10912端口,然后注册selector和指定监听的事件为OP_ACCEPT
也就是建立连接事件。对应的IO模式为NIO模式。主要看其run
方法,这个方法是Master用来接受Slave连接的核心。
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//设置阻塞等待时间
this.selector.select(1000);
//获取selector 下的所有selectorKey ,后续迭代用
Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) {
for (SelectionKey k : selected) {
//检测有连接事件的selectorKey
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
//获取selectorKey的Channel
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
//创建HAConnection,建立连接
HAConnection conn = new HAConnection(HAService.this, sc);
//建立连接
conn.start();
//添加连接到连接列表中
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error("new HAConnection exception", e);
sc.close();
}
}
} else {
log.warn("Unexpected ops in select " + k.readyOps());
}
}
//清空连接事件,未下一次做准备
selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}
log.info(this.getServiceName() + " service end");
}
这里的逻辑比较简单。就是每过一秒检查一次是否有连接事件,如果有则建立连接,并把建立起来的连接加入到连接列表中进行保存。一直循环这个逻辑。
检查同步进度和唤醒CommitLog刷盘线程的GroupTransferService
GroupTransferService
是CommitLog消息刷盘的类CommitLog
与HAService
打交道的一个中间类。在CommitLog
中进行主从刷盘的时候,会创建一个CommitLog.GroupCommitRequest
的内部类,这个类包含了当前Broker最新的消息的物理偏移量信息。然后把这个类丢给GroupTransferService
处理,然后唤醒GroupTransferService
。起始这个逻辑跟CommitLog
内部的GroupCommitService
逻辑一样。只不过对于同步部分的逻辑不一样,这里可以参考前面的文章存储部分(3)CommitLog物理日志相关的CommitLog
类。
先看run
方法
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
/**
* 这里进入等待,等待被唤醒,进入等待之前会调用onWaitEnd方法,然后调用swapRequests方法,
* 吧requestsWrite转换为requestsRead
*/
this.waitForRunning(10);
/**
* 进行请求处理
*/
this.doWaitTransfer();
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info(this.getServiceName() + " service end");
}
在run
方法中会将传入的CommitLog.GroupCommitRequest
从requestsWrite
转换到requestsRead
中然后进行处理检查对应的同步请求的进度。检查的逻辑在doWaitTransfer
中
private void doWaitTransfer() {
//对requestsRead请求加锁
synchronized (this.requestsRead) {
//如果读请求不为空
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
//如果push到slave的偏移量 大于等于 请求中的消息的最大偏移量 表示slave同步完成
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
//计算这次同步超时的时间点 同步的超时时间段为5s
long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
+ HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
//如果没有同步完毕,并且还没达到超时时间,则等待1秒之后检查同步的进度,大概检查5次
while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
//如果超时了,检查是不是同步完成了,
if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
//超时或者同步成功的时候 唤醒主线程
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
this.requestsRead.clear();
}
}
}
主要逻辑如下:
- 比较Master推送到Slave的 偏移量
push2SlaveMaxOffset
是不是大于传进来的CommitLog.GroupCommitRequest
中的偏移量 - 计算本次同步超时的时间节点,时间为当前时间加上参数系统配置参数
syncFlushTimeout
默认为5秒 - 如果第一步结果为true,则返回结果为
PUT_OK
。如果第一步为false,则每过一秒检查一次结果,如果超过5次了还没同步完成,则表示超时了那么返回结果为FLUSH_SLAVE_TIMEOUT
。同时会唤醒CommitLog
的刷盘线程。
与Slave紧密相关的HAClient
前面我们说到了只有是Salve角色的Broker才会真正的配置Master的地址,而HAClient
是需要Master地址的,因此这个类真正在运行的时候只有Slave才会真正的使用到。
先看看核心的参数信息
//Socket读缓存区大小
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
//master地址
private final AtomicReference<String> masterAddress = new AtomicReference<>();
//Slave向Master发起主从同步的拉取偏移量,固定8个字节
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
private SocketChannel socketChannel;
private Selector selector;
//上次同步偏移量的时间戳
private long lastWriteTimestamp = System.currentTimeMillis();
//反馈Slave当前的复制进度,commitlog文件最大偏移量
private long currentReportedOffset = 0;
private int dispatchPosition = 0;
//读缓冲大小
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
基本上都是缓冲相关的配置。这里主要分析的是run
方法中的逻辑
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//连接master,同时监听读请求事件
if (this.connectMaster()) {
//是否需要汇报偏移量,间隔需要大于心跳的时间(5s)
if (this.isTimeToReportOffset()) {
//向master 汇报当前 salve 的CommitLog的最大偏移量,并记录这次的同步时间
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
//如果汇报完了就关闭连接
if (!result) {
this.closeMaster();
}
}
this.selector.select(1000);
//向master拉取的信息
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
//再次同步slave的偏移量如果,最新的偏移量大于已经汇报的情况下
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
//检查时间距离上次同步进度的时间间隔
long interval =
HAService.this.getDefaultMessageStore().getSystemClock().now()
- this.lastWriteTimestamp;
//如果间隔大于心跳的时间,那么就关闭
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
.getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
+ "] expired, " + interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
} else {
//等待
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.waitForRunning(1000 * 5);
}
}
log.info(this.getServiceName() + " service end");
}
主要的逻辑如下:
- 连接master,如果当前的broker角色是master,那么对应的
masterAddress
是空的,不会有后续逻辑。如果是slave,并且配置了master地址,则会进行连接进行后续逻辑处理 - 检查是否需要向master汇报当前的同步进度,如果两次同步的时间小于5s,则不进行同步。每次同步之间间隔在5s以上,这个5s是心跳连接的间隔参数为
haSendHeartbeatInterval
- 向master 汇报当前 salve 的CommitLog的最大偏移量,并记录这次的同步时间
- 从master拉取日志信息,主要就是进行消息的同步,同步出问题则关闭连接
- 再次同步slave的偏移量,如果最新的偏移量大于已经汇报的情况下则从步骤1重头开始
这里分析完了run
方法,然后就要分析主要的日志同步的逻辑了,这个逻辑在processReadEvent
方法中
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
//如果读取缓存还有没读取完,则一直读取
while (this.byteBufferRead.hasRemaining()) {
try {
//从master读取消息
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
//分发请求
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
private boolean dispatchReadRequest() {
//请求的头信息
final int msgHeaderSize = 8 + 4; // phyoffset + size
//获取请求长度
int readSocketPos = this.byteBufferRead.position();
while (true) {
//获取分发的偏移差
int diff = this.byteBufferRead.position() - this.dispatchPosition;
//如果偏移差大于头大小,说明存在请求体
if (diff >= msgHeaderSize) {
//获取主master的最大偏移量
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
//获取消息体
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
//获取salve的最大偏移量
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
if (slavePhyOffset != 0) {
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}
//如果偏移差大于 消息头和 消息体大小。则读取消息体
if (diff >= (msgHeaderSize + bodySize)) {
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
this.byteBufferRead.get(bodyData);
//吧消息同步到slave的 CommitLog
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
this.byteBufferRead.position(readSocketPos);
//记录分发的位置
this.dispatchPosition += msgHeaderSize + bodySize;
if (!reportSlaveMaxOffsetPlus()) {
return false;
}
continue;
}
}
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}
break;
}
return true;
}
每一步的逻辑都是比较清楚的,这里不进行讲解。
Master用来同步日志用的HAConnection
前面说过,在HAService
的AcceptSocketService
内部类中,Master会在建立连接的时候创建HAConnection
用来处理读写事件。这里主要介绍构造函数和内部类就能了解原理了。
构造函数
public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
//指定所属的 HAService
this.haService = haService;
//指定的NIO的socketChannel
this.socketChannel = socketChannel;
//客户端的地址
this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
//这是为非阻塞
this.socketChannel.configureBlocking(false);
/**
* 是否启动SO_LINGER
* SO_LINGER作用
* 设置函数close()关闭TCP连接时的行为。缺省close()的行为是,如果有数据残留在socket发送缓冲区中则系统将继续发送这些数据给对方,等待被确认,然后返回。
*
* https://blog.csdn.net/u012635648/article/details/80279338
*/
this.socketChannel.socket().setSoLinger(false, -1);
/**
* 是否开启TCP_NODELAY
* https://blog.csdn.net/lclwjl/article/details/80154565
*/
this.socketChannel.socket().setTcpNoDelay(true);
//接收缓冲的大小
this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
//发送缓冲的大小
this.socketChannel.socket().setSendBufferSize(1024 * 64);
//端口写服务
this.writeSocketService = new WriteSocketService(this.socketChannel);
//端口读服务
this.readSocketService = new ReadSocketService(this.socketChannel);
//增加haService中的连接数字段
this.haService.getConnectionCount().incrementAndGet();
}
内部类分析
监听slave日志同步进度和同步日志的WriteSocketService
WriteSocketService
监听的是OP_WRITE
事件,注册的端口就是在HAService
中开启的端口。直接看对应的核心方法run
方法,方法有点长这里只看看核心的部分
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
//如果slave的读请求为 -1 表示没有slave 发出写请求,不需要处理
if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}
//nextTransferFromWhere 为-1 表示初始第一次同步,需要进行计算
if (-1 == this.nextTransferFromWhere) {
//如果slave 同步完成 则下次同步从CommitLog的最大偏移量开始同步
if (0 == HAConnection.this.slaveRequestOffset) {
//获取master 上面的 CommitLog 最大偏移量
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMappedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
//设置下次开始同步的位置
this.nextTransferFromWhere = masterOffset;
} else {
//设置下次同步的位置,为 salve 读请求的位置
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
//上次同步是否完成
if (this.lastWriteOver) {
//获取两次写请求的周期时间
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
//如果周期大于 心跳间隔 。需要先发送一次心跳 心跳间隔为 5000毫秒
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {
// 创建请求头,心跳请求大小为12 字节
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();
//进行消息同步
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
} else {
//如果上次同步没有完成,则继续同步
this.lastWriteOver = this.transferData();
//如果还没同步完成则继续
if (!this.lastWriteOver)
continue;
}
//获取开始同步位置之后的消息
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
//
if (selectResult != null) {
int size = selectResult.getSize();
//检查要同步消息的长度,是不是大于单次同步的最大限制 默认为 32kb
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;
selectResult.getByteBuffer().limit(size);
this.selectMappedBufferResult = selectResult;
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
} else {
//如果需要同步的为空,则在等待100毫秒
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
主要的逻辑如下:
- 如果slave进行了日志偏移量的汇报,判断是不是第一次的进行同步以及对应的同步进度。设置下一次的同步位置
- 检查上次同步是不是已经完成了,检查两次同步的周期是不是超过心跳间隔,如果是的则需要把心跳信息放到返回的头里面,然后进行消息同步。如果上次同步还没完成,则等待上次同步完成之后再继续
- 从Master本地读取CommitLog的最大偏移量,根据上次同步的位置开始从CommitLog获取日志信息,然后放到缓存中
- 如果缓存的大小大于单次同步的最大大小
haTransferBatchSize
默认是32kb,那么只同步32kb大小的日志。如果缓存为null,则等待100毫秒
其中日志同步的逻辑在transferData
方法中,这里就把代码贴出来
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
//心跳的头没写满,先写头
while (this.byteBufferHeader.hasRemaining()) {
//把请求头传过去
int writeSize = this.socketChannel.write(this.byteBufferHeader);
if (writeSize > 0) {
writeSizeZeroTimes = 0;
//记录上次写的时间
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
//重试3次 则不再重试
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write header error < 0");
}
}
//如果要同步的日志为null,则直接返回这次同步的结果是否同步完成
if (null == this.selectMappedBufferResult) {
return !this.byteBufferHeader.hasRemaining();
}
writeSizeZeroTimes = 0;
// 填充请求体
if (!this.byteBufferHeader.hasRemaining()) {
//如果还没有同步完成,则一直同步
while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
//同步的大小
int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
//重试3次
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write body error < 0");
}
}
}
boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
//释放缓存
if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
this.selectMappedBufferResult.release();
this.selectMappedBufferResult = null;
}
return result;
}
根据同步进度来唤醒刷盘CommitLog线程的ReadSocketService
ReadSocketService
的作用主要是:根据Slave推送的日志同步进度,来唤醒HAService
的GroupTransferService
然后进一步唤醒CommitLog
的日志刷盘线程。这里主要看run
方法和processReadEvent
方法。
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
//任务是否结束
while (!this.isStopped()) {
try {
//设置selector的阻塞时间
this.selector.select(1000);
//处理salver读取消息的事件
boolean ok = this.processReadEvent();
if (!ok) {
HAConnection.log.error("processReadEvent error");
break;
}
//检查此次处理时间是否超过心跳连接时间
long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
break;
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
......
}
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
//检查 读取请求缓冲是否已经满了,
if (!this.byteBufferRead.hasRemaining()) {
//读请求缓冲转变为读取模式。
this.byteBufferRead.flip();
this.processPosition = 0;
}
while (this.byteBufferRead.hasRemaining()) {
try {
//从byteBufferRead读取
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
//读取请求缓冲的位置 如果大于处理的8字节 表示有读取的请求没处理。为什么是8个字节,因为salver向master发去拉取请求时,偏移量固定为8
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
//获取消息开始的位置
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
//从开始位置读取8个字节,获取 slave的读请求偏移量
long readOffset = this.byteBufferRead.getLong(pos - 8);
//设置处理的位置
this.processPosition = pos;
//设置 salver读取的位置
HAConnection.this.slaveAckOffset = readOffset;
//如果slave的 读请求 偏移量小于0 表示同步完成了
if (HAConnection.this.slaveRequestOffset < 0) {
//重新设置slave的 读请求的 偏移量
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
}
//唤醒阻塞的线程, 在消息的主从同步选择的模式是同步的时候,会唤醒被阻塞的消息写入的线程
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0) {
//如果数据为0超过3次,表示同步完成,直接结束
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}
return true;
}
整体的逻辑如下:
- 每1s执行一次事件就绪选择,然后调用processReadEvent方法处理读请求,读取从服务器的拉取请求
- 获取slave已拉取偏移量,因为有新的从服务器反馈拉取进度,需要通知某些生产者以便返回,因为如果消息发送使用同步方式,需要等待将消息复制到从服务器,然后才返回,故这里需要唤醒相关线程去判断自己关注的消息是否已经传输完成。也就是
HAService
的GroupTransferService
- 如果读取到的字节数等于0,则重复三次,否则结束本次读请求处理;如果读取到的字节数小于0,表示连接被断开,返回false,后续会断开该连接。
总结
RocketMQ的主从同步之间的核心类就是HAService
和HAConnection
和其中的几个子类。结合前面的那个图可以简单的理解一下。