RocketMQ源码解析(二)-nameserv

NameServ是rocketMQ的注册中心,保存所有Broker、Topic的元数据。Broker启动后会向nameserv发送心跳,nameserv也会定时检测broker的可用性,并移除不可用的broker。

Nameserv的启动过程

启动脚本

> nohup sh bin/mqnamesrv &

nameserv启动过程会将所有初始化和启动工作交给NamesrvController来完成。

NamesrvController

nameserv的主要控制类,负责初始化和后台任务启动,Controller包含的主要组件都在构造函数中做了初始化
Controller构造函数

public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        //nameserv参数配置
        this.namesrvConfig = namesrvConfig; 
        //netty的参数配置
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        //初始化RouteInfoManager
        this.routeInfoManager = new RouteInfoManager();
        //监听客户端连接(Channel)的变化,通知RouteInfoManager检查broker是否有变化
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        //Nameserv的配置参数会保存到磁盘文件中
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }

构造函数中初始化了RouteInfoManager,这个最重要的类,负责缓存整个集群的broker信息,以及topic和queue的配置信息。
RouteInfoManager数据结构
RouteInfoManager的所有数据通过HashMap缓存在内存中,通过读写锁来控制并发更新。这样可最大程度的提高客户端查询数据的速度。数据更新时会将数据保存到文件中,重启后可恢复数据。

    //1、Topic和broker的Map,保存了topic在每个broker上的读写Queue的个数以及读写权限
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    //2、注册到nameserv上的所有Broker,按照brokername分组
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    //3、broker的集群对应关系
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    //4、broker最新的心跳时间和配置版本号
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    //5、broker和FilterServer的对应关系
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

从以上的信息可以看出:
1)Broker使用brokerName来标识主从关系,同一个brokerName下只能由一个master。
2)使用clusterName来判断多个broker是不是属于同一个集群。对于同一个cluster下的broker,producer在发送消息时只会选择发送给其中一个。
3)nameserv会记录brokerAddr的最后活跃时间,如果超过一定没有心跳或其他数据交互,会认为broker已下线。
4)nameserv和broker上都会保存DataVersion字段,当broker配置有变更时,DataVersion会+1。下次心跳时nameserv通过这个字段来判断配置是否有变更。
【注意】因为nameserv是用brokername来区分broker,所以注册到同一个nameserv上的多个集群,brokerName和topic也是不能重复的。
Controller initialize
启动过程中新建了一个Controller的实例后会调用它的initialize()方法:

public boolean initialize() {
        //1、初始化KVConfigManager
        this.kvConfigManager.load();
        //2、初始化netty server
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
        //3、客户端请求处理的线程池
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
       //4、注册DefaultRequestProcessor,所有的客户端请求都会转给这个Processor来处理
        this.registerProcessor();
       //5、启动定时调度,每10秒钟扫描所有Broker,检查存活状态
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        //6、日志打印的调度器,定时打印kvConfigManager的内容      
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
        //7、监听ssl证书文件变化,
        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            ...
        }

        return true;
    }

以上最重要的就是第2,4步,初始化nameserv的Server,用来接收客户端请求。所有的客户端请求都会转给第4步中注册的DefaultRequestProcessor来处理。
第5步中,启动了一个定时器来扫描RouteInfoManager中缓存的broker信息,如果broker已经长时间没有心跳信息,则认为broker已经down掉了,将其移除。
Controller启动:

   public void start() throws Exception {
        this.remotingServer.start();
        //监听ssl文件变化,可以实时更新证书
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }

启动的过程比较简单,就是启动netty server开始接收客户端请求。

DefaultRequestProcessor请求处理

前面讲过nameserv最重要的两个作用,一个是路由管理,通过RouteInfoManager管理路由信息供客户端查询。一个是Broker管理,接收broker注册并通过心跳机制检查broker的可用性。
NameServer通过DefaultRequestProcessor的processRequest方法来提供请求处理。

@Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        ...
        switch (request.getCode()) {
            ...
            //broker注册请求
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    return this.registerBroker(ctx, request);
                }
             //Broker注销请求
            case RequestCode.UNREGISTER_BROKER:
                return this.unregisterBroker(ctx, request);
            //根据topic获取broker路由信息
            case RequestCode.GET_ROUTEINTO_BY_TOPIC:
                return this.getRouteInfoByTopic(ctx, request);
            //获取broker集群信息
            case RequestCode.GET_BROKER_CLUSTER_INFO:
                return this.getBrokerClusterInfo(ctx, request);
            case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                return this.wipeWritePermOfBroker(ctx, request);
            //获取所有topic信息
            case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                return getAllTopicListFromNameserver(ctx, request);
            //删除topic
            case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                return deleteTopicInNamesrv(ctx, request);
            ...
        }
        return null;
    }

查询路由信息

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetRouteInfoRequestHeader requestHeader =
            (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
        //从RouteInfoManager中获取topic的路由信息
        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
        //如果支持顺序消息,则填充KVConfig信息
        if (topicRouteData != null) {
            if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
                String orderTopicConf =
                    this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                        requestHeader.getTopic());
                topicRouteData.setOrderTopicConf(orderTopicConf);
            }

            byte[] content = topicRouteData.encode();
            response.setBody(content);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }

        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
            + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
        return response;
    }

路由信息就是直接到RouteInfoManager查询,我们看下具体实现:

public TopicRouteData pickupTopicRouteData(final String topic) {
          ...
            try {
                //获取读锁
                this.lock.readLock().lockInterruptibly();
                //获取所有支持该topic的broker的queue配置
                List<QueueData> queueDataList = this.topicQueueTable.get(topic);
                if (queueDataList != null) {
                    topicRouteData.setQueueDatas(queueDataList);
                    foundQueueData = true;
                    //获取brokerName
                    Iterator<QueueData> it = queueDataList.iterator();
                    while (it.hasNext()) {
                        QueueData qd = it.next();
                        brokerNameSet.add(qd.getBrokerName());
                    }
              
                    for (String brokerName : brokerNameSet) {
                        //根据brokerName获取broker主从地址信息
                        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                        if (null != brokerData) {
                            BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
                                .getBrokerAddrs().clone());
                            brokerDataList.add(brokerDataClone);
                            foundBrokerData = true;
                            for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                                List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                                filterServerMap.put(brokerAddr, filterServerList);
                            }
                        }
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
            ...
    }

获取topic路由的过程就是直接从HashMap中获取缓存的broker配置。
Broker注册
Broker在启动的时候会将topic和queue的配置同步给nameserv。

public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        ...
        //checksum,检查CRC32是否相等
        if (!checksum(ctx, request, requestHeader)) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("crc32 not match");
            return response;
        }

        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
        //decode request body,如果body已压缩,则先解压。如果body为空,会将topic的版本号默认置为0.
        if (request.getBody() != null) {
            try {
                registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
            } catch (Exception e) {
                throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
            }
        } else {
            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
        }
        //使用broker上报的信息更新nameserv的RouteInfo
        RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
            requestHeader.getClusterName(),
            requestHeader.getBrokerAddr(),
            requestHeader.getBrokerName(),
            requestHeader.getBrokerId(),
            requestHeader.getHaServerAddr(),
            registerBrokerBody.getTopicConfigSerializeWrapper(),
            registerBrokerBody.getFilterServerList(),
            ctx.channel());
        //如果broker是slave的话,会将master address和ha server address通过response返回给broker
        responseHeader.setHaServerAddr(result.getHaServerAddr());
        responseHeader.setMasterAddr(result.getMasterAddr());
        //将Order topic的KV配置信息通过response返回
        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
        response.setBody(jsonValue);

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

nameserv收到broker注册后,更新routeInfo过程

public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                //更新cluster和broker对应关系
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);

                boolean registerFirst = false;
                //更新brokername和brokerdata的map
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);
                //如果是master broker,第一次注册或者是topic信息发生变化了,更新topicQueueTable
                if (null != topicConfigWrapper
                    && MixAll.MASTER_ID == brokerId) {
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }
                //更新broker的心跳时间
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }
                //更新filter server table
                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }
                //如果是slave broker注册,如果master存在,则返回master broker信息
                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }

        return result;
    }

总结

1、nameserv通过clusterName来判断broker是不是属于同一个集群
2、nameserv通过brokerName来判断两个broker是不是主从关系
3、对于相同的brokerName,只有一个master(id=0),不同的slave必须使用不同的Id (id>0)
4、NameServ只会保存master的topic配置信息,因为理论上slave的topic信息是从master同步过去的
5、所有的topic信息以broker上报为准,broker在启动的时候是不会去nameserv获取topic配置的,只会从自己持久化文件中加载。所以,一个新的broker启动后默认只有System topic信息。如果broker是新的,或者broker在挂掉一段时间重启topic不是最新的话,只能通过客户端更新topic来使broker能加入到正常的消息收发中。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351

推荐阅读更多精彩内容

  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,719评论 13 425
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,314评论 1 15
  • 核心组件(4个组件+消息存储结构) 客户端消费模式 1. MQ的使用场景 昨天在写完之后,有些读者在评论中提出:到...
    楼亭樵客阅读 1,030评论 0 3
  • 有一天,球球问我:妈妈,什么是旅行啊?我就跟他说:旅行就是从自己待腻的地方到别人待腻的地方去待一待,然后再回到自己...
    麦子妹妹宁阅读 770评论 0 0
  • 大大咧咧的我,没心没肺!可是最近一年多,忽然开始觉得人言可畏起来。暗箭伤人,背后下刀的招数在比武场上姑且是...
    就叫我田田阅读 255评论 0 0