1.Master-Slave主从同步
2.同步信息:数据内容+元数据信息(配置信息)
3.元数据同步:Broker角色识别,为Slave则启动同步任务
//如果角色为SLAVE
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();//启动定时任务进行同步元数据信息
} catch (Throwable e) {
log.error("ScheduledTask syncAll slave exception", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
进入syncAll()方法看一下
public void syncAll() {
this.syncTopicConfig();//同步topic配置信息
this.syncConsumerOffset();//同步消费者偏移量
this.syncDelayOffset();//同步延迟偏移量
this.syncSubscriptionGroupConfig();//同步订阅组配置信息
}
4.消息同步:HAService、HAconnection、WaitNotifyObject
/**
* Master/Slave组件:
*
* Master节点:
* AcceptSocketService : 接收Slave节点连接
* HAConnection
* ReadSocketService : 读来自Slave节点的数据
* WriteSocketService : 写往到Slave节点的数据
* Slave节点:
* HAService
* HAClient : 对Master节点连接、读写数据。
*
* 通信协议: Master节点与Slave节点 通信协议很简单,只有如下两条:
*
* 对象 用途 第几位 字段 数据类型 字节数 说明
* Slave=>Master 上报CommitLog已经同步到的物理位置 0 maxPhyOffset Long 8 CommitLog最大物理位置
* Master=>Slave 传输新的CommitLog数据 0 fromPhyOffset Long 8 CommitLog开始物理位置
* 1 size Int 4 传输CommitLog数据长度
* 2 body Bytes size 传输CommitLog数据
*/