概览
BrokerController负责各组件的初始化、启动、关闭,初始化阶段针对不同请求类型创建了不同类型的Processer,processor分为七大类:send, pull, query, clientManage(心跳), consumerManage(offset管理), endTransaction(提交或回滚事务), default(控制台命令),不同请求由不同线程池处理。
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable = new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
如果是Linux平台,建议useEpollNativeSelector
设为true,这样创建NettyRemotingServer时,使用性能更高的EpollEventLoopGroup
和EpollSocketChannel
private boolean useEpoll() {
return RemotingUtil.isLinuxPlatform()
&& nettyServerConfig.isUseEpollNativeSelector()
&& Epoll.isAvailable();
}
服务端Handler
handshakeHandler
第一个入站handler,当客户端配置tls.enable=true时生效
public static boolean tlsEnable = Boolean.parseBoolean(System.getProperty(TLS_ENABLE, "false"));
会在第一次channelRead时动态添加sslhandler,并删除自己
NettyDecoder
实现了LengthFieldBasedFrameDecoder
,基于不定长度的协议,几个关键参数:
private final int maxFrameLength;//最大协议帧长度,超过将抛TooLongFrameException
private final int lengthFieldOffset;//长度域偏移量
private final int lengthFieldLength;//长度域的长度
private final int lengthAdjustment;//长度域的值+lengthAdjustment,就是netty从长度域开始向后获取包体的长度
private final int initialBytesToStrip;//截断头部多少个字节
线程模型
bossEventLoopGroup线程数只有1个,workerEventLoopGroup线程3个,handshakeHandler、encoder、NettyDecoder、IdleStateHandler、connectionManageHandler和serverHandler使用defaultEventExecutorGroup中的默认8个线程处理,serverHandler中具体请求处理交给不同类型的Processor对应的线程池处理。
Netty中,ChannelHandler中的方法可以使用不同的线程去处理,ChannelPipeLine.addLast可以指定EventExecutorGroup,它的next返回的EventExecutor处理,ChannelPipeLine维护了一个EventExecutorGroup到EventExecutor的map,确保对同一个channel来说,始终用同一个线程处理。可以通过ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP
设置成false来用不同线程处理同一个channel
ChannelPipeLine.addLast(EventExecutorGroup group, String name, ChannelHandler handler)
private EventExecutor childExecutor(EventExecutorGroup group) {
if (group == null) {
return null;
}
Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
if (pinEventExecutor != null && !pinEventExecutor) {
return group.next();
}
Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
if (childExecutors == null) {
// Use size of 4 as most people only use one extra EventExecutor.
childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
}
// Pin one of the child executors once and remember it so that the same child executor
// is used to fire events for the same channel.
EventExecutor childExecutor = childExecutors.get(group);
if (childExecutor == null) {
childExecutor = group.next();
childExecutors.put(group, childExecutor);
}
return childExecutor;
}
哪些请求走10909、哪些走10911
fastRemotingServer对应10909,是VIP通道,remotingServer对应10911,除了pull,其他6种processor都会注册到2个上,取决于客户端配置,如果配置了com.rocketmq.sendMessageWithVIPChannel
为true,则走10909
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"));