简介
HAConnection用于描述master和slave用于同步数据的连接
两个service如下
ReadSocketService :读来自 Slave节点 的数据。
WriteSocketService :写到往 Slave节点 的数据。
重要方法
ReadSocketService#run
ReadSocketService#processReadEvent
WriteSocketService#run
WriteSocketService#transferData
部分涉及HAService的代码后面再讲
master和slave同步数据的协议如下
属性
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private final HAService haService;//上层ha服务
private final SocketChannel socketChannel;
private final String clientAddr;//slave地址
private WriteSocketService writeSocketService;//内部类
private ReadSocketService readSocketService;//内部类
private volatile long slaveRequestOffset = -1;//slave第一次请求的offset
private volatile long slaveAckOffset = -1;//确认slave的最大位置
函数
构造函数和状态处理函数比较简单
public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
this.haService = haService;
this.socketChannel = socketChannel;
this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
this.socketChannel.configureBlocking(false);
this.socketChannel.socket().setSoLinger(false, -1);
this.socketChannel.socket().setTcpNoDelay(true);
this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
this.socketChannel.socket().setSendBufferSize(1024 * 64);
this.writeSocketService = new WriteSocketService(this.socketChannel);
this.readSocketService = new ReadSocketService(this.socketChannel);
this.haService.getConnectionCount().incrementAndGet();//ha服务连接数+1
}
public void start() {
this.readSocketService.start();
this.writeSocketService.start();
}
public void shutdown() {
this.writeSocketService.shutdown(true);
this.readSocketService.shutdown(true);
this.close();
}
public void close() {
if (this.socketChannel != null) {
try {
this.socketChannel.close();
} catch (IOException e) {
HAConnection.log.error("", e);
}
}
}
public SocketChannel getSocketChannel() {
return socketChannel;
}
内部类
ReadSocketService
读socket服务
属性以及构造函数
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
private final Selector selector;
private final SocketChannel socketChannel;
private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);//读的buffer,分配1M
private int processPostion = 0;//byteBufferRead处理到的位置
private volatile long lastReadTimestamp = System.currentTimeMillis();//记录最后读取的时间,20s超时
public ReadSocketService(final SocketChannel socketChannel) throws IOException {
this.selector = RemotingUtil.openSelector();
this.socketChannel = socketChannel;
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
this.thread.setDaemon(true);
}
run方法
代码,注释如下
/**
* while循环中
* 1.调用processReadEvent,如果出错则break
* 2.如果读的间隔超过了指定时间(默认20s)则break
* 退出了break
* 1.读写service stop
* 2.ha服务移除当前HAConnection记录
* 3.selector,channel关闭,清理
*/
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
boolean ok = this.processReadEvent();//处理读事件
if (!ok) {//处理错误
HAConnection.log.error("processReadEvent error");
break;
}
long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {//心跳间隔超过了指定间隔,默认20s
log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
break;
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
this.makeStop();//读线程stop
writeSocketService.makeStop();//写线程stop
//ha服务移除当前HAConnection记录
haService.removeConnection(HAConnection.this);
//ha服务的连接数-1
HAConnection.this.haService.getConnectionCount().decrementAndGet();
//取消注册的key
SelectionKey sk = this.socketChannel.keyFor(this.selector);
if (sk != null) {
sk.cancel();
}
//关闭socket以及channel
try {
this.selector.close();
this.socketChannel.close();
} catch (IOException e) {
HAConnection.log.error("", e);
}
HAConnection.log.info(this.getServiceName() + " service end");
}
processReadEvent
/**
* 处理读事件
* 1.如果byteBufferRead写满了,就flip准备重新写,更新processPostion
* 2.只要buffer还能写
* 从socketChannel内容写到byteBufferRead中
* 更新lastReadTimestamp
* 找到pos之前最近%8==0的位置(每次传输的是一个long,刚好8个字节)
* 读取之前8个字节,记录slaveAckOffset,代表slave发送过来的offset,即slave同步到的offset
* notifyTransferSome。master通知slave: "已经知道slave同步到slaveAckOffset这个位置了"
*/
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
if (!this.byteBufferRead.hasRemaining()) {//没有什么要读了
this.byteBufferRead.flip();//从头读
this.processPostion = 0;
}
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);//channel中的内容读到byteBufferRead中
if (readSize > 0) {
readSizeZeroTimes = 0;
//最后读取时间
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
if ((this.byteBufferRead.position() - this.processPostion) >= 8) {//buffer最新的位置离处理的位置超过了8字节(一个long占的位置)
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);//找到对应的整
long readOffset = this.byteBufferRead.getLong(pos - 8);//读取这个long
this.processPostion = pos;//记录处理到的位置
// 设置已确认的最大位置
HAConnection.this.slaveAckOffset = readOffset;
if (HAConnection.this.slaveRequestOffset < 0) {//初始请求的位置,代表是slave第一次请求
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
}
// 通知目前Slave进度
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}
return true;
}
WriteSocketService
写socket服务,完成master向slave发送数据
属性以及构造函数
private final Selector selector;
private final SocketChannel socketChannel;
private final int headerSize = 8 + 4;//头部信息 包含 physical offset(long) + bodySize(int)
private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);//master给slave同步数据时的头部buffer
private long nextTransferFromWhere = -1;//传递数据从哪开始
private SelectMappedBufferResult selectMappedBufferResult;//master给slave同步数据的内容,即body
private boolean lastWriteOver = true;
private long lastWriteTimestamp = System.currentTimeMillis();//最后一次写的时间
public WriteSocketService(final SocketChannel socketChannel) throws IOException {
this.selector = RemotingUtil.openSelector();
this.socketChannel = socketChannel;
this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
this.thread.setDaemon(true);
}
run方法
/**
* 正常情况下while循环
* 1.等到slave请求,读取服务中更新slaveRequestOffset
* 2.计算nextTransferFromWhere
* 2.1如果为首次连接,那么slave传递的offset为0, master收到为0的offset后,从最后一个mappedFile开始复制
* 2.2否则为slave同步过来的slaveRequestOffset
* 3.如果之前发送给slave内容没有完成,那么一直接着发送
* 4.如果之前发送给slave内容发完了,而且过了心跳时间(默认5s),那么只传递一个header过去(header中记录的bodySize为0)
* 5.根据nextTransferFromWhere去mappedFile找合适大小的mappedFile内容,记录在selectMappedBufferResult
* 6.调用transferData把头部byteBufferHeader以及body内容selectMappedBufferResult发送过去
* 如果遇到异常,break处while循环
* 1.读写service stop
* 2.haService移除当前记录
* 3.socket,channel关闭,清理
*
*/
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
if (-1 == HAConnection.this.slaveRequestOffset) {//slave还未请求过
Thread.sleep(10);
continue;
}
if (-1 == this.nextTransferFromWhere) {
//如果为首次连接,那么offset为0, master收到为0的offset后,从最后一个mappedFile开始复制
if (0 == HAConnection.this.slaveRequestOffset) {
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMapedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
this.nextTransferFromWhere = masterOffset;
} else {//否则从指定的offset开始同步数据给slave
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;//如果不是首次连接,就从确认的slave的offset开始
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
if (this.lastWriteOver) {//上次写完了
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
//5s一次心跳,如果超时,记录的bodySize为0
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);//头部信息 phyOffset为nextTransferFromWhere
this.byteBufferHeader.putInt(0);//头部信息 bodySize为0
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
} else {
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
//获取指定位置之后的buffer
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
int size = selectResult.getSize();
//同步数据不得超过默认32k
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;//下次传输的位置
selectResult.getByteBuffer().limit(size);
this.selectMappedBufferResult = selectResult;
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);//头部12字节
this.byteBufferHeader.putLong(thisOffset);//开始位置
this.byteBufferHeader.putInt(size);//bodySize大小
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
} else {
// 没新的消息,挂起等待
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
//break出来,代表异常
if (this.selectMappedBufferResult != null) {
this.selectMappedBufferResult.release();
}
//停止读写service
this.makeStop();
readSocketService.makeStop();
haService.removeConnection(HAConnection.this);//移除该记录
SelectionKey sk = this.socketChannel.keyFor(this.selector);
if (sk != null) {
sk.cancel();
}
try {
this.selector.close();
this.socketChannel.close();
} catch (IOException e) {
HAConnection.log.error("", e);
}
HAConnection.log.info(this.getServiceName() + " service end");
}
transferData
master给slave写数据
/**
* 传输数据
* 1.写头部
* 2.写body
*/
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;//长度为0的写的次数
// Write Header,写头部
while (this.byteBufferHeader.hasRemaining()) {
int writeSize = this.socketChannel.write(this.byteBufferHeader);
if (writeSize > 0) {
writeSizeZeroTimes = 0;
//更新写时间
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write header error < 0");
}
}
if (null == this.selectMappedBufferResult) {
return !this.byteBufferHeader.hasRemaining();
}
writeSizeZeroTimes = 0;
// Write Body,写内容
if (!this.byteBufferHeader.hasRemaining()) {//头部写完了
while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());//把master的内容传递给slave
if (writeSize > 0) {
writeSizeZeroTimes = 0;//清空次数
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write body error < 0");
}
}
}
boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
this.selectMappedBufferResult.release();
this.selectMappedBufferResult = null;//发送完了之后置为null
}
return result;
}
思考
两个线程run方法的流程,思路
都写在注释上了
processReadEvent时如果byteBufferRead一次读入了大量数据怎么办
源码上
如果一下子读的内容比processPostion 大了几十几百,会怎么办,为什么只读最后8个字节就行
这里应该是HAClient的发送内容保证的,最后发送的offset一定是最大的,因此之前的offset,即使收到了没有处理,也没有关系
WriteSocketService和ReadSocketService的异同
同:都继承ServiceThread,都记录一个最后读(写)时间,出现问题时都需要关闭两个Service,处理socket等
异:
1.同步协议不一样
读数据只要读一个long,代表maxPhyOffset
写要有一个12字节的header,后面是同步的内容
2.检测的时长不一样
读数据是20s没有读算超时(houseKeep)
写是5s没有写算超时(heartBeat)
问题
processReadEvent代码重复
我并不知道为啥要这样写
备注
思考中第二个对于HAClient的讲解,源码还没看
refer
http://blog.csdn.net/quhongwei_zhanqiu/article/details/39144469
http://technoboy.iteye.com/blog/2368458