8. dubbo源码-NettyServer

Consumer需要通过Netty(默认)把请求信息发送到远程Provider,Provider也是通过Netty接收并处理请求;这个处理流程的核心实现在NettyServer中,如下图所示,接下来重点分析Provider如何使用Netty的:

NettyServer-处理请求-1.png

NettyServer

NettyServer中绑定服务时,指定了handler为:new NettyHandler(getUrl(), this);,所以dubbo在NettyHandler中处理所有消息;NettyServerdoOpen()方法如下:

@Override
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);
    
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            // 设置编码&解码
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            // 设置消息处理handler
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // 在端口上开启Netty服务
    channel = bootstrap.bind(getBindAddress());
}

NettyHandler

NettyServer中已经设置了处理消息的handler为NettyHandlerNettyHandler只需继承org.jboss.netty.channel.SimpleChannelHandler即可(这只是Netty中成为handler的若干实现中的一种方法),源码如下:

public class NettyHandler extends SimpleChannelHandler {

    ... ...
    
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.caught(channel, e.getCause());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }
}

从NettyHandler的源码可知,不止接收消息在这里处理,建立连接(channelConnected),断开连接(channelDisconnected),异常处理(exceptionCaught)都在这里;

备注:该系列文章主要是讲解dubbo,不打算发散Netty相关知识点,如果想稍微了解Netty中的SimpleChannelHandler,请参考SimpleChannelHandler官方介绍

AllChannelHandler

接收请求的核心源码如下:


// 初始化一个共享的线程池
protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));

protected final ExecutorService executor;
// 构造方法中给executor赋值,通过ThreadPool的SPI注解可知,Provider的默认线程池为FixedThreadPool
public WrappedChannelHandler(ChannelHandler handler, URL url) {
    ... ...
    // 选择一种线程池实现方式处理请求,想了解dubbo如何选择线程池实现方式,请参考[dubbo源码-ExtensionLoader扩展机制](http://www.jianshu.com/p/4691f98663aa)
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
    ... ...
}

public void received(Channel channel, Object message) throws RemotingException {
    // 先得到线程池
    ExecutorService cexecutor = getExecutorService();
    try {
        // 线程异步处理请求
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}

private ExecutorService getExecutorService() {
    ExecutorService cexecutor = executor;
    // 如果构造方法中初始化的ExecutorService为空,或者线程池已经被shutdown,那么用共享连接池处理请求
    if (cexecutor == null || cexecutor.isShutdown()) { 
        cexecutor = SHARED_EXECUTOR;
    }
    return cexecutor;
}

ChannelEventRunnable

dubbo只接收5种类型的消息,由枚举ChannelState定义;实现源码如下:

public void run() {
    switch (state) {
        case CONNECTED:
            ... ...
            break;
        case DISCONNECTED:
            ... ...
            break;
        case SENT:
            ... ...
            break;
        case RECEIVED:
            try{
                // 前面分析用线程池异步处理请求时ChannelState.RECEIVED,所以处理请求的业务逻辑在这里
                handler.received(channel, message);
            }catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is "+ message,e);
            }
            break;
        case CAUGHT:
            ... ...
            break;
        default:
            logger.warn("unknown state: " + state + ", message is " + message);
    }
}

DecodeHandler

DecodeHandlerreceived()接收到请求消息,判断是否需要解码;核心源码如下:

public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        decode(message);
    }
    // 如果是consumer调用provider,那么message就是Request类型,需要在这里解码
    if (message instanceof Request) {
        decode(((Request)message).getData());
    }
    
    if (message instanceof Response) {
        decode( ((Response)message).getResult());
    }
    // 如果使用telnet方式调用provider,那么message是String类型,不需要再做任何处理,直接运行到这里
    handler.received(channel, message);
}

HeaderExchangeHandler

这里分析的是Provider处理Consumer的请求,所以message就是Request类型,实现的核心源码如下:

*** ***
if (message instanceof Request) {
    // handle request.
    Request request = (Request) message;
    if (request.isEvent()) {
        handlerEvent(channel, request);
    } else {
        // twoway就是指Consumer需要拿到Provider的结果的调用
        if (request.isTwoWay()) {
            Response response = handleRequest(exchangeChannel, request);
            // 将RPC结果发送到请求的channel(NettyChannel中调用send()方法)
            channel.send(response);
        } else {
            handler.received(exchangeChannel, request.getData());
        }
    }
} 
*** ***
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    if (req.isBroken()) {
        ... ...
        return res;
    }
    // 从Request类型的req中取出请求msg,RpcInvocation类型.
    Object msg = req.getData();
    try {
        // handle data. 将返回的result(RpcResult类型)包装成dubbo统一的返回数据类型Reponse
        Object result = handler.reply(channel, msg);
        res.setStatus(Response.OK);
        res.setResult(result);
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
}

DubboProtocol

DubboProtol中绑定端口开启Netty服务server = Exchangers.bind(url, requestHandler);可知,dubbo把请求交给requestHandler处理,requestHandler定义如下,从源码可知,requestHandler中定义了reply()received()connected()disconnected()这4个方法,而这4个方法最终都调用到reply(),上面的源码调用handler.reply(channel, msg),其核心实现源码如下:

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    
    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            // 根据调用参数Invocation就能得到需要调用的Invoker
            Invoker<?> invoker = getInvoker(channel, inv);
            //如果是callback 需要处理高版本调用低版本的问题
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || methodsStr.indexOf(",") == -1){
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods){
                        if (inv.getMethodName().equals(method)){
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod){
                    logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            // 经过Filter链后,动态代理调用Provider的方法实现,并返回结果
            return invoker.invoke(inv);
        }
        throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }
    ... ...
};

reply()源码解读:
方法入参ExchangeChannel channel: dubbo服务处理请求/响应的渠道,例如NettyChannel, MinaChannel
方法入参Object message: Consumer调用Provider时的请求数据,包括调用的接口, 调用的参数,参数类型,dubbo的版本,请求服务的版本等信息;

  1. if (message instanceof Invocation){
    所有dubbo请求都被会封装成Invocation类型对象,所以如果请求message不是Invocation类型,那么抛出异常:throw new RemotingException(channel, "Unsupported request: "
  2. Invocation inv = (Invocation) message;
    Invoker<?> invoker = getInvoker(channel, inv);
    根据调用参数message得到Invoke
  3. return invoker.invoke(inv);
    根据封装后的请求参数inv进行invoke调用;
  4. HeaderExchangeHandler中把 invoker.invoke(inv)得到的RpcResult封装成Response对象:
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。