1.解析配置文件,填充NameServerConfig和NettyServerConfig属性值
NameServerConfig属性
/**
* rocketMQ 主目录
*/
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
/**
* KV配置属性的持久化路径
*/
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
/**
* 默认配置路径,一般有-c option
*/
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
private boolean orderMessageEnable = false;
NettyServerConfig属性
/**
* 监听端口
*/
private int listenPort = 8888;
/**
* netty业务线程数
*/
private int serverWorkerThreads = 8;
/**
* Netty public任务线程池线程数,Netty网络设计根据业务类型会创建不同的线程池,
* 比如处理消息、消费消息、心跳检测等。如果该业务类型(RequestCode)未注册线程池,
* 则由public线程执行
*/
private int serverCallbackExecutorThreads = 0;
/**
* IO线程池线程池
*/
private int serverSelectorThreads = 3;
/**
* sendOneWay消息并发度
*/
private int serverOnewaySemaphoreValue = 256;
/**
* 异步消息最大并发度
*/
private int serverAsyncSemaphoreValue = 64;
/**
* 网络最大空闲时间默认120s
*/
private int serverChannelMaxIdleTimeSeconds = 120;
/**
* 网络socket发送缓存区大小,默认64k
*/
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
/**
* 网络socket接受缓存区大小,默认64k
*/
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
/**
* ByteBuffer是否开启缓存
*/
private boolean serverPooledByteBufAllocatorEnable = true;
2.根据属性创建NameSrvController实例,并实例化对象,启动线程
3.注册JVM钩子函数并启动服务器
NameServer路由注册、故障剔除
- 路由元信息
RouterInfoManager 类的成员变量
/* Topic消息队列路由信息 */
private final HashMap<String, List<QueueData>> topicQueueTable;
/* Broker基本信息,包含BrokerName、所属集群名称、主备Broker地址 */
private final HashMap<String, BrokerData> brokerAddrTable;
/* 集群信息 */
private final HashMap<String, Set<String/* brokerName */>> clusterAddrTable;
/* Broker状态信息 */
private final HashMap<String, BrokerLiveInfo> brokerLiveTable;
/* Broker上的FliterServer列表,用户类模式消息过滤 */
private final HashMap<String, List<String>/* Filter Server */> filterServerTable;
QueueData
public class QueueData implements Comparable<QueueData> {
/**
* Broker名称
*/
private String brokerName;
/**
* 读队列的数量
*/
private int readQueueNums;
/**
* 写队列的数量
*/
private int writeQueueNums;
/**
* 操作权限
*/
private int perm;
/**
* topic同步标志
*/
private int topicSynFlag;
BrokerData
/**
* 集群名称
*/
private String cluster;
/**
* Broker名称
*/
private String brokerName;
/**
* BrokerId等于0表示Matser,大于0表示Slave
*/
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
class BrokerLiveInfo {
/**
* 最后更新时间
*/
private long lastUpdateTimestamp;
/**
* 数据版本
*/
private DataVersion dataVersion;
/**
* Io通道
*/
private Channel channel;
private String haServerAddr;