Rocketmq源码-namesrv模块详解

Rocketmq 使用 namesrv 来管理所有的元数据信息,包括主题 Topic 路由信息和 Broker 信息。
首先我们介绍一下一些基础概念:

  1. Broker : 储存消息的服务器。
    • 分为主从两种模式,通过 brokerId 来区分,目前 brokerId = 0 就表示主节点。
    • 每个 Broker 启动时,会向 namesrv 注册自己的信息,并会定期发送心跳信息。
  2. Broker 组 : 相同 brokerName 名字的 Broker 服务器就是一个组的。

    注意: 这里就有一个小问题,如果两个 Broker 有相同brokerName名字,而且 brokerId 都是 0 时,它们都可以向 namesrv 注册自己信息,后面覆盖前面信息,而且因为它们都会发送心跳消息,就会导致不断地相互覆盖。

  3. Broker 集群 : 有相同 clusterName 名字的Broker 服务器就是同一个集群的。
  4. Topic : 主题 Topic 是以 Broker 组进行区分的。
    • Broker 组有一个 TopicConfigManager 来管理该 Broker 所拥有的所有主题 Topic 信息,包括主题Topic 的权限perm,读队列数量readQueueNumswriteQueueNums 写队列数量等等。
    • Broker 组中主 Broker 创建主题 Topic 在这个Broker 组拥有队列文件Queue,从 Broker 只是复制主 Broker
    • 生产者发送消息时,也是从主题 Topic 拥有的Broker 组数组中,挑选一个Broker 组,向这个Broker 组的主 Broker 发送消息,然后主 Broker 再将这些数据发送给从 Broker

一. NamesrvStartup

这个类是 namesrv 的启动类,用来开启 namesrv

1.1 main 方法

 public static void main(String[] args) {
        main0(args);
    }

    public static NamesrvController main0(String[] args) {

        try {
            // 创建 NamesrvController
            NamesrvController controller = createNamesrvController(args);
            // 启动 NamesrvController
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }
  1. 通过 createNamesrvController() 方法创建 NamesrvController 实例。
  2. 调用 start(controller) 方法启动 NamesrvController

1.2 createNamesrvController 方法

 public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();

        // 通过 commandLine 解析 args 参数
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }

        // namesrv 相关的配置参数
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        // netty 相关的配置参数
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        // 默认端口 9876, 但是可以通过 -c 传入的 properties 文件参数进行覆盖
        nettyServerConfig.setListenPort(9876);
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);

        // 创建 NamesrvController
        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        return controller;
    }
  1. 通过 commandLine 解析命令行 args 参数。
  2. 如果指定了配置文件,那么读取配置文件中的配置项,并赋值到 namesrvConfignettyServerConfig
  3. 创建 NamesrvController 实例,并将配置项数据 properties 赋值到 Configuration 中。

1.3 start 方法

    public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        // 初始化 NamesrvController
        boolean initResult = controller.initialize();
        if (!initResult) {
            // 初始化失败,就退出
            controller.shutdown();
            System.exit(-3);
        }

        // 添加钩子函数,保证 JVM 正常退出时,关闭 NamesrvController
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        // 开启 NamesrvController
        controller.start();

        return controller;
    }

非常简单,先初始化 NamesrvController;然后添加钩子函数,保证 JVM 正常退出时,关闭 NamesrvController;最后调用 start() 方法启动。

二. NamesrvController

2.1 成员属性

    // Namesrv 的配置项
    private final NamesrvConfig namesrvConfig;

    // Netty 服务端配置项
    private final NettyServerConfig nettyServerConfig;

    // 定时器,用来定期检查是否有不活跃的 broker,以及定期打印 kvConfigManager 中的值
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));
    // 储存 KV 的值
    private final KVConfigManager kvConfigManager;
    //  namesrv 所有路由信息的管理器
    private final RouteInfoManager routeInfoManager;

    // 远程RPC服务服务端,用来处理远程请求命令
    private RemotingServer remotingServer;

    // ChannelEventListener 接口子类,监听 Netty 的 CONNECT, CLOSE, IDLE, EXCEPTION 事件,进行对应处理
    private BrokerHousekeepingService brokerHousekeepingService;

    // 处理远程请求的线程池执行器
    private ExecutorService remotingExecutor;
    // 配置项
    private Configuration configuration;
    // 监控 file 变化,主要用于 SSL
    private FileWatchService fileWatchService;
  1. namesrvConfignettyServerConfig: Namesrv 的配置项和 Netty 服务端配置项。
  2. scheduledExecutorService : 定时器

    用来定期检查是否有不活跃的 Broker,以及定期打印 kvConfigManager 中的值。

  3. kvConfigManager : KV 值的管理器。
  4. routeInfoManager : 所有路由信息的管理器。
  5. remotingServer : 远程RPC服务服务端,用来处理远程请求命令。
  6. brokerHousekeepingService : ChannelEventListener 接口子类。

    监听 NettyCONNECT, CLOSE, IDLE, EXCEPTION 事件,进行对应处理。

  7. remotingExecutor : 处理远程请求的线程池执行器。

2.2 initialize 方法

    public boolean initialize() {

        // 从 kvConfig.json 文件中加载之前存储的 KV 值
        this.kvConfigManager.load();

        // 通过 netty 创建一个服务端
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        // 用于处理请求的线程池 remotingExecutor
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        // 注册请求命令处理器
        this.registerProcessor();

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                // 每隔10秒 检查是否有不活跃的 broker
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                // 每隔10秒打印一下 kvConfigManager 中的值
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        // 处理 SSL
        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }
                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }

        return true;
    }
  1. kvConfig.json 文件中加载之前存储的 KV 值。
  2. 创建一个远程RPC服务服务端,用来处理远程请求命令。
  3. 用于处理请求的线程池 remotingExecutor
  4. 注册请求命令处理器。
  5. 通过 scheduledExecutorService 每隔10秒检查是否有不活跃的 Broker,以及每隔10秒打印一下 kvConfigManager 中的值。
  6. 最后处理 SSL

三. RouteInfoManager

3.1 成员属性

    // 主题 topic 对应的队列相关信息QueueData,这里是 List 原因是每个 broker 组都有一个 QueueData。
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    // broker 组基础信息集合,包括 broker 名字,所属集群名字和主从 broker 的地址。
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    // broker 集群集合,包括所有集群名字以及它拥有所有broker 组名字。
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    // 每个 broker 的状态信息,每次收到心跳包时,都会替换对应数据。
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    // 每个 broker 对应的 FilterServer 地址列表,用于类模式消息过滤。
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
  1. topicQueueTable : 所有主题相关信息集合。
    • 每个主题 Topic 对应一个 List<QueueData> 集合,因为一个主题有多个 Broker 组。
    • QueueData 包括 Broker组的名字,这个 Broker组中当期主题Topic对应的可读队列数量,可写队列数量,读写权限和同步标记。
  2. brokerAddrTable : Broker 组基础信息集合。

    BrokerData 包括 Broker 组名字,所属集群名字和主从 Broker 的地址。

  3. clusterAddrTable : Broker 集群集合,key 是集群名字,value 是集群拥有所有的Broker组名字。
  4. brokerLiveTable : 每个 Broker 的状态信息。
  5. filterServerTable : 每个 Broker 对应的 FilterServer 地址列表,用于类模式消息过滤。

3.1.1 QueueData

public class QueueData implements Comparable<QueueData> {
    // broker组的名字
    private String brokerName;
    // 可读队列数量
    private int readQueueNums;
    // 可写队列数量
    private int writeQueueNums;
    // 读写权限,具体参考 PermName,
    private int perm;
    // 主题Topic 同步标记; 参考TopicSysFlag类: FLAG_UNIT = 0x1 << 0, FLAG_UNIT_SUB = 0x1 << 1
    private int topicSynFlag;
}

3.1.2 BrokerData

public class BrokerData implements Comparable<BrokerData> {
    // broker 组所属集群的名字
    private String cluster;
    // broker 组的名字
    private String brokerName;
    // broker 组中所有 broker 的地址;其中 brokerId = 0 表示主 broker,其他的都是从 broker。
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}

3.1.3 BrokerLiveInfo

class BrokerLiveInfo {
    // 最近一次更新时间,用来判断这个 broker 是否活跃
    private long lastUpdateTimestamp;
    // 这个 broker 的数据版本,可以用来判断这个 Broker 的数据是否改变过。
    private DataVersion dataVersion;
    // 连接这个 broker 的通道channel
    private Channel channel;
    // 该 broker 的 HaServer地址
    private String haServerAddr;
}

3.2 重要方法

3.2.1 registerBroker 方法

    /**
     * 注册 Broker
     */
    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                this.lock.writeLock().lockInterruptibly();

                // 这个 broker 所属的集群clusterName 是否已经在 clusterAddrTable 中
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                // 因为 brokerNames 是 Set 类型,会自动去重,所以这里直接添加
                brokerNames.add(brokerName);

                // 是否第一次注册
                boolean registerFirst = false;

                // 通过 brokerName 从 brokerAddrTable 中获取对应的 BrokerData。
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    registerFirst = true;
                    // 如果不存在,就创建新的,并存入 brokerAddrTable 中。
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                //The same IP:PORT must only have one record in brokerAddrTable
                // 处理 slave 变成 master 的情况
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    // 如果 brokerAddr 相同,但是 brokerId 不一样,说明这个 broker 修改 brokerId,
                    // 那么就先把它从 brokerAddrsMap 集合中移除。
                    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                        it.remove();
                    }
                }

                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);

                /**
                 * 只有当 topicConfigWrapper 不为null且必须是master节点,才能进入
                 */
                if (null != topicConfigWrapper
                    && MixAll.MASTER_ID == brokerId) {
                    // 当 topicConfigWrapper 的数据版本dataVersion 和当前储存值不一样,或者是第一次注册时;
                    // 都需要处理 Topic
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            // 遍历 Broker 上所有的 topic 配置,改变 topicQueueTable 集合数据
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                // 只有 Broker 组中的主节点才有可能调用到这个方法。
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }

                // 更新 brokerLiveTable 中该 Broker 地址对应状态信息,表示该 Broker 地址是活跃的。
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }

                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }

                if (MixAll.MASTER_ID != brokerId) {
                    // 如果这个Broker 是 slave 节点, 那么给它设置主节点的地址和 HaServer的地址
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }

        return result;
    }

注册 Broker 信息, 每个 Broker 会定时向 namesrv 发送自身的数据,就会调用到这个方法。方法流程:

  1. 先将这个 BrokerbrokerName 添加到集群集合 clusterAddrTable 中。
  2. 将这个 Broker 的相关信息添加到 brokerAddrTable 集合中,并判断这个 Broker 是否第一次注册 registerFirst
  3. 当这个 Broker 是主节点,topicConfigWrapper 的数据版本dataVersion 和当前储存值不一样,或者是第一次注册时;都需要将该 Broker的主题信息 topicConfigWrapper 添加到 topicQueueTable 中。
  4. 更新 brokerLiveTable 中该 Broker 地址对应状态信息,表示该 Broker 地址是活跃的。
  5. 如果这个 Brokerslave 节点, 那么给它设置主节点的地址和 HaServer的地址。

总结一下:

就是按照顺序,分别改变 clusterAddrTable, brokerAddrTable, topicQueueTable,brokerLiveTablefilterServerTable 的数据。

3.2.2 unregisterBroker 方法

 /**
     * 取消注册Broker
     */
    public void unregisterBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId) {
        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                // 删除的时候,先删除简单的。
                // 1. 删除 brokerLiveTable 集合中 brokerAddr 对应数据
                BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
                log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
                    brokerLiveInfo != null ? "OK" : "Failed",
                    brokerAddr
                );
                // 2. 删除 filterServerTable 集合中 brokerAddr 对应数据
                this.filterServerTable.remove(brokerAddr);

                // 3. 再根据 brokerName 处理 brokerAddrTable 集合中brokerData 数据
                boolean removeBrokerName = false;
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null != brokerData) {
                    String addr = brokerData.getBrokerAddrs().remove(brokerId);
                    log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
                        addr != null ? "OK" : "Failed",
                        brokerAddr
                    );

                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        // 如果 brokerName 只包含这一个 brokerId,被删除了;
                        // 那么也要从 brokerAddrTable 集合中删除这个 brokerName
                        this.brokerAddrTable.remove(brokerName);
                        log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
                            brokerName
                        );

                        removeBrokerName = true;
                    }
                }

                if (removeBrokerName) {
                    // brokerName 被删除了,要更新 clusterAddrTable 集合数据
                    Set<String> nameSet = this.clusterAddrTable.get(clusterName);
                    if (nameSet != null) {
                        boolean removed = nameSet.remove(brokerName);
                        log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                            removed ? "OK" : "Failed",
                            brokerName);

                        if (nameSet.isEmpty()) {
                            this.clusterAddrTable.remove(clusterName);
                            log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
                                clusterName
                            );
                        }
                    }
                    // 只有当 brokerName 被删除了,那么就要更新 topicQueueTable 集合了。
                    this.removeTopicByBrokerName(brokerName);
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("unregisterBroker Exception", e);
        }
    }
  1. 先删除 brokerLiveTable 集合中 brokerAddr 对应数据。
  2. 删除 filterServerTable 集合中 brokerAddr 对应数据。
  3. 再根据 brokerName 处理 brokerAddrTable 集合中 BrokerData 数据。

    如果 BrokerData 中只包含当前这个 Broker ,那么当它被删除后,那么就要从 brokerAddrTable 删除这个 brokerName 键。表示这个 Broker 组已经不存在了。

  4. 当一个Broker 组被删除后,那么就需要改变 clusterAddrTabletopicQueueTable 的数据了。
    • clusterAddrTable 中删除这个Broker 组名字 brokerName , 如果这个集群只有这一个Broker 组,那么这个集群也要从 clusterAddrTable 中删除。
    • 通过 removeTopicByBrokerName() 方法,更新 topicQueueTable 集合。

总结一下:

就是先删除 brokerLiveTablefilterServerTable 中的数据,因为它们中的数据是比较独立的;然后修改brokerAddrTable 集合中数据;最后根据 Broker 组是否被删除,来决定是否修改clusterAddrTabletopicQueueTable 集合中的数据。

3.2.3 onChannelDestroy 方法

    /**
     * 通道 Channel 销毁时的处理
     */
    public void onChannelDestroy(String remoteAddr, Channel channel) {
        String brokerAddrFound = null;
        if (channel != null) {
            // 根据 channel,从 brokerLiveTable 中查找对应的 brokerAddr 值。
            try {
                try {
                    this.lock.readLock().lockInterruptibly();
                    // 从 brokerLiveTable 中,根据 channel 查找对应的Broker地址 brokerAddrFound
                    Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                        this.brokerLiveTable.entrySet().iterator();
                    while (itBrokerLiveTable.hasNext()) {
                        Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                        if (entry.getValue().getChannel() == channel) {
                            brokerAddrFound = entry.getKey();
                            break;
                        }
                    }
                } finally {
                    this.lock.readLock().unlock();
                }
            } catch (Exception e) {
                log.error("onChannelDestroy Exception", e);
            }
        }

        if (null == brokerAddrFound) {
            brokerAddrFound = remoteAddr;
        } else {
            log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
        }

        if (brokerAddrFound != null && brokerAddrFound.length() > 0) {

            // 根据 brokerAddr 的值,进行移除操作
            try {
                try {
                    this.lock.writeLock().lockInterruptibly();
                    // 先移除 brokerLiveTable 和 filterServerTable 集合中,
                    // brokerAddrFound 对应数据
                    this.brokerLiveTable.remove(brokerAddrFound);
                    this.filterServerTable.remove(brokerAddrFound);
                    String brokerNameFound = null;
                    boolean removeBrokerName = false;
                    Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                        this.brokerAddrTable.entrySet().iterator();
                    // 根据Broker地址 brokerAddrFound 从brokerAddrTable 中删除这个 Broker
                    while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                        BrokerData brokerData = itBrokerAddrTable.next().getValue();

                        Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<Long, String> entry = it.next();
                            Long brokerId = entry.getKey();
                            String brokerAddr = entry.getValue();
                            if (brokerAddr.equals(brokerAddrFound)) {
                                // 找到了,就删除这个 Broker 地址
                                brokerNameFound = brokerData.getBrokerName();
                                it.remove();
                                log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                    brokerId, brokerAddr);
                                break;
                            }
                        }

                        if (brokerData.getBrokerAddrs().isEmpty()) {
                            removeBrokerName = true;
                            itBrokerAddrTable.remove();
                            log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                                brokerData.getBrokerName());
                        }
                    }

                    if (brokerNameFound != null && removeBrokerName) {
                        // 如果 brokerName 被删除了,那么就要改变 clusterAddrTable 集合了
                        Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<String, Set<String>> entry = it.next();
                            String clusterName = entry.getKey();
                            Set<String> brokerNames = entry.getValue();
                            boolean removed = brokerNames.remove(brokerNameFound);
                            if (removed) {
                                log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                    brokerNameFound, clusterName);

                                if (brokerNames.isEmpty()) {
                                    log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                        clusterName);
                                    it.remove();
                                }

                                break;
                            }
                        }
                    }

                    if (removeBrokerName) {
                        // 删除了 brokerName,那么就要删除 topicQueueTable 中所有这个 brokerName 对应的QueueData
                        Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                            this.topicQueueTable.entrySet().iterator();
                        while (itTopicQueueTable.hasNext()) {
                            Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                            String topic = entry.getKey();
                            List<QueueData> queueDataList = entry.getValue();

                            Iterator<QueueData> itQueueData = queueDataList.iterator();
                            while (itQueueData.hasNext()) {
                                QueueData queueData = itQueueData.next();
                                if (queueData.getBrokerName().equals(brokerNameFound)) {
                                    itQueueData.remove();
                                    log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                        topic, queueData);
                                }
                            }

                            if (queueDataList.isEmpty()) {
                                // 如果删除后,这个 queueDataList 为空,
                                // 说明这个 Topic 没有对应的 QueueData,也应该删除。
                                itTopicQueueTable.remove();
                                log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                    topic);
                            }
                        }
                    }
                } finally {
                    this.lock.writeLock().unlock();
                }
            } catch (Exception e) {
                log.error("onChannelDestroy Exception", e);
            }
        }
    }

这个方法和 unregisterBroker 很像,区别是:

  1. 通过 channelbrokerLiveTable 找到对应的 Broker 地址 brokerAddr
  2. 删除 brokerAddrTable 集合数据时,是通过 brokerAddr 进行匹配的,而不是 brokerId
  3. 其他的操作流程和 unregisterBroker 一样。

3.2.4 scanNotActiveBroker 方法

  public void scanNotActiveBroker() {
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            // 如果某个 Broker 超过 BROKER_CHANNEL_EXPIRED_TIME(1000 * 60 * 2) 没有接收到信息,
            // 那么我们就认为这个 Broker 已经出现问题,删除它
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }

扫描不活跃的 Broker,并将它进行关闭。

如果某个 Broker 超过 BROKER_CHANNEL_EXPIRED_TIME(1000 * 60 * 2) 没有接收到信息。那么我们就认为这个 Broker 已经出现问题,就关闭它。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容