详解Dubbo(四):消费端请求发送Exchanger

前言

前两篇文章讲了消费端代理的生成,最终到请求发送操作由Invoker来完成。Invoker同时集成了集群服务发现和路由功能,还集成了调用过程中的自定义扩展Filter。Invoker是业务对象的分水岭,请求到达Invoker之前,都是以业务接口和方法的方式调用,就是说调用方要拿到接口定义的api。Invoker之后就是Exchanger和Transporter层,只存在Request/Response了,在这里接口和方法变成了Request的一个参数。这篇文章先看一下Dubbo是怎么初始化远程通信Client并发送和接收请求的。下一篇将解析通信协议以及序列化等操作的实现。

Client初始化

回顾之前的白话Dubbo系列,Invoker调用的是Exchanger。Exchange层针对消费端和服务提供端分布封装成ExchangeClientExchangeServer。它们大部分接口都是一样的,只是对于Client来说,支持connect()操作来和提供端建立连接;而对于Server端,需要通过bind操作来监听端口,来接收消费端的连接请求。其它的数据发送和接收对于两端来说其实是一样的。

Dubbo Client初始化

还是以Dubbo协议为例,回顾下上一篇DubboProtocol的Invoker初始化操作:

@Override
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        ...
        ...
        // 创建Invoker
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

在构造Invoker的时候,需要传入一个Client的列表,之后Invoker通过client发送请求和接收返回结果,至于请求协议打包、连接建立、异步io的结果接收等操作留给ExchangeClient来实现。下面看下getClients()方法的实现:

private ExchangeClient[] getClients(URL url) {
        boolean useShareConnect = false;
        //连接数配置
        int connections = url.getParameter(CONNECTIONS_KEY, 0);
        List<ReferenceCountExchangeClient> shareClients = null;
        // 如果没设置连接数,说明Consumer希望使用共享连接
        if (connections == 0) {
            useShareConnect = true;
            //共享连接数配置
            String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                    DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
            //获取共享Client
            shareClients = getSharedClient(url, connections);
        }
        //根据配置的连接个数初始化Client
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (useShareConnect) {
                //使用共享client
                clients[i] = shareClients.get(i);
            } else {
                //初始化Client
                clients[i] = initClient(url);
            }
        }
        return clients;
    }

上面的逻辑中涉及到共享Client的概念,因为Dubbo的服务是以接口为粒度的,每个Invoker对应了一个远程接口的调用封装。在实际应用中,一个应用会包含多个接口,如果对应同一个应用的多个Invoker每个都初始化一个Client的话,万一接口过多,会造成每个Consumer和Provider之间建立多个Connection,而且连接数随着consumer的个数增加而成倍数的增加。所以shareClient的意思就是对于同一个ip+port,所有invoker共享client,有点类似于连接池的概念,达到节约资源的目的。共享client最终初始化client的方式和普通的是一样的,所以这里直接看下initClient()方法是如何实现的。

private ExchangeClient initClient(URL url) {
        // client通信框架,默认netty
        String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
        //使用Dubbo通信协议
        url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
        // 设置发送心跳的间隔
        url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
        // 检查传输层是否支持该框架
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client;
        try {
            // client延迟建立连接,即第一次调用时才connect,已经不推荐使用
            if (url.getParameter(LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
                //初始化并连接server
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;
    }

上面的初始化过程除了初始化参数外,就是调用工具类Exchangers来初始化client。这里面除了url之外还有一个ExchangeHandler参数,这个是用来处理服务端主动发送来的消息用的,对于Consumer发送请求的场景这里涉及不到。看下Exchangers.connect()是怎么实现的。

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).connect(url, handler);
    }

具体实现就是根据url参数获取对应的Exchanger,然后调用它的connect方法。Dubbo中默认实现类是HeaderExchanger

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

上面实现中,通过调用connect()方法获取到ExchangeClient的实现HeaderExchangeClient,这里会通过工具类Transporters获取到一个传输层的Client对象。获取Client对象时,对传入的handler,又做了两层封装,DecodeHandler用来对收到的Response中的数据部分解码成对象;而HeaderExchangeHandler作用是对于异步返回的Response找到当时的Request,将结果返给当初的调用方。

HeaderExchangeClient初始化

public HeaderExchangeClient(Client client, boolean startTimer) {
        Assert.notNull(client, "Client can't be null");
        this.client = client;
        this.channel = new HeaderExchangeChannel(client);

        if (startTimer) {
            URL url = client.getUrl();
            startReconnectTask(url);
            startHeartBeatTask(url);
        }
    }

由上面Client的构造函数可以看出主要做了两件事,首先将传入的client实现封装了一层,client的方法比如request()都是直接调用的channel的方法;其次启动了两个定时任务,一个是在和server端的connection断开后重连,一个是定时发送心跳。下面看下HeaderExchangeChannel对client做了一层封装后,主要干了什么。

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
        }
        if (message instanceof Request
                || message instanceof Response
                || message instanceof String) {
            channel.send(message, sent);
        } else {
            Request request = new Request();
            request.setVersion(Version.getProtocolVersion());
            request.setTwoWay(false);
            request.setData(message);
            channel.send(request, sent);
        }
    }
    @Override
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

这个类在发送前,将调用参数封装成一个request,设置版本号和请求类型。如果调用的是request()方法,即需要接收response,以异步请求的方式发出,返回给调用方(也就是Invoker)一个future。

请求发送过程

现在理一下一次请求的发送过程。在系统初始化阶段,创建Invoker的时候会初始化一个和服务端交互的ExchangeClient,对于Dubbo协议来说,就是DubboInvoker中包含一个或者多个HeaderExchangeClient。当代理调用Invoker的invoke()方法发送请求时,invoker选择一个Client发送request。如果是OneWay的请求,调用send方法,直接返回成功或者失败的结果。如果是TwoWay的请求,Client会返回一个Future给invoker,然后client在收到response后,会将收到的结果set到Future中,调用方就可以拿到远程接口的返回值了。

接收请求响应

在上面Invoker初始化Client的时候,需要传入一个ExchangeHandler用来接收异步响应回调。在HeaderExchangeClient初始化的时候,又在handler上面套了两层,所以最终的关系图大概时下面这样:

请求流转

在底层的Transporter收到Server端的数据并处理后,会将数据给到DecodeHandler,这个handler判断返回的数据是否实现了Decodeable接口,是的话就调用decode()方法并把解码出的value设置到Response中。

@Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }
        //provider端解码Request
        if (message instanceof Request) {
            decode(((Request) message).getData());
        }
       //consumer端解码Response
        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }
       //调用下一个handler
        handler.received(channel, message);
    }

第二个handler是HeaderExchangeHandler,如果是Server端返回的请求响应,最终会到handleResponse()方法中:

static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }

还记得上面的请求发送过程吗?对于TwoWay的请求,request发出后,调用方会得到一个Future,这个Future就是在这个Handler这里填充结果的。这样一次完整的请求就完成了。

总结

Invoker将一次远程方法的调用封装成Request后,通过ExchangeClient发送出去,并通过传入的ExchangeHandler参数处理异步返回的Response并和之前的Request关联,返回给调用方。Dubbo这里对于Exchange和Transporter的划分使用了MEP设计模式(Message Exchange Pattern),感兴趣的话可以看下这个模式的定义。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352