概述
NameServer 是 RocketMQ 集群中的核心组件,负责管理
Broker
和Topic
的路由信息,同时提供服务发现的功能。作为一个轻量级的无状态服务,NameServer
主要用于记录和维护各个Broker
的注册信息,并为生产者(Producer
)和消费者(Consumer
)提供实时的路由查询服务。每个Broker
在启动后会主动向NameServer
注册自己的状态,并定期发送心跳以保证信息的实时更新。当Producer
或Consumer
需要发送或消费消息时,首先会从NameServer
获取最新的路由信息。
启动流程
NameServer 的启动入口是 NamesrvStartup ,启动过程中涉及配置加载、初始化并启动 Netty 服务端、创建路由信息管理服务、启动定时任务监测 Broker 是否下线等多个步骤。
#NamesrvStartup
public static NamesrvController main0(String[] args) {
// ...
// 创建NamesrvController
NamesrvController controller = createNamesrvController(args);
// 启动NamesrvController
start(controller);
// ...
}
启动时,首先会调用createNamesrvController()
方法创建NamesrvController
,NamesrvController
是 NameServer
的核心控制组件,负责初始化和启动 NameServer
的各个组件;紧接着,会调用start()
方法,start()
方法内部调用了NamesrvController
的initialize()方法和start()方法。可以看出,NamesrvController
在 NameServer
中起着重要作用。
创建NamesrvController
上文提到过,NamesrvController
负责初始化和启动 NameServer
的各个组件。那么具体有哪些组件,以及这些组件各自的作用是什么呢?接下来我会逐步进行详细讲解。首先,让我们来看一下 NamesrvController
的构造方法:
#NamesrvController
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.routeInfoManager = new RouteInfoManager();
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(log, this.namesrvConfig, this.nettyServerConfig);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
在创建 NamesrvController
时,首先会解析命令行参数,从中读取配置文件,并将其内容加载到 namesrvConfig
和 nettyServerConfig
中。namesrvConfig
主要存储 NameServer
的配置参数,例如是否开启顺序消息等。nettyServerConfig
则用于配置 Netty
网络服务的参数,包括监听端口、I/O 线程数等。kvConfigManager
负责管理 Key-Value 类型的配置数据,提供动态配置支持,使得在运行时可以更新某些配置,而无需重启 NameServer
。routeInfoManager
用于管理和维护 Broker
的路由信息。brokerHousekeepingService
实现了 ChannelEventListener
接口,用于监听 channel
的变化事件。关于初始化的这些组件有什么作用,将在后续进行详细介绍。
初始化NamesrvController
NameServer
启动时会调用 NamesrvController
的initialize()
方法进行初始化:
#NamesrvStartup
public static NamesrvController start(final NamesrvController controller) throws Exception {
// ...
// 初始化
boolean initResult = controller.initialize();
// ...
return controller;
}
#NamesrvController
public boolean initialize() {
// ...
// 创建Netty服务端
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// ...
// 注册处理器
this.registerProcessor();
// 定时监测Broker是否存活
this.scheduledExecutorService.scheduleAtFixedRate(
NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
// ...
return true;
}
代码省略了非关键步骤,初始化过程主要分以下三个步骤:
-
创建Netty服务端
Producer
、Consumer
、Broker
和NameServer
之间的通信主要是通过长连接实现的,这样可以提高通信效率,减少延迟,同时在网络环境较差的情况下也能保持稳定性。 -
注册处理器
监听并接收到来自
Producer
、Consumer
和Broker
的网络请求后,会将接收到的请求分发到相应的处理器进行处理,比如Broker注册、Broker注销、查询路由信息等请求。这些处理器就是在初始化NamesrvController
时完成注册的。 -
定时监测Broker是否下线
启动一个定时任务,定期扫描注册的
Broker
,通过检测心跳来判断它们是否仍然活跃。对于长时间没有发送心跳的Broker
,NameServer
会将其从路由表中移除,保持路由信息的准确性和实时性。
启动NamesrvController
在完成了 NamesrvController
的初始化之后,会调用 NamesrvController
的start()
方法启动 NamesrvController
:
#NamesrvController
public void start() throws Exception {
// 启动Netty服务端
this.remotingServer.start();
// ...
}
#NettyRemotingServer
public void start() {
// ...
prepareSharableHandlers();
// ...
}
Netty
服务端的启动过程不是本篇内容的重点,直接省略,重点关注prepareSharableHandlers()
方法。在该方法中指定不同的网络请求事件所对应的处理器,这些处理器则是整个NameServer
的核心:
#NettyRemotingServer
private void prepareSharableHandlers() {
// ...
serverHandler = new NettyServerHandler();
}
#NettyRemotingServer
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
#NettyRemotingAbstract
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
// ...
processRequestCommand(ctx, cmd);
// ...
}
上面省略了握手、编解码、连接管理所对应的处理器等非关键代码,只关注Producer
、Consumer
和Broker
所发送的网络请求事件对应的处理逻辑:
#NettyRemotingAbstract
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
Runnable run = new Runnable() {
@Override
public void run() {
// ...
processor.asyncProcessRequest(ctx, cmd, callback);
// ...
};
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
// ...
}
defaultRequestProcessor
就是上文初始化方法中所注册的处理器:DefaultRequestProcessor
。在DefaultRequestProcessor
的processRequest()
方法中会根据请求code
的不同调用不同的处理逻辑:
#DefaultRequestProcessor
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
// ...
switch (request.getCode()) {
// ...省略对 Key-Value 配置的存储、查询、删除处理,查询Broker主题配置数据版本等操作...
// 注册Broker
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
// 注销Broker
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
// 查询主题路由信息
case RequestCode.GET_ROUTEINFO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
// 查询Broker集群信息
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
// ...
// 获取所有主题列表
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
// 删除主题
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
// ...
}
return null;
}
在创建 NamesrvController
时,初始化了 kVConfigManager
和 routeInfoManager
等组件。这些组件的初始化确保了 NameServer
在启动时能够有效地管理配置和路由信息,从而提升消息传递的可靠性和系统的整体性能。接下来,让我们具体探讨一下 routeInfoManager
是如何维护和管理路由信息、主题和队列等相关信息的。
public class RouteInfoManager {
// ...
private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
// ...
}
从 RouteInfoManager
的属性可以看出,它使用了五个 Map
来管理路由信息、主题和队列等相关数据:
-
topicQueueTable
存储每个主题对应的队列信息。
-
brokerAddrTable
存储
Broker
的信息。 -
clusterAddrTable
存储每个集群及其下属
Broker
的关系。 -
brokerLiveTable
存储每个
Broker
的实时状态信息。 -
filterServerTable
存储每个
Broker
对应的过滤服务器列表。
除了主动向 NameServer
发送请求会更新RouteInfoManager
的属性信息外,Broker
与 NameServer
连接状态发生变化也会更新相关的路由信息,确保系统能够及时反映 Broker
的健康状态:
#NettyRemotingServer
public void start() {
// ...
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
// ...
}
class NettyEventExecutor extends ServiceThread {
@Override
public void run() {
final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
if (event != null && listener != null) {
switch (event.getType()) {
case IDLE:
listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
break;
case CLOSE:
listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
break;
case CONNECT:
listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
break;
case EXCEPTION:
listener.onChannelException(event.getRemoteAddr(), event.getChannel());
break;
default:
break;
}
}
// ...
}
其中,listener
就是创建 NamesrvController
时初始化的brokerHousekeepingService
属性,BrokerHousekeepingService
的主要职责是监控 Broker
的连接状态并维护路由信息。当连接关闭、异常或闲置时,它会通过 RouteInfoManager
更新相关的路由信息,确保系统能够及时反映 Broker
的健康状态:
public class BrokerHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;
public BrokerHousekeepingService(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
}
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
}
总结
整体来看,NameServer的设计实现相比于Zookeeper等注册中心显得非常轻量级。它通过创建Netty服务端(NettyRemotingServer
)与其他组件进行网络通信,并使用RouteInfoManager
来维护路由信息。此外,在NameServer的集群部署中,各个实例之间并不进行通信,仅作为数据备份,这大大降低了实现的复杂度。