【Dubbo】网络通信

provider、consumer通信原理

未命名文件 (7).png

Consumer发送原理

-->Result result = invoker.invoke(invocation)
--------------------------------------------------------------------------扩展点----------------
                -->InvokerWrapper.invoke
                  -->ProtocolFilterWrapper.invoke
                    -->ConsumerContextFilter.invoke
                      -->ProtocolFilterWrapper.invoke
                        -->MonitorFilter.invoke
                          -->ProtocolFilterWrapper.invoke
                            -->FutureFilter.invoke
                              -->ListenerInvokerWrapper.invoke
                                -->AbstractInvoker.invoke
---------------------------------------------------------------------------扩展点---------------
                                  -->doInvoke(invocation)
                                    -->DubboInvoker.doInvoke//为什么DubboInvoker是个protocol? 因为RegistryDirectory.refreshInvoker.toInvokers: protocol.refer
                                      -->ReferenceCountExchangeClient.request
                                        -->HeaderExchangeClient.request
                                          -->HeaderExchangeChannel.request
                                            -->NettyClient.send
                                            -->AbstractPeer.send
                                              -->NettyChannel.send
                                                -->ChannelFuture future = channel.write(message);//最终的目的:通过netty的channel发送网络数据

consumer的RegistryDirectory创建DubboInvoker是在zk配置发生变化回调RegistryDirectory.notify的时候。创建DubboInvoker使用的是protocol.refer(serviceType, url)方法,Protocol$Adpative会有一些Wapper给DubboInvoker添加很多包装类,所以在调用链中会有一些filter,这和服务发布的时候是一样的

#com.alibaba.dubbo.registry.integration.RegistryDirectory#toInvokers
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
......
                        invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
......
}
#com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
......
ResponseFuture future = currentClient.request(inv, timeout) ;
......
}

provider接收、处理、返回

NettyHandler.messageReceived
  -->AbstractPeer.received
    -->MultiMessageHandler.received
      -->HeartbeatHandler.received
        -->AllChannelHandler.received
          -->ChannelEventRunnable.run //线程池 执行线程
            -->DecodeHandler.received
              -->HeaderExchangeHandler.received
                -->handleRequest(exchangeChannel, request)//网络通信接收处理 
                  -->DubboProtocol.reply
                    -->getInvoker
                      -->exporterMap.get(serviceKey)//从服务暴露里面提取 
                      -->DubboExporter.getInvoker()//最终得到一个invoker
-------------------------------------------------------------------------扩展点--------------
                    -->ProtocolFilterWrapper.invoke
                      -->EchoFilter.invoke
                        -->ClassLoaderFilter.invoke
                          -->GenericFilter.invoke
                            -->TraceFilter.invoke
                              -->MonitorFilter.invoke
                                -->TimeoutFilter.invoke
                                  -->ExceptionFilter.invoke
                                    -->InvokerWrapper.invoke
-------------------------------------------------------------------------扩展点--------------
                                      -->AbstractProxyInvoker.invoke
                                        -->JavassistProxyFactory.AbstractProxyInvoker.doInvoke
                                          --> 进入真正执行的实现类   DemoServiceImpl.sayHello
                                        ....................................
                -->channel.send(response);//把接收处理的结果,发送回去 
                  -->AbstractPeer.send
                    -->NettyChannel.send
                      -->ChannelFuture future = channel.write(message);//数据发回consumer

Consumer的接收原理

//consumer的接收原理 
NettyHandler.messageReceived
  -->AbstractPeer.received
    -->MultiMessageHandler.received
      -->HeartbeatHandler.received
        -->AllChannelHandler.received
          -->ChannelEventRunnable.run //线程池 执行线程
            -->DecodeHandler.received
              -->HeaderExchangeHandler.received
                -->handleResponse(channel, (Response) message);
                  -->HeaderExchangeHandler.handleResponse
                    -->DefaultFuture.received
                      -->DefaultFuture.doReceived
                        private void doReceived(Response res) {
                            lock.lock();
                            try {
                                response = res;
                                if (done != null) {
                                    done.signal();
                                }
                            } finally {
                                lock.unlock();
                            }
                            if (callback != null) {
                                invokeCallback(callback);
                            }
                        }

发送数据是异步的,可以参考netty的案例,通过channel发送数据不能直接拿到结果,必须通过epoll中的等待回调事件再取得数据

数据通信的基本类

  • NettyChannel
    包含一个send(Object message, boolean sent)方法,内置了一个netty自己的channel可直接发送数据

  • NettyHandler
    与原生netty的事件交互,获取到netty的事件后会回调上层hook的handler

  • ChannelEventRunnable
    接收到的数据的处理

  • DecodeHandler
    数据的解码器

  • HeaderExchangeHandler
    是一个中间层,负责将解析好的数据与上层业务的逻辑的流转
    handleRequest, handleResponse

image.png

异步转同步

dubbo的consumer发送请求是非阻塞的,不会等待返回值。provider接收是阻塞的会等待provider调用invoker处理完直接返回给consumer。

    public void start() throws Exception {
        for (int i = 0; i < Integer.MAX_VALUE; i ++) {
            try {
                String hello = demoService.sayHello("world" + i);
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + hello);
            } catch (Exception e) {
                e.printStackTrace();
            }
            Thread.sleep(2000);
        }
    }

dubbo 是基于netty NIO的非阻塞并行调用通信。(阻塞 非阻塞 异步 同步 区别 )dubbo 的通信方式 有3类类型:

  1. 异步,有返回值
    <dubbo:method name="sayHello" async="true"> </dubbo:method>
    Future<String> temp= RpcContext.getContext().getFuture(); hello=temp.get();

  2. 异步,无返回值
    <dubbo:method name="sayHello" return="false"></dubbo:method>

  3. 异步,变同步(默认的通信方式)
    A.当前线程怎么让它 “暂停,等结果回来后,再执行”?
    B.socket是一个全双工的通信方式,那么在多线程的情况下,如何知道那个返回结果对应原先那条线程的调用?
    通过一个全局唯一的ID来做consumer 和 provider 来回传输。

单工 全双工 半双工 区别
  • 单工
    在同一时间只允许一方向另一方传送信息,而另一方不能向一方传送
  • 全双工
    是指在发送数据的同事也能够接收数据,两者同步进行,这好像我们平时打电话一样,说话的同事也能够听到对方的声音。目前的网卡一般都支持全双工
  • 半双工
    所谓的半双工就是指一个时间段内只有一个动作发生,举个简单例子,一条窄窄的马路,同事只能有一辆车通过,当目前有两辆车对开,这种情况下就只能一辆车先过,到头后另一辆车再开,这个例子就形象的说明了半双工的原理。
同步、异步实现

异步.有返回值时会将能获取结果的future放在上下文变量中,如果需要取结果直接从future中阻塞获取就可以了;异步,无返回值,则不再处理;同步,有返回值会直接调用future.get

#com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
    protected Result doInvoke(final Invocation invocation) throws Throwable {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            //是否有返回值
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                //2.异步,无返回值
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                //ReferenceCountExchangeClient
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                //1.异步.有返回值
                //ReferenceCountExchangeClient 发送请求
                ResponseFuture future = currentClient.request(inv, timeout) ;
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {//
                //3. 异步,变同步(默认的通信方式)
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
}
  • 生成future
#com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)
public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try{
            //AbstractPeer
            channel.send(req);
        }catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
  • 阻塞
    可以看到所有的DefaultFuture都会被维护到FUTURES这个map中,而且request.getId()为key。当我们调用future.get获取结果的时,会循环判断当前response是否为空,如果为空就当前线程一直等待在done上,直到超时为止。
public class DefaultFuture implements ResponseFuture {
    private static final Map<Long, DefaultFuture> FUTURES   = new ConcurrentHashMap<Long, DefaultFuture>();
    private final Lock                            lock = new ReentrantLock();
    private final Condition                       done = lock.newCondition();
    private volatile Response                     response;
   public DefaultFuture(Channel channel, Request request, int timeout){
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // put into waiting map.
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (! isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (! isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (! isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }
    public boolean isDone() {
        return response != null;
    }
}
  • 唤醒
    NettyHandler获取到数据之后传递给HeaderExchangeHandler后,会调用DefaultFuture的静态方法received来设置response。会根据response的id从FUTURES中获取创建时缓存的future实例。如果获取到了就调用 future.doReceived(response)来设置返回值,并且通知等待在done上的线程,这时之前阻塞在future.get()方法上的线程就会立即返回。
#com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleResponse
   static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }
#com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#received
public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at " 
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
                            + ", response " + response 
                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 
                                + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
    private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355

推荐阅读更多精彩内容