8. dubbo源码-NettyServer

Consumer需要通过Netty(默认)把请求信息发送到远程Provider,Provider也是通过Netty接收并处理请求;这个处理流程的核心实现在NettyServer中,如下图所示,接下来重点分析Provider如何使用Netty的:

NettyServer-处理请求-1.png

NettyServer

NettyServer中绑定服务时,指定了handler为:new NettyHandler(getUrl(), this);,所以dubbo在NettyHandler中处理所有消息;NettyServerdoOpen()方法如下:

@Override
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);
    
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            // 设置编码&解码
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            // 设置消息处理handler
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // 在端口上开启Netty服务
    channel = bootstrap.bind(getBindAddress());
}

NettyHandler

NettyServer中已经设置了处理消息的handler为NettyHandlerNettyHandler只需继承org.jboss.netty.channel.SimpleChannelHandler即可(这只是Netty中成为handler的若干实现中的一种方法),源码如下:

public class NettyHandler extends SimpleChannelHandler {

    ... ...
    
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.caught(channel, e.getCause());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }
}

从NettyHandler的源码可知,不止接收消息在这里处理,建立连接(channelConnected),断开连接(channelDisconnected),异常处理(exceptionCaught)都在这里;

备注:该系列文章主要是讲解dubbo,不打算发散Netty相关知识点,如果想稍微了解Netty中的SimpleChannelHandler,请参考SimpleChannelHandler官方介绍

AllChannelHandler

接收请求的核心源码如下:


// 初始化一个共享的线程池
protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));

protected final ExecutorService executor;
// 构造方法中给executor赋值,通过ThreadPool的SPI注解可知,Provider的默认线程池为FixedThreadPool
public WrappedChannelHandler(ChannelHandler handler, URL url) {
    ... ...
    // 选择一种线程池实现方式处理请求,想了解dubbo如何选择线程池实现方式,请参考[dubbo源码-ExtensionLoader扩展机制](http://www.jianshu.com/p/4691f98663aa)
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
    ... ...
}

public void received(Channel channel, Object message) throws RemotingException {
    // 先得到线程池
    ExecutorService cexecutor = getExecutorService();
    try {
        // 线程异步处理请求
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}

private ExecutorService getExecutorService() {
    ExecutorService cexecutor = executor;
    // 如果构造方法中初始化的ExecutorService为空,或者线程池已经被shutdown,那么用共享连接池处理请求
    if (cexecutor == null || cexecutor.isShutdown()) { 
        cexecutor = SHARED_EXECUTOR;
    }
    return cexecutor;
}

ChannelEventRunnable

dubbo只接收5种类型的消息,由枚举ChannelState定义;实现源码如下:

public void run() {
    switch (state) {
        case CONNECTED:
            ... ...
            break;
        case DISCONNECTED:
            ... ...
            break;
        case SENT:
            ... ...
            break;
        case RECEIVED:
            try{
                // 前面分析用线程池异步处理请求时ChannelState.RECEIVED,所以处理请求的业务逻辑在这里
                handler.received(channel, message);
            }catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is "+ message,e);
            }
            break;
        case CAUGHT:
            ... ...
            break;
        default:
            logger.warn("unknown state: " + state + ", message is " + message);
    }
}

DecodeHandler

DecodeHandlerreceived()接收到请求消息,判断是否需要解码;核心源码如下:

public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        decode(message);
    }
    // 如果是consumer调用provider,那么message就是Request类型,需要在这里解码
    if (message instanceof Request) {
        decode(((Request)message).getData());
    }
    
    if (message instanceof Response) {
        decode( ((Response)message).getResult());
    }
    // 如果使用telnet方式调用provider,那么message是String类型,不需要再做任何处理,直接运行到这里
    handler.received(channel, message);
}

HeaderExchangeHandler

这里分析的是Provider处理Consumer的请求,所以message就是Request类型,实现的核心源码如下:

*** ***
if (message instanceof Request) {
    // handle request.
    Request request = (Request) message;
    if (request.isEvent()) {
        handlerEvent(channel, request);
    } else {
        // twoway就是指Consumer需要拿到Provider的结果的调用
        if (request.isTwoWay()) {
            Response response = handleRequest(exchangeChannel, request);
            // 将RPC结果发送到请求的channel(NettyChannel中调用send()方法)
            channel.send(response);
        } else {
            handler.received(exchangeChannel, request.getData());
        }
    }
} 
*** ***
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    if (req.isBroken()) {
        ... ...
        return res;
    }
    // 从Request类型的req中取出请求msg,RpcInvocation类型.
    Object msg = req.getData();
    try {
        // handle data. 将返回的result(RpcResult类型)包装成dubbo统一的返回数据类型Reponse
        Object result = handler.reply(channel, msg);
        res.setStatus(Response.OK);
        res.setResult(result);
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
}

DubboProtocol

DubboProtol中绑定端口开启Netty服务server = Exchangers.bind(url, requestHandler);可知,dubbo把请求交给requestHandler处理,requestHandler定义如下,从源码可知,requestHandler中定义了reply()received()connected()disconnected()这4个方法,而这4个方法最终都调用到reply(),上面的源码调用handler.reply(channel, msg),其核心实现源码如下:

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    
    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            // 根据调用参数Invocation就能得到需要调用的Invoker
            Invoker<?> invoker = getInvoker(channel, inv);
            //如果是callback 需要处理高版本调用低版本的问题
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || methodsStr.indexOf(",") == -1){
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods){
                        if (inv.getMethodName().equals(method)){
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod){
                    logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            // 经过Filter链后,动态代理调用Provider的方法实现,并返回结果
            return invoker.invoke(inv);
        }
        throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }
    ... ...
};

reply()源码解读:
方法入参ExchangeChannel channel: dubbo服务处理请求/响应的渠道,例如NettyChannel, MinaChannel
方法入参Object message: Consumer调用Provider时的请求数据,包括调用的接口, 调用的参数,参数类型,dubbo的版本,请求服务的版本等信息;

  1. if (message instanceof Invocation){
    所有dubbo请求都被会封装成Invocation类型对象,所以如果请求message不是Invocation类型,那么抛出异常:throw new RemotingException(channel, "Unsupported request: "
  2. Invocation inv = (Invocation) message;
    Invoker<?> invoker = getInvoker(channel, inv);
    根据调用参数message得到Invoke
  3. return invoker.invoke(inv);
    根据封装后的请求参数inv进行invoke调用;
  4. HeaderExchangeHandler中把 invoker.invoke(inv)得到的RpcResult封装成Response对象:
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351