Rocketmq
使用 namesrv
来管理所有的元数据信息,包括主题 Topic
路由信息和 Broker
信息。
首先我们介绍一下一些基础概念:
-
Broker
: 储存消息的服务器。- 分为主从两种模式,通过
brokerId
来区分,目前brokerId = 0
就表示主节点。 - 每个
Broker
启动时,会向namesrv
注册自己的信息,并会定期发送心跳信息。
- 分为主从两种模式,通过
-
Broker
组 : 相同brokerName
名字的Broker
服务器就是一个组的。注意: 这里就有一个小问题,如果两个
Broker
有相同brokerName
名字,而且brokerId
都是0
时,它们都可以向namesrv
注册自己信息,后面覆盖前面信息,而且因为它们都会发送心跳消息,就会导致不断地相互覆盖。 -
Broker
集群 : 有相同clusterName
名字的Broker
服务器就是同一个集群的。 -
Topic
: 主题Topic
是以Broker
组进行区分的。-
Broker
组有一个TopicConfigManager
来管理该Broker
所拥有的所有主题Topic
信息,包括主题Topic
的权限perm
,读队列数量readQueueNums
,writeQueueNums
写队列数量等等。 -
Broker
组中主Broker
创建主题Topic
在这个Broker
组拥有队列文件Queue
,从Broker
只是复制主Broker
。 - 生产者发送消息时,也是从主题
Topic
拥有的Broker
组数组中,挑选一个Broker
组,向这个Broker
组的主Broker
发送消息,然后主Broker
再将这些数据发送给从Broker
。
-
一. NamesrvStartup
类
这个类是 namesrv
的启动类,用来开启 namesrv
。
1.1 main
方法
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
// 创建 NamesrvController
NamesrvController controller = createNamesrvController(args);
// 启动 NamesrvController
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
- 通过
createNamesrvController()
方法创建NamesrvController
实例。 - 调用
start(controller)
方法启动NamesrvController
。
1.2 createNamesrvController
方法
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
// 通过 commandLine 解析 args 参数
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// namesrv 相关的配置参数
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// netty 相关的配置参数
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// 默认端口 9876, 但是可以通过 -c 传入的 properties 文件参数进行覆盖
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
// 创建 NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
- 通过
commandLine
解析命令行args
参数。 - 如果指定了配置文件,那么读取配置文件中的配置项,并赋值到
namesrvConfig
和nettyServerConfig
。 - 创建
NamesrvController
实例,并将配置项数据properties
赋值到Configuration
中。
1.3 start
方法
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 初始化 NamesrvController
boolean initResult = controller.initialize();
if (!initResult) {
// 初始化失败,就退出
controller.shutdown();
System.exit(-3);
}
// 添加钩子函数,保证 JVM 正常退出时,关闭 NamesrvController
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 开启 NamesrvController
controller.start();
return controller;
}
非常简单,先初始化 NamesrvController
;然后添加钩子函数,保证 JVM
正常退出时,关闭 NamesrvController
;最后调用 start()
方法启动。
二. NamesrvController
类
2.1 成员属性
// Namesrv 的配置项
private final NamesrvConfig namesrvConfig;
// Netty 服务端配置项
private final NettyServerConfig nettyServerConfig;
// 定时器,用来定期检查是否有不活跃的 broker,以及定期打印 kvConfigManager 中的值
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
// 储存 KV 的值
private final KVConfigManager kvConfigManager;
// namesrv 所有路由信息的管理器
private final RouteInfoManager routeInfoManager;
// 远程RPC服务服务端,用来处理远程请求命令
private RemotingServer remotingServer;
// ChannelEventListener 接口子类,监听 Netty 的 CONNECT, CLOSE, IDLE, EXCEPTION 事件,进行对应处理
private BrokerHousekeepingService brokerHousekeepingService;
// 处理远程请求的线程池执行器
private ExecutorService remotingExecutor;
// 配置项
private Configuration configuration;
// 监控 file 变化,主要用于 SSL
private FileWatchService fileWatchService;
-
namesrvConfig
和nettyServerConfig
:Namesrv
的配置项和Netty
服务端配置项。 -
scheduledExecutorService
: 定时器用来定期检查是否有不活跃的
Broker
,以及定期打印kvConfigManager
中的值。 -
kvConfigManager
:KV
值的管理器。 -
routeInfoManager
: 所有路由信息的管理器。 -
remotingServer
: 远程RPC
服务服务端,用来处理远程请求命令。 -
brokerHousekeepingService
:ChannelEventListener
接口子类。监听
Netty
的CONNECT
,CLOSE
,IDLE
,EXCEPTION
事件,进行对应处理。 -
remotingExecutor
: 处理远程请求的线程池执行器。
2.2 initialize
方法
public boolean initialize() {
// 从 kvConfig.json 文件中加载之前存储的 KV 值
this.kvConfigManager.load();
// 通过 netty 创建一个服务端
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 用于处理请求的线程池 remotingExecutor
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注册请求命令处理器
this.registerProcessor();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// 每隔10秒 检查是否有不活跃的 broker
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// 每隔10秒打印一下 kvConfigManager 中的值
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// 处理 SSL
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
- 从
kvConfig.json
文件中加载之前存储的KV
值。 - 创建一个远程
RPC
服务服务端,用来处理远程请求命令。 - 用于处理请求的线程池
remotingExecutor
。 - 注册请求命令处理器。
- 通过
scheduledExecutorService
每隔10
秒检查是否有不活跃的Broker
,以及每隔10
秒打印一下kvConfigManager
中的值。 - 最后处理
SSL
。
三. RouteInfoManager
类
3.1 成员属性
// 主题 topic 对应的队列相关信息QueueData,这里是 List 原因是每个 broker 组都有一个 QueueData。
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// broker 组基础信息集合,包括 broker 名字,所属集群名字和主从 broker 的地址。
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// broker 集群集合,包括所有集群名字以及它拥有所有broker 组名字。
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// 每个 broker 的状态信息,每次收到心跳包时,都会替换对应数据。
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// 每个 broker 对应的 FilterServer 地址列表,用于类模式消息过滤。
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
-
topicQueueTable
: 所有主题相关信息集合。- 每个主题
Topic
对应一个List<QueueData>
集合,因为一个主题有多个Broker
组。 -
QueueData
包括Broker
组的名字,这个Broker
组中当期主题Topic
对应的可读队列数量,可写队列数量,读写权限和同步标记。
- 每个主题
-
brokerAddrTable
:Broker
组基础信息集合。BrokerData
包括Broker
组名字,所属集群名字和主从Broker
的地址。 -
clusterAddrTable
:Broker
集群集合,key
是集群名字,value
是集群拥有所有的Broker
组名字。 -
brokerLiveTable
: 每个Broker
的状态信息。 -
filterServerTable
: 每个Broker
对应的FilterServer
地址列表,用于类模式消息过滤。
3.1.1 QueueData
类
public class QueueData implements Comparable<QueueData> {
// broker组的名字
private String brokerName;
// 可读队列数量
private int readQueueNums;
// 可写队列数量
private int writeQueueNums;
// 读写权限,具体参考 PermName,
private int perm;
// 主题Topic 同步标记; 参考TopicSysFlag类: FLAG_UNIT = 0x1 << 0, FLAG_UNIT_SUB = 0x1 << 1
private int topicSynFlag;
}
3.1.2 BrokerData
类
public class BrokerData implements Comparable<BrokerData> {
// broker 组所属集群的名字
private String cluster;
// broker 组的名字
private String brokerName;
// broker 组中所有 broker 的地址;其中 brokerId = 0 表示主 broker,其他的都是从 broker。
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}
3.1.3 BrokerLiveInfo
类
class BrokerLiveInfo {
// 最近一次更新时间,用来判断这个 broker 是否活跃
private long lastUpdateTimestamp;
// 这个 broker 的数据版本,可以用来判断这个 Broker 的数据是否改变过。
private DataVersion dataVersion;
// 连接这个 broker 的通道channel
private Channel channel;
// 该 broker 的 HaServer地址
private String haServerAddr;
}
3.2 重要方法
3.2.1 registerBroker
方法
/**
* 注册 Broker
*/
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
// 这个 broker 所属的集群clusterName 是否已经在 clusterAddrTable 中
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
// 因为 brokerNames 是 Set 类型,会自动去重,所以这里直接添加
brokerNames.add(brokerName);
// 是否第一次注册
boolean registerFirst = false;
// 通过 brokerName 从 brokerAddrTable 中获取对应的 BrokerData。
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
// 如果不存在,就创建新的,并存入 brokerAddrTable 中。
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
// 处理 slave 变成 master 的情况
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
// 如果 brokerAddr 相同,但是 brokerId 不一样,说明这个 broker 修改 brokerId,
// 那么就先把它从 brokerAddrsMap 集合中移除。
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
/**
* 只有当 topicConfigWrapper 不为null且必须是master节点,才能进入
*/
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
// 当 topicConfigWrapper 的数据版本dataVersion 和当前储存值不一样,或者是第一次注册时;
// 都需要处理 Topic
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
// 遍历 Broker 上所有的 topic 配置,改变 topicQueueTable 集合数据
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
// 只有 Broker 组中的主节点才有可能调用到这个方法。
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 更新 brokerLiveTable 中该 Broker 地址对应状态信息,表示该 Broker 地址是活跃的。
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
// 如果这个Broker 是 slave 节点, 那么给它设置主节点的地址和 HaServer的地址
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
注册 Broker
信息, 每个 Broker
会定时向 namesrv
发送自身的数据,就会调用到这个方法。方法流程:
- 先将这个
Broker
的brokerName
添加到集群集合clusterAddrTable
中。 - 将这个
Broker
的相关信息添加到brokerAddrTable
集合中,并判断这个Broker
是否第一次注册registerFirst
。 - 当这个
Broker
是主节点,topicConfigWrapper
的数据版本dataVersion
和当前储存值不一样,或者是第一次注册时;都需要将该Broker
的主题信息topicConfigWrapper
添加到topicQueueTable
中。 - 更新
brokerLiveTable
中该Broker
地址对应状态信息,表示该Broker
地址是活跃的。 - 如果这个
Broker
是slave
节点, 那么给它设置主节点的地址和HaServer
的地址。
总结一下:
就是按照顺序,分别改变
clusterAddrTable
,brokerAddrTable
,topicQueueTable
,brokerLiveTable
和filterServerTable
的数据。
3.2.2 unregisterBroker
方法
/**
* 取消注册Broker
*/
public void unregisterBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId) {
try {
try {
this.lock.writeLock().lockInterruptibly();
// 删除的时候,先删除简单的。
// 1. 删除 brokerLiveTable 集合中 brokerAddr 对应数据
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
brokerLiveInfo != null ? "OK" : "Failed",
brokerAddr
);
// 2. 删除 filterServerTable 集合中 brokerAddr 对应数据
this.filterServerTable.remove(brokerAddr);
// 3. 再根据 brokerName 处理 brokerAddrTable 集合中brokerData 数据
boolean removeBrokerName = false;
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
String addr = brokerData.getBrokerAddrs().remove(brokerId);
log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
addr != null ? "OK" : "Failed",
brokerAddr
);
if (brokerData.getBrokerAddrs().isEmpty()) {
// 如果 brokerName 只包含这一个 brokerId,被删除了;
// 那么也要从 brokerAddrTable 集合中删除这个 brokerName
this.brokerAddrTable.remove(brokerName);
log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
brokerName
);
removeBrokerName = true;
}
}
if (removeBrokerName) {
// brokerName 被删除了,要更新 clusterAddrTable 集合数据
Set<String> nameSet = this.clusterAddrTable.get(clusterName);
if (nameSet != null) {
boolean removed = nameSet.remove(brokerName);
log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
removed ? "OK" : "Failed",
brokerName);
if (nameSet.isEmpty()) {
this.clusterAddrTable.remove(clusterName);
log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
clusterName
);
}
}
// 只有当 brokerName 被删除了,那么就要更新 topicQueueTable 集合了。
this.removeTopicByBrokerName(brokerName);
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("unregisterBroker Exception", e);
}
}
- 先删除
brokerLiveTable
集合中brokerAddr
对应数据。 - 删除
filterServerTable
集合中brokerAddr
对应数据。 - 再根据
brokerName
处理brokerAddrTable
集合中BrokerData
数据。如果
BrokerData
中只包含当前这个Broker
,那么当它被删除后,那么就要从brokerAddrTable
删除这个brokerName
键。表示这个Broker
组已经不存在了。 - 当一个
Broker
组被删除后,那么就需要改变clusterAddrTable
和topicQueueTable
的数据了。- 从
clusterAddrTable
中删除这个Broker
组名字brokerName
, 如果这个集群只有这一个Broker
组,那么这个集群也要从clusterAddrTable
中删除。 - 通过
removeTopicByBrokerName()
方法,更新topicQueueTable
集合。
- 从
总结一下:
就是先删除
brokerLiveTable
和filterServerTable
中的数据,因为它们中的数据是比较独立的;然后修改brokerAddrTable
集合中数据;最后根据Broker
组是否被删除,来决定是否修改clusterAddrTable
和topicQueueTable
集合中的数据。
3.2.3 onChannelDestroy
方法
/**
* 通道 Channel 销毁时的处理
*/
public void onChannelDestroy(String remoteAddr, Channel channel) {
String brokerAddrFound = null;
if (channel != null) {
// 根据 channel,从 brokerLiveTable 中查找对应的 brokerAddr 值。
try {
try {
this.lock.readLock().lockInterruptibly();
// 从 brokerLiveTable 中,根据 channel 查找对应的Broker地址 brokerAddrFound
Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
this.brokerLiveTable.entrySet().iterator();
while (itBrokerLiveTable.hasNext()) {
Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
if (entry.getValue().getChannel() == channel) {
brokerAddrFound = entry.getKey();
break;
}
}
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
if (null == brokerAddrFound) {
brokerAddrFound = remoteAddr;
} else {
log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
}
if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
// 根据 brokerAddr 的值,进行移除操作
try {
try {
this.lock.writeLock().lockInterruptibly();
// 先移除 brokerLiveTable 和 filterServerTable 集合中,
// brokerAddrFound 对应数据
this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);
String brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
this.brokerAddrTable.entrySet().iterator();
// 根据Broker地址 brokerAddrFound 从brokerAddrTable 中删除这个 Broker
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
BrokerData brokerData = itBrokerAddrTable.next().getValue();
Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> entry = it.next();
Long brokerId = entry.getKey();
String brokerAddr = entry.getValue();
if (brokerAddr.equals(brokerAddrFound)) {
// 找到了,就删除这个 Broker 地址
brokerNameFound = brokerData.getBrokerName();
it.remove();
log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
brokerId, brokerAddr);
break;
}
}
if (brokerData.getBrokerAddrs().isEmpty()) {
removeBrokerName = true;
itBrokerAddrTable.remove();
log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
brokerData.getBrokerName());
}
}
if (brokerNameFound != null && removeBrokerName) {
// 如果 brokerName 被删除了,那么就要改变 clusterAddrTable 集合了
Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, Set<String>> entry = it.next();
String clusterName = entry.getKey();
Set<String> brokerNames = entry.getValue();
boolean removed = brokerNames.remove(brokerNameFound);
if (removed) {
log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
brokerNameFound, clusterName);
if (brokerNames.isEmpty()) {
log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
clusterName);
it.remove();
}
break;
}
}
}
if (removeBrokerName) {
// 删除了 brokerName,那么就要删除 topicQueueTable 中所有这个 brokerName 对应的QueueData
Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
this.topicQueueTable.entrySet().iterator();
while (itTopicQueueTable.hasNext()) {
Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
String topic = entry.getKey();
List<QueueData> queueDataList = entry.getValue();
Iterator<QueueData> itQueueData = queueDataList.iterator();
while (itQueueData.hasNext()) {
QueueData queueData = itQueueData.next();
if (queueData.getBrokerName().equals(brokerNameFound)) {
itQueueData.remove();
log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
topic, queueData);
}
}
if (queueDataList.isEmpty()) {
// 如果删除后,这个 queueDataList 为空,
// 说明这个 Topic 没有对应的 QueueData,也应该删除。
itTopicQueueTable.remove();
log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
topic);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
}
这个方法和 unregisterBroker
很像,区别是:
- 通过
channel
从brokerLiveTable
找到对应的Broker
地址brokerAddr
。 - 删除
brokerAddrTable
集合数据时,是通过brokerAddr
进行匹配的,而不是brokerId
; - 其他的操作流程和
unregisterBroker
一样。
3.2.4 scanNotActiveBroker
方法
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
// 如果某个 Broker 超过 BROKER_CHANNEL_EXPIRED_TIME(1000 * 60 * 2) 没有接收到信息,
// 那么我们就认为这个 Broker 已经出现问题,删除它
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
扫描不活跃的 Broker
,并将它进行关闭。
如果某个
Broker
超过BROKER_CHANNEL_EXPIRED_TIME
(1000 * 60 * 2
) 没有接收到信息。那么我们就认为这个Broker
已经出现问题,就关闭它。