(01)RocketMQ - NameServer漫谈源码一

缘起

再一次面试过程中,和蔼可亲的面试官在聊完Kafka后,转而继续问RocketMQ的相关细节,因此就再特地的想详细的看看相关的具体实现原理。本文并非严格意义上的源码流程分析,涉及到的技术都会学习和研究一下。

早些年RocketMQ虽然开源,但是阿里内部使用的版本和社区版本还是有点区别,但现在RocketMQ已经入驻Apache基金委员会了,而且顺利孵化毕业。我从官网查到NameServerRocketMQ中的作用主要有以下2点(摘抄自官网):

  • Broker Management: NameServer accepts the register from Broker cluster and provides heartbeat mechanism to check whether a broker is alive.

  • Routing Management: each NameServer will hold whole routing info about the broker cluster and the queue info for clients query.

通过上述的描述,我们大体可以知道NameServer并不涉及过多的核心消息服务,充当的是Peer之间的协议与同步任务。

按照阿里系一贯的架构风格,比如dubbo就是注册中心 - 服务提供者- 服务消费者 三者之间的注册中心就等价于NameServer,只是dubbo官方推荐使用的是zookeeper作为注册中心,而RocketMQNameServer则是基于Netty来实现的。

NameServer的初体验

我们先从官方的例子摘出启动NameServer的代码:


public class NameServerInstanceTest {

    /**

    * 临时使用RocketMQ的内部的日志类

    */

    private static final InternalLogger LOG = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    public static void main(String[] args) {

        final int processID = getProcessID();

        final String uuid = UUID.randomUUID().toString();

        LOG.info("准备启动RocketMQ, 服务类型[type=MQ], 进程号是[pid={}], 全局[id={}]", processID, uuid);

        // NameServerConfig 的设置

        final NamesrvConfig namesrvConfig = new NamesrvConfig();

        // NettyServerConfig 的设置

        final NettyServerConfig nettyServerConfig = new NettyServerConfig();

        // Netty的监听设置端口

        nettyServerConfig.setListenPort(9876);

        // 创建 NameServerController

        NamesrvController namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);

        // 初始化

        boolean initialized = namesrvController.initialize();

        if (initialized) {

            try {

                // 启动

                namesrvController.start();

            } catch (Exception ex) {

                String errorMsg = String.format("启动RocketMQ失败, 服务类型[type=MQ], 进程号是[pid=%d], 全局[id=%s]", processID, uuid);

                LOG.error(errorMsg, ex);

                namesrvController.shutdown();

            }

        }

        // 注册个停机钩子, 确保能够优雅关机

        Runtime.getRuntime()

                .addShutdownHook(new Thread(() -> {

                    namesrvController.shutdown();

                    LOG.info("关闭RocketMQ成功, 服务类型[type=MQ], 进程号是[pid={}], 全局[id={}]", processID, uuid);

                }));

    }

    /**

    * 获取当前Java进程ID

    *

    * @return 返回当前Java进程ID

    */

    private static int getProcessID() {

        RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();

        return Integer.parseInt(runtimeMXBean.getName().split("@")[0]);

    }

}

在代码执行到namesrvController.initialize()之前都是各种内部配置的准备工作,进入namesrvController.initialize()内部,我们会发现做了不少准备工作操作,其中有两个线程已经启动了:


public boolean initialize() {

    // 加载KVConfigManager

    this.kvConfigManager.load();

    // 初始化但不启动Netty

    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    /**

    *  这个指的是对于RocketMQ自定义注册事件之外的漏网之鱼,有一个默认的处理器去处理

    *

    *  @see org.apache.rocketmq.client.impl.MQClientAPIImpl

    */

    this.remotingExecutor =

        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    // 线程一: 定期扫描不活跃的broker

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        // 每个NameServer节点都只扫描自己的链路信息,集群内的NameServer节点相互不会同步数据

        @Override

        public void run() {

                NamesrvController.this.routeInfoManager.scanNotActiveBroker();

        }

    }, 5, 10, TimeUnit.SECONDS);

    // 线程二: 定期打印KVConfigManager的信息

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            // 为了方便观察输出, 临时改成5秒输出一次

            @Override

            public void run() {

                NamesrvController.this.kvConfigManager.printAllPeriodically();

            }

    }, 1, 5, TimeUnit.SECONDS);

    // SSL启动配置忽略

}

从这初步也能看出,所谓RocketMQNameServer集群中的节点其实都是相互之间不共享任何数据并且是无状态,官网明确提到了该架构下的生产者和消费者由于都往代码中指定的所有NameServer发送心跳信息,因此这些生产者和消费者也能从任意的NameServer获取到全部的元数据信息。

这里需要注意的就是,NameServer的个数决定了心跳数的个数。比如1个NameServer,3个broker,2个producer和5个consumer的部署架构,就是每次1*10=10个心跳包,此时如果NameServer为了避免单点改成了3个,那就是3*10=30个心跳包了。对于天然就是为了解决C10K问题的Netty来说,单机支撑几千个心跳包数据量的连接不是问题,所以设计简洁的的NameServer一般不会成为集群的瓶颈。

KVConfigManager的实现

KVConfigManager主要作用就是对基于存放key=namespace,value=HashMap的一个Hash表configTable进行读写操作。


public class KVConfigManager {



    private final NamesrvController namesrvController;

    // 读写锁

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    // 配置Hash表

    private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =

        new HashMap<String, HashMap<String, String>>();



    // 对configTable的操作部分先省略

    ...   

}       

可以通过RemotingCommand(id=RequestCode.PUT_KV_CONFIG)命令发送到NameServer,可以实时对其configTable进行put操作,增加Key-Value键值对,然后在下一次其他节点发送心跳命令RemotingCommand(id=RequestCode.REGISTER_BROKER)后,把设置好的这些Key-Value作为Body返回回去。


// DefaultRequestProcessor的processRequest方法片段

// 从configTable取出该namespace的Key-Value值

byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);

// 然后设置到响应的body中

response.setBody(jsonValue);

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null);

return response;

通过源码,发现涉及到namespace的参数都是hard-code成了一个静态常量NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG="ORDER_TOPIC_CONFIG",由于注释和文档有限,这里只能是猜测先前的设计是可以通节点之间通信的RemotingCommand,对configTable进行写操作,然后在返回给,比如按照namespace来对brokerproducerconsumer进行隔离,但可能是这个需求又不需要或者计划以后实现就搁浅了。至少是目前我看的4.7.*版本,还是这样。但是

刚刚提到内部采用了JUC的读写锁ReadWriteLock结合普通的HashMap来实现,那么现在问题来了,为什么不直接使用ConcurrentHashMap来实现呢?

我们先看一下RocketMQ是如何使用读写锁ReadWriteLock的:


// 设置或更新KV值

public void putKVConfig(final String namespace, final String key, final String value) {

    try {

        // 使用可以中断的写锁

        this.lock.writeLock().lockInterruptibly();

        try {

            HashMap<String, String> kvTable = this.configTable.get(namespace);

            // 如果configTable为空, 则创建一个新的

            if (null == kvTable) {

                kvTable = new HashMap<String, String>();

                this.configTable.put(namespace, kvTable);

                log.info("putKVConfig create new Namespace {}", namespace);

            }

            // 把最新的值添加或覆盖进去

            final String prev = kvTable.put(key, value);

            if (null != prev) {

                log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}",

                    namespace, key, value);

            } else {

                log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}",

                    namespace, key, value);

            }

        } finally {

            this.lock.writeLock().unlock();

        }

    } catch (InterruptedException e) {

        log.error("putKVConfig InterruptedException", e);

    }

    // 持久化到硬盘上

    this.persist();

}



// 持久化操作

public void persist() {

    try {

        this.lock.readLock().lockInterruptibly();

        try {

            // 在FastJSON上进行了封装

            KVConfigSerializeWrapper kvConfigSerializeWrapper = new KVConfigSerializeWrapper();

            kvConfigSerializeWrapper.setConfigTable(this.configTable);

            String content = kvConfigSerializeWrapper.toJson();

            // 写入磁盘操作

            if (null != content) {

                // 写入磁盘的逻辑是

                // 1: 用内存中的configTable生成一个临时文件tmp,同时把旧的kvConfig.json中的内容读取出来,写入到一个bak的备份文件

                // 2: 把旧的kvConfig.json文件删除

                // 3: 把最新的tmp文件重名回kvConfig.json

                MixAll.string2File(content, this.namesrvController.getNamesrvConfig().getKvConfigPath());

            }

        } catch (IOException e) {

            log.error("persist kvconfig Exception, "

                + this.namesrvController.getNamesrvConfig().getKvConfigPath(), e);

        } finally {

            this.lock.readLock().unlock();

        }

    } catch (InterruptedException e) {

        log.error("persist InterruptedException", e);

    }

}   

从这里可以看出,使用读写锁ReentrantReadWriteLock而不是ConcurrentHashMap是因为,除了对集合Map操作外,还需要进行诸如序列化、磁盘同步回写等操作。那为什么不用synchronized关键字呢,不是说已经得到增强,在高JDK版本下性能反而更好?因为从代码可以看出代码在尝试获取锁的时候是可以允许中断的,而synchronized关键字等待则是不能被中断。所以对于这样的并发场景下,框架开发人员采用了ReentrantReadWriteLock

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351

推荐阅读更多精彩内容