RocketMQ学习-NameServer-2

上篇文章主要梳理了NameServer的启动器和配置信息,并复习了JVM中的关闭钩子这个知识点。这篇文章看下NameServer的其他模块。建议带着如下三个问题阅读:

  1. NameServer管理哪些信息?如何管理的?
  2. NameServer中对Netty的使用案例?
  3. NameServer中对Java并发编程使用案例?

一、NamesrvController

  1. 作用:NameServer模块的控制器
  2. 主要属性
    • namesrvConfig:name server的配置信息
    • nettyServerConfig:name server中作为netty服务端的配置
    • scheduledExecutorService:调度线程池,用于:(1)周期性检查broker信息;(2)周期性打印路由信息;这两个检查每隔5秒交替进行。
    • kvConfigManager:name server配置的操作接口
    • routeInfoManager:name server路由信息的操作接口
    • remotingServer:netty服务器
    • brokerHousekeepingService:监听连接的broker的通道的关闭或异常事件,用于清理broker信息;
    • remotingExecutor:服务端处理请求的线程池
  3. 代码如下
public class NamesrvController {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    //name server的配置
    private final NamesrvConfig namesrvConfig;

    //netty server的配置定义
    private final NettyServerConfig nettyServerConfig;

    //创建一个具备调度功能的线程池,该线程池里只有一个线程,用于:(1)周期性检查broker信息;(2)周期性打印路由信息
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));

    //name server配置的操作接口
    private final KVConfigManager kvConfigManager;

    //name server路由信息的操作接口
    private final RouteInfoManager routeInfoManager;

    //服务器
    private RemotingServer remotingServer;

    //broker信息清理器,监听通道事件
    private BrokerHousekeepingService brokerHousekeepingService;

    //服务端处理请求的线程池
    private ExecutorService remotingExecutor;

    private Configuration configuration;
   
    //other code....
}
  1. 主要方法

    • initialize:初始化

      public boolean initialize() {
      
              this.kvConfigManager.load();
      
              this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
      
              this.remotingExecutor =
                  Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
      
              this.registerProcessor();
      
              //服务器启动后5秒,开始每隔10秒检查broker的运行状态
              this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
      
                  @Override
                  public void run() {
                      NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                  }
              }, 5, 10, TimeUnit.SECONDS);
      
              //服务器启动后1秒,开始每隔10秒检查
              this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
      
                  @Override
                  public void run() {
                      NamesrvController.this.kvConfigManager.printAllPeriodically();
                  }
              }, 1, 10, TimeUnit.MINUTES);
      
              return true;
          }
      
    • registerProcessor:注册处理器

          //在name server服务器上注册请求处理器,默认是DefaultRequestProcessor
          private void registerProcessor() {
              if (namesrvConfig.isClusterTest()) {
      
                  this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                      this.remotingExecutor);
              } else {
      
                  this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
              }
          }
      

    • 其他还有:构造方法、start方法、shutdown方法

  2. Java并发

    • Executors.newFixedThreadPool(),用于创建固定数量的线程池,根据线程池的运行原理:线程池启动时候没有线程,当新任务到来时就创建线程处理;由于coreSize和maxSize设置为相同大小,如果任务来的时候线程已经达到coreSize,就直接放入等待队列;keepAlive设置为0,目的是让线程数不会超过coreSize;blockqueue设置为LinkedBlockingQueue,表示是无界队列,最多可以放Integer.MAX_VALUE个任务。

      public static ExecutorService newFixedThreadPool(int nThreads,ThreadFactory threadFactory) {
          return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
      }
      
    • 周期线程池

      NameServerController中使用了调度线程池,我们看下创建一个调度线程池的方法,即Executors.newSingleThreadScheduledExecutor(),该方法的定义如下所示:

          public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
              return new DelegatedScheduledExecutorService
                  (new ScheduledThreadPoolExecutor(1, threadFactory));
          }
      

      这种线程池的创建又委托给了DelegatedScheduledExecutorService类,这里为什么这么设计,不是太理解。不过可以看下真正创建调度线程池的代码:

          public ScheduledThreadPoolExecutor(int corePoolSize,
                                             ThreadFactory threadFactory) {
              super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                    new DelayedWorkQueue(), threadFactory);
          }
      

      上面这个方法,关键在于两点:(1)maxSize选了Integer.MAX_VALUE;(2)任务队列使用了延迟队列;再回头去看那个委托类的代码,就可以明白,委托类包装了ScheduledExecutorService执行器,提供了延迟或周期执行的接口。

          /**
           * A wrapper class that exposes only the ScheduledExecutorService
           * methods of a ScheduledExecutorService implementation.
           */
          static class DelegatedScheduledExecutorService
                  extends DelegatedExecutorService
                  implements ScheduledExecutorService {
              private final ScheduledExecutorService e;
              DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
                  super(executor);
                  e = executor;
              }
              public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
                  return e.schedule(command, delay, unit);
              }
              public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
                  return e.schedule(callable, delay, unit);
              }
              public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
                  return e.scheduleAtFixedRate(command, initialDelay, period, unit);
              }
              public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
                  return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
              }
          }
      

      找到上面几个主要类和接口的类图,再综合上面的代码,可以这么理解:Executors是一个工具类,提供了生成不同的线程池的工厂方法,其中包括newSingleThreadScheduledExecutor方法,由于ScheduledExecutorService扩展了ExecutorService接口,同时又想重用AbstractExecutorService中的一些方法,因此需要一个委托类,将ExecutorService和ScheduledExecutorService的功能整合在一个类中。

      ScheduledExecutorService.png
  1. Netty

    RemotingServer是name server中的通信服务端,在name controller初始化name server模块的时候,会将name server的请求处理器注册到netty服务器上。

二、DefaultRequestProcessor

在NameServerController中会注册请求处理器,那么name server的请求处理器实现了哪些接口呢,请看代码:

public class DefaultRequestProcessor implements NettyRequestProcessor {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    protected final NamesrvController namesrvController;

    public DefaultRequestProcessor(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        if (log.isDebugEnabled()) {
            log.debug("receive request, {} {} {}",
                request.getCode(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                request);
        }

        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    return this.registerBroker(ctx, request);
                }
            case RequestCode.UNREGISTER_BROKER:
                return this.unregisterBroker(ctx, request);
            case RequestCode.GET_ROUTEINTO_BY_TOPIC:
                return this.getRouteInfoByTopic(ctx, request);
            case RequestCode.GET_BROKER_CLUSTER_INFO:
                return this.getBrokerClusterInfo(ctx, request);
            case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                return this.wipeWritePermOfBroker(ctx, request);
            case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                return getAllTopicListFromNameserver(ctx, request);
            case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                return deleteTopicInNamesrv(ctx, request);
            case RequestCode.GET_KVLIST_BY_NAMESPACE:
                return this.getKVListByNamespace(ctx, request);
            case RequestCode.GET_TOPICS_BY_CLUSTER:
                return this.getTopicsByCluster(ctx, request);
            case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
                return this.getSystemTopicListFromNs(ctx, request);
            case RequestCode.GET_UNIT_TOPIC_LIST:
                return this.getUnitTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
                return this.getHasUnitSubTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
                return this.getHasUnitSubUnUnitTopicList(ctx, request);
            case RequestCode.UPDATE_NAMESRV_CONFIG:
                return this.updateConfig(ctx, request);
            case RequestCode.GET_NAMESRV_CONFIG:
                return this.getConfig(ctx, request);
            default:
                break;
        }
        return null;
    }
    //其他具体的实现方法
}

从这个代码中可以看出两个方面的内容:

  1. 如何使用Netty处理网络请求。关键数据结构:(1)RemotingCommand:自定义的协议,携带请求参数和响应(2)ChannelHandlerContext:netty的数据结构,携带channel相关的信息。设计模型:processRequest:通过请求码进行请求转发;
  2. 请求处理方法(跟协议相关,具体参见remote模块)(1)processRequest:请求分发;(2)putKVConfig:将配置信息放在内存中;(3)getKVConfig:返回配置信息(4)deleteKVConfig:删除配置信息;(5)注册broker,支持两个注册方式:带过滤服务的(MQ版本在V3_0_11之后的)、不带过滤服务的,等其他处理方法。

三、BrokerHousekeepingService

该模块实现了ChannelEventListener接口,每个broker都会跟name server建立一个连接通道,当这个通道发生异常事件时,需要及时在name server这边清理掉对应的broker信息。异常事件的类型有:(1)通道关闭时;(2)通道抛出异常时;(3)通道空闲时。

public class BrokerHousekeepingService implements ChannelEventListener {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final NamesrvController namesrvController;

    public BrokerHousekeepingService(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }

    @Override
    public void onChannelConnect(String remoteAddr, Channel channel) {
    }

    @Override
    public void onChannelClose(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelException(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelIdle(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
}

四、RouteInfoManager

这个模块是name server的核心模块,真正管理broker、消息队列等相关信息的地方。代码如下:

public class RouteInfoManager {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    public RouteInfoManager() {
        this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
        this.brokerAddrTable = new HashMap<String, BrokerData>(128);
        this.clusterAddrTable = new HashMap<String, Set<String>>(32);
        this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
        this.filterServerTable = new HashMap<String, List<String>>(256);
    }
    //对外暴露的方法   
}

主要属性的含义如下:

  1. BROKER_CHANNEL_EXPIRED_TIME,表示一个broker距离上次发心跳包的最长时间,即120秒;
  2. 使用可重入读写锁实现并发安全、使用轻量级的非线程安全容器实现高效并发;【这点非常重要】
  3. topicQueueTable:用于管理topic和属于这个topic的队列的映射关系;
  4. brokerAddrTable:用于管理某个broker和它对应的信息
  5. clusterAddrTable:用于管理broker集群和集群中对应的broker的映射关系
  6. brokerLiveTable:用于管理broker的存活信息
  7. filterServerTable:用于管理broker和过滤服务列表【暂不理解】

关于ReentrantReadWriteLock:

  1. 这里使用的锁是非公平锁

  2. ReentrantReadWriteLock基于Sync、ReadLock、WriteLock三个模块实现,Sync负责处理公平与否的问题。ReadLock和WriteLock通过锁外部对象ReentrantReadWriteLock来处理并发。在RoutInfoManager中的使用案例如下:

        public void deleteTopic(final String topic) {
            try {
                try {
                    this.lock.writeLock().lockInterruptibly();
                    this.topicQueueTable.remove(topic);
                } finally {
                    this.lock.writeLock().unlock();
                }
            } catch (Exception e) {
                log.error("deleteTopic Exception", e);
            }
        }
    

五、KVConfigManager

这个模块用于管理name server自己的配置信息,配置信息以json信息存放在文件中,以二维数组形式存在于内存中,请看代码:

/**
 * 管理NameServer的配置属性
 */
public class KVConfigManager {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private final NamesrvController namesrvController;

    //可重入读写锁
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    //配置表
    private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
        new HashMap<String, HashMap<String, String>>();

    public KVConfigManager(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }
    //这个类对外暴露的方法,省了
}

这个类对外暴露的方法有:

  1. load方法:将配置信息加载到内存中
  2. putKVConfig方法:将配置信息持久化
  3. deleteKVConfig方法:删除指定的配置项
  4. getKVListByNamespace和getKVConfig用于查询配置信息

参考资料

  1. 消息队列技术点梳理
  2. netty的线程模型
  3. 《Java并发编程的艺术》

本号专注于后端技术、JVM问题排查和优化、Java面试题、个人成长和自我管理等主题,为读者提供一线开发者的工作和成长经验,期待你能在这里有所收获。


javaadu
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,649评论 18 139
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,621评论 18 399
  • 文章摘要:在业务系统中,线程池框架技术一直是用来解决多线程并发的一种有效方法。 在JDK中,J.U.C并发包下的T...
    癫狂侠阅读 2,089评论 2 21
  • 对财富的理解 14年的时候,在一家安利机构接触过财富自由,前后半年左右,我所理解大概是这几点: 1、他教你如何生活...
    肖念灵阅读 663评论 3 16
  • 某某同学买房了,某某同学结婚了,某某同学提车了。大学时期总觉得漫长的时光一下子就飞走了,而我们貌似还没有接受...
    不将就Ynm阅读 95评论 0 0