RocketMQ NameServer 原理分析

概述

NameServer 是RocketMQ 消息队列的状态服务器(服务发现功能),集群中的各个服务都需要通过 NameServer 来了解集群中各个服务的状态。相当于 SpringCloud 中的 Eureka 的功能。

NameServer 中维护着 Producer 集群、Broker 集群、 Consumer 集群的服务状态。通过定时发送心跳数据包进行维护更新各个服务的状态。

当有新的Producer 加入集群时,通过上报自身的服务信息,及获取各个 Broker Master的信息(Broker 地址、Topic、Queue 等信息),这样就可以决定把对应的Topic消息存储到那个Broker、哪个Queue 上。Consumer 同理。

NameServer 可以部署多个,多个NameServer互相独立,不会交换消息。Producer、Broker、Consumer 启动的时候都需要指定多个 NameServer,各个服务的信息会同时注册到多个 NameServer 上,从而能到达高可用。

NameServer 模块结构


可以看出 NameServer 中的类比较少,8个类。分析起来也比较轻松。

NameServer 启动

org.apache.rocketmq.namesrv.NamesrvStartup 是 NameServer 的启动类。


通过 createNamesrvController 方法创建 NamesrvController 。


NameServer 启动时首先判断是否传入了命令行参数。

命令行参数有两个,-p 和 -c
-c 可以指定 NameServer 的配置文件,如果不指定,则使用默认值。
-p 打印 NameServer 的配置参数信息。打印完参数后退出进程。

下面是打印 NameServer 默认的配置参数信息。


如果想修改这些默认的参数,则可以使用 -c 参数,指定配置文件,进行更改。

初始化 NamesrvController

1、调用NamesrvController.initialize() 初始化 NamesrvController,然后调用 NamesrvController.start() 方法来开启 NameServer 服务。
2、注册 ShutdownHookThread 服务。在 JVM 退出之前,调用 ShutdownHookThread 来进行关闭服务,释放资源。

注意:使用 kill -9 强制杀进程是不会执行 ShutdownHook 的。

NamesrvController.initialize()

public boolean initialize() {
    //从 /namesrv/kvConfig.json 中加载 NameServer 的配置
    this.kvConfigManager.load();
    //创建 Netty Server
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // 创建 Netty Server 执行的线程池
    this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    //注册 NameServer 服务接受请求的处理类
    this.registerProcessor();
    //定时清理超时的Broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    //定时打印 NameServer 的配置信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    ...
}
  • 1、KVConfigManager类默认是从 /namesrv/kvConfig.json 配置文件中加载NameServer的配置参数.将配置参数加载保存到HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable = new HashMap<String, HashMap<String, String>>();变量中。

kvConfig.json 文件的默认路径为:
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";

  • 2、创建并初始化 NettyRemotingServer 。
    NettyRomotingServer 是 NameServer 对外提供服务功能的。

  • 3、创建 Netty Server 执行使用的线程池。

  • 4、注册默认的处理类DefaultRequestProcessor,所有的请求均由该处理类的processRequest方法来处理。

  • 5、创建一个定时清理超时的 Broker 定时任务。
    每隔10秒检查一遍所有Broker的状态的定时任务,判断每一个Broker 最近两分钟是否更新过。如果没有更新则把该 Broker 的 channel 关闭(关闭该Broker
    的长连接),并清除相关数据。

  • 6、创建一个打印 NameServer 配置的定时任务。
    每隔10分钟打印一次NameServer的配置参数。即KVConfigManager.configTable变量的内容。

NamesrvController.registerProcessor()

注册接收请求的处理类。
private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) {
            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                this.remotingExecutor);
        } else {
            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
        }
    }

默认注册的是 DefaultRequestProcessor 处理器。
如果设置了 NamesrvConfig.clusterTest = true,则会注册 ClusterTestRequestProcessor 处理器。

ClusterTestRequestProcessor继承DefaultRequestProcessor。

ClusterTestRequestProcessor.getRouteInfoByTopic 方法

ClusterTestRequestProcessor仅重写了 getRouteInfoByTopic()方法。
判断如果获取不到 topicRouteData数据,则会去其它的NameServer 上查找该数据并返回。

DefaultRequestProcessor

通过 processRequest 方法来处理客户端发过来的请求。

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    if (ctx != null) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }

    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.QUERY_DATA_VERSION:
            return queryBrokerTopicConfig(ctx, request);
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}

所有请求的操作说明如下:

requectcode 说明
PUT_KV_CONFIG 向Namesrv追加KV配置
GET_KV_CONFIG 从Namesrv获取KV配置
DELETE_KV_CONFIG 从Namesrv获取KV配置
QUERY_DATA_VERSION 获取版本信息
REGISTER_BROKER 注册一个Broker,数据都是持久化的,如果存在则覆盖配置
UNREGISTER_BROKER 卸载一个Broker,数据都是持久化的
GET_ROUTEINTO_BY_TOPIC 根据Topic获取Broker Name、topic配置信息
GET_BROKER_CLUSTER_INFO 获取注册到Name Server的所有Broker集群信息
WIPE_WRITE_PERM_OF_BROKER 去掉BrokerName的写权限
GET_ALL_TOPIC_LIST_FROM_NAMESERVER 从Name Server获取完整Topic列表
DELETE_TOPIC_IN_NAMESRV 从Namesrv删除Topic配置
GET_KVLIST_BY_NAMESPACE 通过NameSpace获取所有的KV List
GET_TOPICS_BY_CLUSTER 获取指定集群下的所有 topic
GET_SYSTEM_TOPIC_LIST_FROM_NS 获取所有系统内置 Topic 列表
GET_UNIT_TOPIC_LIST 单元化相关 topic
GET_HAS_UNIT_SUB_TOPIC_LIST 获取含有单元化订阅组的 Topic 列表
GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST 获取含有单元化订阅组的非单元化
UPDATE_NAMESRV_CONFIG 更新Name Server配置

根据 processRequest 方法分析源码,发现接收到的所有请求操作的数据都保存在 RouteInfoManager 类中,所有的操作都是对RouteInfoManager 类的操作。

RouteInfoManager

public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

1、topicQueueTable

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

Map 中 key存储的是 Topic 的名称, value 存储的是 QueueData 的集合。

QueueData 的集合 size 等于 Topic 对应的 Broker Master 的个数。

QueueData 的数据结构如下:

public class QueueData implements Comparable<QueueData> {
    private String brokerName;   //broker 名字
    private int readQueueNums;   //可读 queue 数
    private int writeQueueNums;  //可写 queue 数 
    private int perm;  //读写权限
    private int topicSynFlag;  //同步标识

2、brokerAddrTable

private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;

Map 中 key 存储的是 Broker Name, value 存储的是 BrokerData 数据(Broker 的相关信息)。

BrokerData 的数据结构如下:

public class BrokerData implements Comparable<BrokerData> {
    private String cluster;  // 集群名称
    private String brokerName;  // Broker Name
    // 存储的是该 Broker Name 对应的多个 Broker 地址信息。
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

因为相同的名称的 BrokerName 可以多有个。一个 Master 和多个 Slave。所有使用 brokerAddrs 来存储相同 BrokerName 下所有的 Broker 信息(判断Master 和 Slave 的关系是通过 Master 和 Slave 名称是否相同,brokerId 为 0 的是Master, 大于0 的是 Slave)。

3、clusterAddrTable

private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;

Map 中 key存储的是 clusterName 的名称, value 存储的是 brokerName 的集合。

4、brokerLiveTable

private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

Map 中 key 存储的是 brokerAddr 信息,value 存储的是 BrokerLiveInfo 信息,BrokerLiveInfo 中存储了 Broker 的实时状态。

class BrokerLiveInfo {
    // 最后更新时间
    private long lastUpdateTimestamp;
    private DataVersion dataVersion;
    private Channel channel;
    private String haServerAddr;

上面介绍的 NamesrvController.initialize() 中有一个schedule定时任务,每个10秒钟定时调用 scanNotActiveBroker() 方法进行扫描不活动的 Broker,并把不活动的 Broker 删除掉,就是判断的 这个 lastUpdateTimestamp 这个数据。

private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;

超过 2分钟没有更新这个值,就认为 Broker 不可用了。

5、filterServerTable

private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

Map 中 key 存储的是 brokerAddr 信息,value 存储的是 Filter Server 信息。
Filter Server 是消息的过滤服务器,一个 Broker 可以对应多个 Filter Server。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容