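Introduction
- This article describes how a service provider handles a request after receiving it from a service consumer, covering the full path from NettyServer to DubboProtocol.
Request handling flow
- Steps 1 to 12 are the connected flow.
- Steps 13 to 23 are the received flow.
- The core of the flow is AllChannelHandler, which submits ChannelEventRunnable events to an ExecutorService so that they are processed on a thread pool.
- The overall flow: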
NettyServer => NettyServerHandler => MultiMessageHandler =>
HeartbeatHandler => AllChannelHandler => ExecutorService(ChannelEventRunnable) => DecodeHandler => HeaderExchangeHandler => DubboProtocol(ExchangeHandlerAdapter)
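- NettyServerHandler takes the NettyServer object itself as its constructor argument.
- Knowing NettyServer's class hierarchy makes the flow easier to follow.
- AbstractPeer, an ancestor class of NettyServer, handles the connection and read events.
- The handler held by the NettyServer object is a MultiMessageHandler.
- After decoding, NettyServer runs the read/write event methods inherited from AbstractPeer, which is where the MultiMessageHandler flow begins.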
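The wrapping order in the flow above can be sketched as a chain of decorators. This is a minimal illustration, not Dubbo's code: the Handler interface, the wrap helper, and the trace list are hypothetical names introduced here, and the thread-pool hop performed by AllChannelHandler is left out so the call order stays easy to read.

```java
import java.util.ArrayList;
import java.util.List;

final class HandlerChainSketch {

    // Hypothetical, simplified stand-in for Dubbo's ChannelHandler (not the real interface).
    interface Handler {
        void received(String message, List<String> trace);
    }

    // Wraps an inner handler: records its own name, then delegates inward.
    static Handler wrap(String name, Handler inner) {
        return (message, trace) -> {
            trace.add(name);
            inner.received(message, trace);
        };
    }

    // Builds the chain inside-out, so the outermost wrapper is invoked first.
    static Handler buildChain() {
        Handler h = (message, trace) -> trace.add("requestHandler");
        h = wrap("HeaderExchangeHandler", h);
        h = wrap("DecodeHandler", h);
        h = wrap("AllChannelHandler", h);
        h = wrap("HeartbeatHandler", h);
        h = wrap("MultiMessageHandler", h);
        return h;
    }

    // Sends one message through the chain and returns the order of visited handlers.
    static List<String> trace(String message) {
        List<String> trace = new ArrayList<>();
        buildChain().received(message, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" => ", trace("ping")));
    }
}
```

Because each wrapper only knows the handler it wraps, the order of calls is fixed entirely by the order of wrapping.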
Source code walkthrough
NettyServer
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
// Netty callback handler
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
}
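- NettyServerHandler is NettyServer's entry point for request handling; it receives the data after decoding.
- The decoder produces a com.alibaba.dubbo.remoting.exchange.Request object.
- The decoded object is shown in the figure above.
- The core decoding logic is in com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody.
- For the decoding flow, see the article "Dubbo Provider encoding and decoding process".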
NettyServerHandler
public class NettyServerHandler extends ChannelDuplexHandler {
private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>
private final URL url;
// The NettyServer object itself, which delegates to its MultiMessageHandler
private final ChannelHandler handler;
public NettyServerHandler(URL url, ChannelHandler handler) {
this.url = url;
this.handler = handler;
}
public Map<String, Channel> getChannels() {
return channels;
}
// Netty callback for the connection-established event
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
if (channel != null) {
channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
}
handler.connected(channel);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
// Netty callback for the data-readable event
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
super.write(ctx, msg, promise);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.sent(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
}
- NettyServerHandler's channelActive handles the connection-established event.
- NettyServerHandler's channelRead handles incoming request data.
- NettyServerHandler's handler is the NettyServer object itself.
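Each callback in the listing follows the same shape: look up or create a wrapper for the connection, forward the event, then drop the wrapper in a finally block if the connection has gone away. The sketch below shows that shape with a plain map. WrappedChannel, the address-string key, and the method names are assumptions made for illustration; how the real NettyChannel caches its instances is not shown in the listing.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ChannelRegistrySketch {

    // Hypothetical wrapper around a raw connection; stands in for NettyChannel.
    static final class WrappedChannel {
        final String remoteAddress;
        volatile boolean connected = true;

        WrappedChannel(String remoteAddress) {
            this.remoteAddress = remoteAddress;
        }
    }

    private final Map<String, WrappedChannel> channels = new ConcurrentHashMap<>();

    // Returns the cached wrapper for this address, creating it on first use.
    WrappedChannel getOrAdd(String remoteAddress) {
        return channels.computeIfAbsent(remoteAddress, WrappedChannel::new);
    }

    // Removes the wrapper only when the underlying connection is closed.
    void removeIfDisconnected(String remoteAddress) {
        WrappedChannel ch = channels.get(remoteAddress);
        if (ch != null && !ch.connected) {
            channels.remove(remoteAddress, ch);
        }
    }

    int size() {
        return channels.size();
    }

    public static void main(String[] args) {
        ChannelRegistrySketch registry = new ChannelRegistrySketch();
        WrappedChannel ch = registry.getOrAdd("10.0.0.1:50000");
        try {
            System.out.println("event forwarded for " + ch.remoteAddress);
        } finally {
            registry.removeIfDisconnected("10.0.0.1:50000");
        }
        System.out.println("cached channels: " + registry.size());
    }
}
```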
AbstractChannelHandlerDelegate
public abstract class AbstractChannelHandlerDelegate implements ChannelHandlerDelegate {
protected ChannelHandler handler;
protected AbstractChannelHandlerDelegate(ChannelHandler handler) {
Assert.notNull(handler, "handler == null");
this.handler = handler;
}
@Override
public ChannelHandler getHandler() {
if (handler instanceof ChannelHandlerDelegate) {
return ((ChannelHandlerDelegate) handler).getHandler();
}
return handler;
}
@Override
public void connected(Channel channel) throws RemotingException {
handler.connected(channel);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
handler.disconnected(channel);
}
@Override
public void sent(Channel channel, Object message) throws RemotingException {
handler.sent(channel, message);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
handler.received(channel, message);
}
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
handler.caught(channel, exception);
}
}
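- AbstractChannelHandlerDelegate is the base class of MultiMessageHandler, HeartbeatHandler, and DecodeHandler. It provides default implementations of connected, received, and the other event methods, each of which simply delegates to the wrapped handler.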
MultiMessageHandler
public class MultiMessageHandler extends AbstractChannelHandlerDelegate {
public MultiMessageHandler(ChannelHandler handler) {
// HeartbeatHandler
super(handler);
}
@SuppressWarnings("unchecked")
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
for (Object obj : list) {
// HeartbeatHandler
handler.received(channel, obj);
}
} else {
// HeartbeatHandler
handler.received(channel, message);
}
}
}
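- MultiMessageHandler handles messages that bundle several messages together: for each contained message it calls received on the wrapped handler (the handler field inherited from AbstractChannelHandlerDelegate), which here is the HeartbeatHandler.
- Connection events are handled by the connected method inherited from AbstractChannelHandlerDelegate.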
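The fan-out that MultiMessageHandler performs can be sketched on its own. Batch is a hypothetical stand-in for Dubbo's MultiMessage, and the next handler is modelled as a java.util.function.Consumer; both are assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class MultiMessageSketch {

    // Hypothetical batch type standing in for Dubbo's MultiMessage.
    static final class Batch {
        final List<Object> items;

        Batch(Object... items) {
            this.items = Arrays.asList(items);
        }
    }

    // Unpacks a batch and forwards each item; forwards anything else unchanged.
    static void received(Object message, Consumer<Object> next) {
        if (message instanceof Batch) {
            for (Object item : ((Batch) message).items) {
                next.accept(item);
            }
        } else {
            next.accept(message);
        }
    }

    // Helper for the demo: collects what the next handler would see.
    static List<Object> deliver(Object message) {
        List<Object> seen = new ArrayList<>();
        received(message, seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(deliver(new Batch("req-1", "req-2")));
        System.out.println(deliver("req-3"));
    }
}
```

Downstream handlers therefore never need to know whether a message arrived alone or as part of a batch.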
HeartbeatHandler
public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);
public static String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";
public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";
public HeartbeatHandler(ChannelHandler handler) {
// AllChannelHandler
super(handler);
}
@Override
public void connected(Channel channel) throws RemotingException {
setReadTimestamp(channel);
setWriteTimestamp(channel);
// AllChannelHandler
handler.connected(channel);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
clearReadTimestamp(channel);
clearWriteTimestamp(channel);
// AllChannelHandler
handler.disconnected(channel);
}
@Override
public void sent(Channel channel, Object message) throws RemotingException {
setWriteTimestamp(channel);
// AllChannelHandler
handler.sent(channel, message);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel);
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(Response.HEARTBEAT_EVENT);
channel.send(res);
}
return;
}
if (isHeartbeatResponse(message)) {
return;
}
// AllChannelHandler
handler.received(channel, message);
}
}
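- HeartbeatHandler's received method handles incoming data: it refreshes the read timestamp, answers two-way heartbeat requests directly, swallows heartbeat responses, and passes everything else on to AllChannelHandler.
- HeartbeatHandler's connected method handles connection events by setting the read and write timestamps before delegating.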
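The filtering done in HeartbeatHandler.received can be sketched as follows. The Msg class and the sent and forwarded lists are hypothetical; the real code works with Request and Response objects and writes the reply through channel.send.

```java
import java.util.ArrayList;
import java.util.List;

final class HeartbeatSketch {

    // Hypothetical message type; the real handler inspects Request and Response.
    static final class Msg {
        final boolean heartbeat;
        final boolean response;
        final boolean twoWay;

        Msg(boolean heartbeat, boolean response, boolean twoWay) {
            this.heartbeat = heartbeat;
            this.response = response;
            this.twoWay = twoWay;
        }
    }

    final List<String> sent = new ArrayList<>();   // replies written back to the peer
    final List<Msg> forwarded = new ArrayList<>(); // messages passed to the next handler
    long lastReadMillis;

    void received(Msg m) {
        lastReadMillis = System.currentTimeMillis(); // every inbound message counts as activity
        if (m.heartbeat && !m.response) {
            if (m.twoWay) {
                sent.add("heartbeat-response");      // answer on the spot
            }
            return;                                  // never reaches the business thread pool
        }
        if (m.heartbeat) {
            return;                                  // heartbeat response: nothing more to do
        }
        forwarded.add(m);
    }

    public static void main(String[] args) {
        HeartbeatSketch h = new HeartbeatSketch();
        h.received(new Msg(true, false, true));
        h.received(new Msg(false, false, true));
        System.out.println("replies=" + h.sent.size() + ", forwarded=" + h.forwarded.size());
    }
}
```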
AllChannelHandler
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
@Override
public void connected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
@Override
public void disconnected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
}
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
}
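- AllChannelHandler's received handles incoming request data.
- AllChannelHandler's connected handles connection events.
- AllChannelHandler's ExecutorService is a FixedThreadPool by default.
- AllChannelHandler wraps each of these events in a ChannelEventRunnable object and hands it to the thread pool. If the pool rejects a two-way request, it sends back a response with status SERVER_THREADPOOL_EXHAUSTED_ERROR so that the consumer does not wait until timeout.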
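The dispatch-and-reject path in received can be reproduced with plain java.util.concurrent classes. This is a sketch under stated assumptions: the one-thread pool with a SynchronousQueue is chosen only to make the rejection deterministic, and the returned strings stand in for the real Response objects.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class DispatchSketch {

    // Submits the event; on rejection of a two-way request, returns an error reply instead of failing silently.
    static String dispatch(ExecutorService executor, Runnable event, boolean twoWay) {
        try {
            executor.execute(event);
            return "dispatched";
        } catch (RejectedExecutionException e) {
            if (twoWay) {
                return "SERVER_THREADPOOL_EXHAUSTED_ERROR";
            }
            throw e;
        }
    }

    // Occupies the single worker, then dispatches again to trigger a rejection.
    static String[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            String first = dispatch(pool, () -> {
                started.countDown();
                awaitQuietly(release); // keep the only worker busy
            }, true);
            awaitQuietly(started);
            String second = dispatch(pool, () -> { }, true);
            return new String[] {first, second};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        String[] results = demo();
        System.out.println(results[0] + " / " + results[1]);
    }
}
```

Answering from the I/O thread when the pool is full is what lets the consumer fail fast rather than wait for its own timeout.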
ChannelEventRunnable
public class ChannelEventRunnable implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);
// DecodeHandler
private final ChannelHandler handler;
private final Channel channel;
private final ChannelState state;
private final Throwable exception;
private final Object message;
public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {
this.channel = channel;
this.handler = handler;
this.state = state;
this.message = message;
this.exception = exception;
}
@Override
public void run() {
if (state == ChannelState.RECEIVED) {
try {
handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
} else {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case SENT:
try {
handler.sent(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
// Note: the SENT branch above has no break, so in this listing it falls through into CAUGHT.
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is: " + message + ", exception is " + exception, e);
}
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
}
}
- ChannelEventRunnable's handler is a DecodeHandler object.
- ChannelEventRunnable dispatches on the event type; for example, state == RECEIVED handles an incoming request message.
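One detail of the listing is worth isolating: as printed, the SENT branch has no break before case CAUGHT. In Java a case without break continues into the next case, which the sketch below demonstrates with a hypothetical State enum and a list of recorded calls in place of the real handler.

```java
import java.util.ArrayList;
import java.util.List;

final class FallThroughSketch {

    // Hypothetical subset of the channel states, for illustration only.
    enum State { CONNECTED, SENT, CAUGHT }

    // Records which handler methods would be invoked for a given state.
    static List<String> run(State state) {
        List<String> calls = new ArrayList<>();
        switch (state) {
            case CONNECTED:
                calls.add("connected");
                break;
            case SENT:
                calls.add("sent");
                // no break: execution continues into CAUGHT
            case CAUGHT:
                calls.add("caught");
                break;
            default:
                calls.add("unknown");
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(run(State.SENT)); // both "sent" and "caught" are recorded
    }
}
```

Adding a break at the end of the SENT branch would restrict it to the sent callback only.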
DecodeHandler
public class DecodeHandler extends AbstractChannelHandlerDelegate {
private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class);
public DecodeHandler(ChannelHandler handler) {
super(handler);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
// HeaderExchangeHandler
handler.received(channel, message);
}
private void decode(Object message) {
if (message != null && message instanceof Decodeable) {
try {
((Decodeable) message).decode();
if (log.isDebugEnabled()) {
log.debug("Decode decodeable message " + message.getClass().getName());
}
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
}
} // ~ end of catch
} // ~ end of if
} // ~ end of method decode
}
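- DecodeHandler overrides only received, where it decodes the message body if it implements Decodeable; its parent class AbstractChannelHandlerDelegate handles the other events, such as connected.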
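Since DecodeHandler runs inside ChannelEventRunnable, any decoding it triggers happens on the worker thread. A minimal sketch of such a lazily decoded body follows. LazyBody, its decoded flag, and the decodeCalls counter are assumptions made for illustration; they show one way to make decode() safe to call more than once and are not taken from the listing.

```java
import java.nio.charset.StandardCharsets;

final class LazyDecodeSketch {

    // Hypothetical payload whose body stays as raw bytes until decode() is called.
    static final class LazyBody {
        private final byte[] raw;
        private boolean decoded;
        private String value;
        int decodeCalls; // counts real decoding work, for demonstration only

        LazyBody(String text) {
            this.raw = text.getBytes(StandardCharsets.UTF_8);
        }

        // Decodes at most once; later calls return immediately.
        synchronized void decode() {
            if (decoded) {
                return;
            }
            value = new String(raw, StandardCharsets.UTF_8);
            decodeCalls++;
            decoded = true;
        }

        synchronized String value() {
            return value;
        }
    }

    // Mirrors the shape of DecodeHandler.received: decode if needed, then pass the result on.
    static String received(Object message) {
        if (message instanceof LazyBody) {
            LazyBody body = (LazyBody) message;
            body.decode();
            return body.value();
        }
        return String.valueOf(message);
    }

    public static void main(String[] args) {
        LazyBody body = new LazyBody("hello");
        System.out.println(received(body) + ", decode ran " + body.decodeCalls + " time(s)");
    }
}
```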
HeaderExchangeHandler
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
protected static final Logger logger = LoggerFactory.getLogger(HeaderExchangeHandler.class);
public static String KEY_READ_TIMESTAMP = HeartbeatHandler.KEY_READ_TIMESTAMP;
public static String KEY_WRITE_TIMESTAMP = HeartbeatHandler.KEY_WRITE_TIMESTAMP;
// com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler
private final ExchangeHandler handler;
public HeaderExchangeHandler(ExchangeHandler handler) {
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.handler = handler;
}
public void connected(Channel channel) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
handler.connected(exchangeChannel);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
// Handle a request
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
// Two-way request: a response must be sent back
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
// One-way request: no response is needed
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
// Handle a response
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
// Telnet command handling
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) msg = null;
else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
else msg = data.toString();
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
return res;
}
// find handler by message class.
Object msg = req.getData();
try {
// handle data.
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
}
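- HeaderExchangeHandler's received() method handles incoming messages.
- HeaderExchangeHandler's connected() method handles connection events.
- For two-way requests (request.isTwoWay()), handleRequest() processes the request and the resulting Response is sent back over the channel.
- handleRequest() calls reply() on DubboProtocol's requestHandler object.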
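The way handleRequest turns every outcome into a response can be sketched without Dubbo's types. Reply and the Function standing in for handler.reply are hypothetical; the status names follow the constants used in the listing.

```java
import java.util.function.Function;

final class HandleRequestSketch {

    // Hypothetical response type standing in for Dubbo's Response.
    static final class Reply {
        final long id;
        final String status;
        final Object result;
        final String error;

        Reply(long id, String status, Object result, String error) {
            this.id = id;
            this.status = status;
            this.result = result;
            this.error = error;
        }
    }

    // Always returns a reply carrying the request id, whether the call succeeded or not.
    static Reply handleRequest(long id, Object data, boolean broken, Function<Object, Object> replyFn) {
        if (broken) {
            // The request could not be decoded, so the business handler is never called.
            return new Reply(id, "BAD_REQUEST", null, "Fail to decode request due to: " + data);
        }
        try {
            return new Reply(id, "OK", replyFn.apply(data), null);
        } catch (Throwable e) {
            // Failures are reported to the caller as a status plus message.
            return new Reply(id, "SERVICE_ERROR", null, String.valueOf(e));
        }
    }

    public static void main(String[] args) {
        Reply ok = handleRequest(1L, "ping", false, d -> "pong");
        Reply failed = handleRequest(2L, "ping", false, d -> {
            throw new IllegalStateException("boom");
        });
        System.out.println(ok.status + " " + ok.result + " / " + failed.status);
    }
}
```

Carrying the request id in every reply is what lets the consumer match the response to its pending call.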
DubboProtocol
public class DubboProtocol extends AbstractProtocol {
public static final String NAME = "dubbo";
public static final int DEFAULT_PORT = 20880;
private static final String IS_CALLBACK_SERVICE_INVOKE = "_isCallBackServiceInvoke";
private static DubboProtocol INSTANCE;
private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<String, ExchangeServer>(); // <host:port,Exchanger>
private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger>
private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap = new ConcurrentHashMap<String, LazyConnectExchangeClient>();
private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
private final Set<String> optimizers = new ConcurrentHashSet<String>();
private final ConcurrentMap<String, String> stubServiceMethodsMap = new ConcurrentHashMap<String, String>();
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
@Override
public void connected(Channel channel) throws RemotingException {
invoke(channel, Constants.ON_CONNECT_KEY);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
invoke(channel, Constants.ON_DISCONNECT_KEY);
}
private void invoke(Channel channel, String methodKey) {
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
received(channel, invocation);
} catch (Throwable t) {
logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
}
}
}
private Invocation createInvocation(Channel channel, URL url, String methodKey) {
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}
RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
invocation.setAttachment(Constants.PATH_KEY, url.getPath());
invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
};
}
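- DubboProtocol's requestHandler holds the core handling logic.
- The invoker to be called is looked up by getInvoker(channel, inv).
- invoker.invoke(inv) then runs the actual service logic.
- For the execution logic of DubboInvoker, see "Dubbo Provider request handling flow (2)".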
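The callback check inside reply() reads more easily as a standalone function. The sketch below extracts the same comparison of the invoked method name against the comma-separated methods URL parameter; the class and method names are hypothetical.

```java
final class CallbackMethodCheckSketch {

    // Returns true when methodName appears in the comma-separated methodsStr.
    static boolean hasMethod(String methodsStr, String methodName) {
        if (methodsStr == null || methodsStr.indexOf(',') == -1) {
            // Zero or one declared method: a direct comparison is enough.
            return methodName.equals(methodsStr);
        }
        for (String method : methodsStr.split(",")) {
            if (methodName.equals(method)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hasMethod("onEvent,onError", "onError"));
        System.out.println(hasMethod("onEvent,onError", "onClose"));
    }
}
```

In the listing, a callback invocation whose method is not found this way makes reply() return null instead of invoking anything.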