NameServer的启动

1.解析配置文件,填充NameServerConfig和NettyServerConfig属性值

NameServerConfig属性

    /**
     * rocketMQ 主目录
     */
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    /**
     * KV配置属性的持久化路径
     */
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    /**
     * 默认配置路径,一般有-c option
     */
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    private String productEnvName = "center";
    private boolean clusterTest = false;
    private boolean orderMessageEnable = false;

NettyServerConfig属性

  /**
     * 监听端口
     */
    private int listenPort = 8888;
    /**
     * netty业务线程数
     */
    private int serverWorkerThreads = 8;
    /**
     * Netty public任务线程池线程数,Netty网络设计根据业务类型会创建不同的线程池,
     * 比如处理消息、消费消息、心跳检测等。如果该业务类型(RequestCode)未注册线程池,
     * 则由public线程执行
     */
    private int serverCallbackExecutorThreads = 0;
    /**
     * IO线程池线程池
     */
    private int serverSelectorThreads = 3;
    /**
     * sendOneWay消息并发度
     */
    private int serverOnewaySemaphoreValue = 256;
    /**
     * 异步消息最大并发度
     */
    private int serverAsyncSemaphoreValue = 64;
    /**
     * 网络最大空闲时间默认120s
     */
    private int serverChannelMaxIdleTimeSeconds = 120;
    /**
     * 网络socket发送缓存区大小,默认64k
     */
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    /**
     * 网络socket接受缓存区大小,默认64k
     */
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    /**
     * ByteBuffer是否开启缓存
     */
    private boolean serverPooledByteBufAllocatorEnable = true;

2.根据属性创建NameSrvController实例,并实例化对象,启动线程

3.注册JVM钩子函数并启动服务器


NameServer路由注册、故障剔除

  • 路由元信息

RouterInfoManager 类的成员变量

   /* Topic消息队列路由信息 */
    private final HashMap<String, List<QueueData>> topicQueueTable;
    /* Broker基本信息,包含BrokerName、所属集群名称、主备Broker地址 */
    private final HashMap<String, BrokerData> brokerAddrTable;
    /* 集群信息 */
    private final HashMap<String, Set<String/* brokerName */>> clusterAddrTable;
    /* Broker状态信息 */
    private final HashMap<String, BrokerLiveInfo> brokerLiveTable;
    /* Broker上的FliterServer列表,用户类模式消息过滤 */
    private final HashMap<String, List<String>/* Filter Server */> filterServerTable;

QueueData

public class QueueData implements Comparable<QueueData> {
    /**
     * Broker名称
     */
    private String brokerName;
    /**
     * 读队列的数量
     */
    private int readQueueNums;
    /**
     * 写队列的数量
     */
    private int writeQueueNums;
    /**
     * 操作权限
     */
    private int perm;
    /**
     * topic同步标志
     */
    private int topicSynFlag;

BrokerData

/**
     * 集群名称
     */
    private String cluster;
    /**
     * Broker名称
     */
    private String brokerName;
    /**
     * BrokerId等于0表示Matser,大于0表示Slave
     */
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
class BrokerLiveInfo {
    /**
     *  最后更新时间
     */
    private long lastUpdateTimestamp;
    /**
     * 数据版本
     */
    private DataVersion dataVersion;
    /**
     * Io通道
     */
    private Channel channel;
    private String haServerAddr;
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容