NameServer源码分析

namesrv

NameServer介绍

NameServer是RocketMQ集群的服务配置、管理中心,负责维护Broker集群、Broker信息、主题和队列信息。

Broker启动时会向所有NameServer注册,Producer在发送消息之前先从NameServer获取Broker服务器地址列表,然后根据负载均衡算法从列表中选择一个Broker进行消息发送。NameServer与每个Broker保持长链接,如果检测到Broker宕机,则从路由列表中将其剔除。

NameServer和Zookeeper、Eureka不同,每个NameServer之间是不通信的。NameServer本身的高可用是通过部署多台NameServer服务器来实现,由于每个NameServer互不通信,在某个时刻的数据并不会完全相同,但这对消息发送不会造成任何影响,这也是RocketMQ NameServer设计的一个亮点,RocketMQ NameServer设计的亮点,追求简单高效。

NameServer包结构

namesrv包结构
NamesrvStartup启动类

最下面可以看到核心的启动类,NamesrvStartup,来看下这个启动类的main方法。

public static void main(String[] args) {
    main0(args);
}

public static NamesrvController main0(String[] args) {

    try {
        // 创建NamesrvController
        NamesrvController controller = createNamesrvController(args);
        // 启动NamesrvController
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

在main方法中会创建NamesrvController,并启动,接下来进入创建NamesrvController的createNamesrvController(args)

public static NamesrvController createNamesrvController(final String[] args) throws IOException, JoranException {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    //PackageConflictDetect.detectFastjson();

    final Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }

    // NameServer属性配置类
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    // NettyServer属性配置类
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(9876);
    if (commandLine.hasOption('c')) {
        final String file = commandLine.getOptionValue('c');
        if (file != null) {
            final InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            // 填充NameServer和NettyServer属性
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }

    if (commandLine.hasOption('p')) {
        final InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        // 填充NameServer和NettyServer属性
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }

    final LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    final JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);

    // 创建NamesrvController实例 并把NamesrvConfig、NettyServerConfig注入进去
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);

    return controller;
}

在这里会看到两个配置类NamesrvConfig和NettyServerConfig,在启动时会把制定的配置文件或启动命令中的选项值填充到这两个类的实例中。这两个类的具体参数信息,此处不再做详细解释。该方法,会完成创建NamesrvController类,创建好之后会启动NamesrvController类,即调用start方法

public static NamesrvController start(final NamesrvController controller) throws Exception {

    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }

    // 调用NamesrvController的初始化方法
    final boolean initResult = controller.initialize();
    if (!initResult) {
        // 如果初始化失败,则调用shutdown方法
        controller.shutdown();
        System.exit(-3);
    }

    // 注册JVM钩子函数,监听Broker
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            return null;
        }
    }));

    controller.start();

    return controller;
}   

这个启动方法相对容易理解,不过,注册JVM钩子函数这里是一种常见的技巧,如果我们在代码中使用了线程池,一种优雅的做法就是注册一个JVM钩子函数,在JVM关闭之前,先将线程池关闭,及时释放资源。
至此NamesrvStartup类主要内容已分析完毕。

NamesrvController

我们先看下在NamesrvStartup类调用的构造器方法

public class NamesrvController {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private final NamesrvConfig namesrvConfig;

    private final NettyServerConfig nettyServerConfig;

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));
    private final KVConfigManager kvConfigManager;
    private final RouteInfoManager routeInfoManager;

    private RemotingServer remotingServer;

    private BrokerHousekeepingService brokerHousekeepingService;

    private ExecutorService remotingExecutor;

    private Configuration configuration;
    private FileWatchService fileWatchService;

    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }
    ···省略代码
}

在构造器方法中会对一些成员变量完成初始化,下面我们对这些成员变量简单解释一下:

  • KVConfigManager:读取或变更NameServer的配置属性,加载NamesrvConfig中配置的配置文件到内存,此类一个亮点就是使用轻量级的非线程安全容器,再结合读写锁对资源读写进行保护。尽最大程度提高线程的并发度。
  • RouteInfoManager:路由信息的管理类,记录broker、topic、queue、cluster、brokerLive等信息
  • RemotingServer:远程调用服务
  • BrokerHousekeepingService:BrokerHouseKeepingService实现 ChannelEventListener接口,可以说是通道在发送异常时的回调方法(Nameserver与Broker的连接通道在关闭、通道发送异常、通道空闲时),例如移除已Down掉的Broker。
    现在看一下initialize方法
public boolean initialize() {
    // 将配置文件的内容加载内存
    this.kvConfigManager.load();

    // 初始化netty组件
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    // 创建一个线程容量为serverWorkerThreads的的线程池
    this.remotingExecutor =
            Executors.newFixedThreadPool(this.nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    // 将DefaultRequestProcessor和remotingExecutor绑定到一起
    this.registerProcessor();

    // 每隔10s扫描一次Broker,移除处于不激活状态的Broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    // 每隔10分钟打印一次kv配置
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext 注册一个重新加载SslContext的监听
        try {
            this.fileWatchService = new FileWatchService(
                    new String[]{
                            TlsSystemConfig.tlsServerCertPath,
                            TlsSystemConfig.tlsServerKeyPath,
                            TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;

                        @Override
                        public void onChanged(final String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                this.certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                this.keyChanged = true;
                            }
                            if (this.certChanged && this.keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                this.certChanged = this.keyChanged = false;
                                reloadServerSslContext();
                            }
                        }

                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) NamesrvController.this.remotingServer).loadSslContext();
                        }
                    });
        } catch (final Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }

    return true;
}

如果NamesrvController的initialize方法执行成功,会继续调用start方法

public void start() throws Exception {
    // 启动netty服务端
    this.remotingServer.start();

    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}

正确启动后,就可以提供路由发现、注册服务了。
下一篇会分析一下路由注册和发现的源码,敬请期待

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351