dubbo 服务调用过程

目录
dubbo 拓展机制 SPI
dubbo 自适应拓展机制
dubbo 服务导出
dubbo 服务引用
dubbo 服务字典
dubbo 服务路由
dubbo 集群
dubbo 负载均衡
dubbo 服务调用过程

大致流程图:


image.png

1.服务调用方式

从dubbo源码提供的demo作为入口分析服务调用过程,反编译demo的代理类我不是使用的arthas,我使用的是jdk自带的工具HSDB。最终代理类如下:

public class proxy0 implements DC, EchoService, DemoService {
    public static Method[] methods;
    private InvocationHandler handler;

    public proxy0(InvocationHandler var1) {
        this.handler = var1;
    }

    public proxy0() {
    }

    public String sayHello(String var1) {
        Object[] var2 = new Object[]{var1};
        Object var3 = this.handler.invoke(this, methods[0], var2);
        return (String)var3;
    }

    public Object $echo(Object var1) {
        Object[] var2 = new Object[]{var1};
        Object var3 = this.handler.invoke(this, methods[1], var2);
        return (Object)var3;
    }
}

其中InvocationHandler的实现为InvokerInvocationHandler,所以我们接着看它的invoker方法。(PS:服务引用的最后提到过默认通过JavassistProxyFactory生成代理类,从JavassistProxyFactory也可以看出在代理类实例化的时候传入的InvocationHandler就是InvokerInvocationHandler)

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    // 拦截定义在 Object 类中的方法(未被子类重写),比如 wait/notify
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    // 如果 toString、hashCode 和 equals 等方法被子类重写了,这里也直接调用
    if ("toString".equals(methodName) && parameterTypes.length == 0) {
        return invoker.toString();
    }
    if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
        return invoker.hashCode();
    }
    if ("equals".equals(methodName) && parameterTypes.length == 1) {
        return invoker.equals(args[0]);
    }

    // 将 method 和 args 封装到 RpcInvocation 中,并执行后续的调用
    return invoker.invoke(new RpcInvocation(method, args)).recreate();
}

InvokerInvocationHandler 中的 invoker 成员变量类型为 MockClusterInvoker,MockClusterInvoker 内部封装了服务降级逻辑,这里的Invoker原本是来自于Cluster.join方法返回的invoker,但是MockClusterWrapper作为Cluster的包装类在通过Extension加载具体的Cluster实现类的时候会使用MockClusterWrapper包装一层。下面简单看一下:

public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;

    // 获取 mock 配置值
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
        //no mock
        // 无 mock 逻辑,直接调用其他 Invoker 对象的 invoke 方法,
        // 比如 FailoverClusterInvoker
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        if (logger.isWarnEnabled()) {
            logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
        }
        //force:direct mock
        // force:xxx 直接执行 mock 逻辑,不发起远程调用
        result = doMockInvoke(invocation, null);
    } else {
        //fail-mock
        // fail:xxx 表示消费方对调用服务失败后,再执行 mock 逻辑,不抛出异常
        try {
            result = this.invoker.invoke(invocation);

            //fix:#4585
            if(result.getException() != null && result.getException() instanceof RpcException){
                RpcException rpcException= (RpcException)result.getException();
                if(rpcException.isBiz()){
                    throw  rpcException;
                }else {
                    // 调用失败,执行 mock 逻辑
                    result = doMockInvoke(invocation, rpcException);
                }
            }

        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            }

            if (logger.isWarnEnabled()) {
                logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
            }
            result = doMockInvoke(invocation, e);
        }
    }
    return result;
}

其中doMockInvoke是在发生异常或者是mock 的配置为 force的时候调用,我们接下来看看doMockInvoke

private Result doMockInvoke(Invocation invocation, RpcException e) {
    Result result = null;
    Invoker<T> minvoker;

    // 通过directory列举invokers
    // ps:这里会通过MockInvokersSelector筛选出mock配置的本地调用
    List<Invoker<T>> mockInvokers = selectMockInvoker(invocation);
    // 如果没有列举的invoker,新建一个MockInvoker
    if (CollectionUtils.isEmpty(mockInvokers)) {
        minvoker = (Invoker<T>) new MockInvoker(directory.getUrl(), directory.getInterface());
    } else {
        // 获取第一个invoker
        minvoker = mockInvokers.get(0);
    }
    try {
        result = minvoker.invoke(invocation);
    } catch (RpcException me) {
        if (me.isBiz()) {
            result = AsyncRpcResult.newDefaultAsyncResult(me.getCause(), invocation);
        } else {
            throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
        }
    } catch (Throwable me) {
        throw new RpcException(getMockExceptionMessage(e, me), me.getCause());
    }
    return result;
}

代码比较简单,可以自行研究下。
回到刚才的invoker调用,继续看正常的远程调用。接着进入FailoverClusterInvoker进行集群调用,集群调用中会使用负载均衡从invokers(在服务引入中提到的RegistryDirectory通过监听Zookeeper节点数据动态变化的invoker列表)中选出一个调用。这个过程已经在前面分析过了,接下来会经过InvokerListener以及各种filter的调用,如果在InvokerListener和filter执行过程中都没有问题,接着调用的是AsyncToSyncInvoker,从名字我们知道是处理异步转同步的,也就是该次调用是异步还是同步的。

public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = invoker.invoke(invocation);

    try {
        if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    } catch (InterruptedException e) {
        throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return!  method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (ExecutionException e) {
        Throwable t = e.getCause();
        if (t instanceof TimeoutException) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } else if (t instanceof RemotingException) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    } catch (Throwable e) {
        throw new RpcException(e.getMessage(), e);
    }
    return asyncResult;
}

可以看到当调用方式是同步时,会通过Result.get的方式阻塞等待结果。
接着就是调用DubboInvoker,可以看到它的invoke方法在父类AbstractInvoker中,所以先看AbstractInvoker的invoke方法

public Result invoke(Invocation inv) throws RpcException {
    // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
    if (destroyed.get()) {
        logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
    }
    RpcInvocation invocation = (RpcInvocation) inv;
    // 设置 Invoker
    invocation.setInvoker(this);
    if (CollectionUtils.isNotEmptyMap(attachment)) {
        // 设置 attachment
        invocation.addAttachmentsIfAbsent(attachment);
    }
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
        /**
         * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
         * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
         * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
         * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
         */
        // 添加 contextAttachments 到 RpcInvocation#attachment 变量中
        invocation.addAttachments(contextAttachments);
    }

    // 设置异步信息到 RpcInvocation#attachment 中
    invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
    // 如果异步就设置invoke_id
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

    try {
        return doInvoke(invocation);
    } catch (InvocationTargetException e) { // biz exception
        Throwable te = e.getTargetException();
        if (te == null) {
            return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        } else {
            if (te instanceof RpcException) {
                ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
            }
            return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
        }
    } catch (RpcException e) {
        if (e.isBiz()) {
            return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        } else {
            throw e;
        }
    } catch (Throwable e) {
        return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
    }
}

可以看到其中大部分代码用于添加信息到 RpcInvocation#attachment 变量中,添加完毕后,调用 doInvoke 执行后续的调用。doInvoke 是一个抽象方法,需要由子类实现,下面到 DubboInvoker 中看一下

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    // 设置 path 和 version 到 attachment 中
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    ExchangeClient currentClient;
    // 从 clients 数组中获取 ExchangeClient
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        // 轮询获取链接
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // 获取 dubbo:method 中return的设置,默认为true
        // true,则返回future,或回调onreturn等方法,如果设置为false,则请求发送成功后直接返回Null
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // 获取方法超时配置
        int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            // 发送请求
            currentClient.send(inv, isSent);
            // 直接返回 appResponse 空对象
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
            CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
            // 异步设置结果到 asyncRpcResult 中,
            // 如果方式为同步,将在后面的 AsyncToSyncInvoker 中调用 get 方法阻塞知道获取结果
            asyncRpcResult.subscribeTo(responseFuture);
            // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
            // 兼容dubbo 2.6.x 版本,设置 future 到上下文中
            FutureContext.getContext().setCompatibleFuture(responseFuture);
            return asyncRpcResult;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

可以看到这里就是选择一个client然后发起网络请求

2.服务消费方发送请求

接下来我们继续分析请求的发送过程,ExchangeClient类以及它的一些子类。先来看下ExchangeClient的类图。


image.png

2.1发送请求

上面的代码中currentClient为ReferenceCountExchangeClient,我们现在分析下ReferenceCountExchangeClient这个类。

final class ReferenceCountExchangeClient implements ExchangeClient {

    private final URL url;
    // 引用计数
    private final AtomicInteger referenceCount = new AtomicInteger(0);

    // 装饰对象
    private ExchangeClient client;

    public ReferenceCountExchangeClient(ExchangeClient client) {
        this.client = client;
        // 增加计数
        referenceCount.incrementAndGet();
        this.url = client.getUrl();
    }
.......
}

ReferenceCountExchangeClient 内部定义了一个引用计数变量 referenceCount,每当该对象被引用一次 referenceCount 都会进行自增。每当 close 方法被调用时,referenceCount 进行自减。ReferenceCountExchangeClient 内部仅实现了一个引用计数的功能,其他方法并无复杂逻辑,均是直接调用被装饰对象的相关方法。
ReferenceCountExchangeClient包装的对象是HeaderExchangeClient,HeaderExchangeClient在HeaderExchanger中使用connect方法生成。

public class HeaderExchangeClient implements ExchangeClient {

    private final Client client;
    private final ExchangeChannel channel;

    // 执行器
    private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(
            new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL);
    // 心跳任务
    private HeartbeatTimerTask heartBeatTimerTask;
    // 连接任务
    private ReconnectTimerTask reconnectTimerTask;

    public HeaderExchangeClient(Client client, boolean startTimer) {
        Assert.notNull(client, "Client can't be null");
        this.client = client;
        // 创建 HeaderExchangeChannel 对象
        this.channel = new HeaderExchangeChannel(client);

        if (startTimer) {
            URL url = client.getUrl();
            // 开始重新连接任务,当连接不可用或者是还在超时时间内
            startReconnectTask(url);
            // 开始心跳任务
            startHeartBeatTask(url);
        }
    }
}

HeaderExchangeClient 中很多方法只有一行代码,即调用 HeaderExchangeChannel 对象的同签名方法。那 HeaderExchangeClient 有什么用处呢?答案是封装了一些关于心跳检测的逻辑以及重新建立连接。接下来继续看HeaderExchangeChannel

final class HeaderExchangeChannel implements ExchangeChannel {
    // 无返回值
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
        }
        if (message instanceof Request
                || message instanceof Response
                || message instanceof String) {
            channel.send(message, sent);
        } else {
            Request request = new Request();
            request.setVersion(Version.getProtocolVersion());
            // 设置双向通信标志
            request.setTwoWay(false);
            // 这里的 message 变量类型为 RpcInvocation
            request.setData(message);
            channel.send(request, sent);
        }
    }
    // 有返回值
    @Override
    public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        // 设置双向通信标志
        req.setTwoWay(true);
        // 这里的 request 变量类型为 RpcInvocation
        req.setData(request);
        // 创建 DefaultFuture 对象
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
        try {
            // 调用 NettyClient 的 send 方法发送请求
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
}

可以看到send方法无返回值,request有一个future的返回。
接下来我们继续看channel.send方法,其中channel默认为NettyClient,NettyClient没有send方法,而是它的父类AbstractPeer实现了send方法。然后会继续调用NettyClient的父类AbstractClient的sent方法。

public abstract class AbstractPeer implements Endpoint, ChannelHandler {
    @Override
    public void send(Object message) throws RemotingException {
        send(message, url.getParameter(Constants.SENT_KEY, false));
    }
}

public abstract class AbstractClient extends AbstractEndpoint implements Client {
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (needReconnect && !isConnected()) {
            connect();
        }
        // 获取 Channel,getChannel 是一个抽象方法,具体由子类实现
        Channel channel = getChannel();
        //TODO Can the value returned by getChannel() be null? need improvement.
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
        }
        channel.send(message, sent);
    }
}

继续看NettyClient的getChannel方法

public class NettyClient extends AbstractClient {
    
    // 这里的 Channel 全限定名称为 org.jboss.netty.channel.Channel
    private volatile Channel channel;

    @Override
    protected com.alibaba.dubbo.remoting.Channel getChannel() {
        Channel c = channel;
        if (c == null || !c.isConnected())
            return null;
        // 获取一个 NettyChannel 类型对象
        return NettyChannel.getOrAddChannel(c, getUrl(), this);
    }
}

final class NettyChannel extends AbstractChannel {

    private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = 
        new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();

    private final org.jboss.netty.channel.Channel channel;
    
    /** 私有构造方法 */
    private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {
        super(url, handler);
        if (channel == null) {
            throw new IllegalArgumentException("netty channel == null;");
        }
        this.channel = channel;
    }

    static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
        if (ch == null) {
            return null;
        }
        
        // 尝试从集合中获取 NettyChannel 实例
        NettyChannel ret = channelMap.get(ch);
        if (ret == null) {
            // 如果 ret = null,则创建一个新的 NettyChannel 实例
            NettyChannel nc = new NettyChannel(ch, url, handler);
            if (ch.isConnected()) {
                // 将 <Channel, NettyChannel> 键值对存入 channelMap 集合中
                ret = channelMap.putIfAbsent(ch, nc);
            }
            if (ret == null) {
                ret = nc;
            }
        }
        return ret;
    }
}

获取到NettyChannel的实例后,就开始调用NettyChannel的sent方法:

public void send(Object message, boolean sent) throws RemotingException {
    // whether the channel is closed
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        // 发送消息(包含请求和响应消息)
        ChannelFuture future = channel.writeAndFlush(message);
        // sent 的值源于 <dubbo:method sent="true/false" /> 中 sent 的配置值,有两种配置值:
        //   1. true: 等待消息发出,消息发送失败将抛出异常
        //   2. false: 不等待消息发出,将消息放入 IO 队列,即刻返回
        // 默认情况下 sent = false;
        if (sent) {
            // wait timeout ms
            timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            // 等待消息发出,若在规定时间没能发出,success 会被置为 false
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        // 若 success 为 false,这里抛出异常
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}

接下来看下sayHello方法的整个调用链

proxy0#sayHello(String)
-> InvokerInvocationHandler#invoke(Object, Method, Object[])
-> MockClusterInvoker#invoke(Invocation) //本地调用和错误时mock本地实现
-> AbstractClusterInvoker#invoke(Invocation) //列举可用服务以及初始化负载均衡对象
-> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance) //实现了Failover模式的集群容错
-> Filter#invoke(Invoker, Invocation) // 包含多个 Filter 调用
-> ListenerInvokerWrapper#invoke(Invocation)
-> AsyncToSyncInvoker#invoker // 如果需要异步转同步
-> AbstractInvoker#invoke(Invocation) // 设置attachment和同步或异步
-> DubboInvoker#doInvoke(Invocation) // 设置客户端以及发送请求
-> ReferenceCountExchangeClient#request(Object, int) // 记录该client使用次数
-> HeaderExchangeClient#request(Object, int) // 执行心跳任务和连接超时重连任务
-> HeaderExchangeChannel#request(Object, int) // 设置请求参数
-> AbstractPeer#send(Object) // 检查chaneel是否关闭
-> AbstractClient#send(Object, boolean) // 配置netty,建立连接,获取chaneel重新连接
-> NettyChannel#send(Object, boolean) // 发送请求和捕获异常,向上抛出
-> NioClientSocketChannel#write(Object)

对于服务请求的编码以及接收请求的解码过程暂不分析

2.2 服务提供方接收请求

2.2.1 线程派发模型

图片来自官网

默认使用的是all,接下来看看AllChannelHandler

public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    /** 处理连接事件 */
    @Override
    public void connected(Channel channel) throws RemotingException {
        // 获取线程池
        ExecutorService executor = getExecutorService();
        try {
            // 将连接事件派发到线程池中处理
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    /** 处理断开事件 */
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    /** 处理请求和响应消息,这里的 message 变量类型可能是 Request,也可能是 Response */
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            // 将请求和响应消息派发到线程池中处理
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                // 如果通信方式为双向通信,此时将 Server side ... threadpool is exhausted
                // 错误信息封装到 Response 中,并返回给服务消费方。
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    // 返回包含错误信息的 Response 对象
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    /** 处理异常信息 */
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

继续看下构造函数中调用的父构造方法

public WrappedChannelHandler(ChannelHandler handler, URL url) {
    this.handler = handler;
    this.url = url;
    // 默认为FixedThreadPool的实现
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

    String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
    if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
        componentKey = CONSUMER_SIDE;
    }
    // 配置到satastore中,方便client或者是server关闭时同时关闭线程池
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}

这里默认使用了一个最大线程数为200的线程池,具体线程池用于处理什么样的事件取决于使用怎样的线程派发模型。

2.2.2 调用服务

针对netty3和netty4接收channel数据的类和方法有所不同,以下以netty4为例,展示调用链,从NettyServerHandler进入是因为,在服务导出过程中向netty的bootstrap中注册了NettyServerHandler,根据不同的策略以下调用链中的具体的类可能有所不同,这里所列举的都是默认的处理类

NettyServerHandler#channelRead(ChannelHandlerContext, MessageEvent)
-> AbstractPeer#received
-> MultiMessageHandler#received // 多消息处理
-> HeartbeatHandler#received // 处理心跳请求,如果不是继续后续调用
-> AllChannelHandler#received // 将此次处理放入线程池执行
-> ExecutorService#execute(Runnable) // 由线程池执行后续的调用逻辑
-> DecodeHandler#received // 解码
-> HeaderExchangeHandler#received // 处理request或者response或者telnet
-> Dubboprotocol#reply // 获取服务导出过程中保存的对应的invoker
-> filter#invoker#invoke
-> invokerWrapper#invoke
-> AbstractProxyInvoker#invoke // 调用具体的方法,封装调用结果
-> 服务导出过程中生成的对应的代理类

这里线程池的execute执行的也就是ChannelEventRunnable,我们从ChannelEventRunnable开始分析

public class ChannelEventRunnable implements Runnable {
    
    private final ChannelHandler handler;
    private final Channel channel;
    private final ChannelState state;
    private final Throwable exception;
    private final Object message;
    
    @Override
    public void run() {
        // 检测通道状态,对于请求或响应消息,此时 state = RECEIVED
        if (state == ChannelState.RECEIVED) {
            try {
                // 将 channel 和 message 传给 ChannelHandler 对象,进行后续的调用
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("... operation error, channel is ... message is ...");
            }
        } 
        
        // 其他消息类型通过 switch 进行处理
        else {
            switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("... operation error, channel is ...");
                }
                break;
            case DISCONNECTED:
                // ...
            case SENT:
                // ...
            case CAUGHT:
                // ...
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
            }
        }

    }
}

ChannelEventRunnable 仅是一个中转站,它的 run 方法中并不包含具体的调用逻辑,仅用于将参数传给其他 ChannelHandler 对象进行处理,该对象类型为 DecodeHandler。

public class DecodeHandler extends AbstractChannelHandlerDelegate {

    private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class);

    public DecodeHandler(ChannelHandler handler) {
        super(handler);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            // 对 Decodeable 接口实现类对象进行解码
            decode(message);
        }

        if (message instanceof Request) {
            // 对 Request 的 data 字段进行解码
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            // 对 Request 的 result 字段进行解码
            decode(((Response) message).getResult());
        }

        // 执行后续逻辑
        handler.received(channel, message);
    }

    private void decode(Object message) {
        // Decodeable 接口目前有两个实现类,
        // 分别为 DecodeableRpcInvocation 和 DecodeableRpcResult
        if (message instanceof Decodeable) {
            try {
                // 执行解码逻辑
                ((Decodeable) message).decode();
                if (log.isDebugEnabled()) {
                    log.debug("Decode decodeable message " + message.getClass().getName());
                }
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
                }
            } // ~ end of catch
        } // ~ end of if
    } // ~ end of method decode

}

DecodeHandler 存在的意义就是保证请求或响应对象可在线程池中被解码。解码完毕后,完全解码后的 Request 对象会继续向后传递,下一站是 HeaderExchangeHandler。

public class HeaderExchangeHandler implements ChannelHandlerDelegate {
  void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        // 检测请求是否合法,不合法则返回状态码为 BAD_REQUEST 的响应
        if (req.isBroken()) {
            Object data = req.getData();

            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable) data);
            } else {
                msg = data.toString();
            }
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);

            // 将调用结果返回给服务消费端
            channel.send(res);
            return;
        }
        // find handler by message class.
        // 获取 data 字段值,也就是 RpcInvocation 对象
        Object msg = req.getData();
        try {
            // 继续向下调用
            CompletionStage<Object> future = handler.reply(channel, msg);
            future.whenComplete((appResult, t) -> {
                try {
                    if (t == null) {
                        // 设置 OK 状态码
                        res.setStatus(Response.OK);
                        // 设置调用结果
                        res.setResult(appResult);
                    } else {
                        // 若调用过程出现异常,则设置 SERVICE_ERROR,表示服务端异常
                        res.setStatus(Response.SERVICE_ERROR);
                        res.setErrorMessage(StringUtils.toString(t));
                    }
                    // 将调用结果返回给服务消费端
                    channel.send(res);
                } catch (RemotingException e) {
                    logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
                } finally {
                    // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
                }
            });
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
            channel.send(res);
        }
    }
    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            // 处理请求对象
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    // 处理事件
                    handlerEvent(channel, request);
                } else {
                    // 双向通信
                    if (request.isTwoWay()) {
                        // 向后调用服务,并得到调用结果
                        handleRequest(exchangeChannel, request);
                    }
                    // 如果是单向通信,仅向后调用指定服务即可,无需返回调用结果
                    else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            }
            // 处理响应对象,服务消费方会执行此处逻辑,后面分析
            else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            }
            // telnet 相关,忽略
            else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
}

对于双向通信,HeaderExchangeHandler 首先向后进行调用,得到调用结果。然后将调用结果封装到 Response 对象中,最后再将该对象返回给服务消费方。如果请求不合法,或者调用失败,则将错误信息封装到 Response 对象中,并返回给服务消费方。

其中handler.reply调用的是dubboProtocol的匿名类对象逻辑

public class DubboProtocol extends AbstractProtocol {
  private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }

            Invocation inv = (Invocation) message;
            // 获取 Invoker 实例
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            // 参数回调 相关
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(",")) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            // 通过 Invoker 调用具体的服务
            Result result = invoker.invoke(inv);
            // 返回入参
            return result.completionFuture().thenApply(Function.identity());
        }
  }

  Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        boolean isCallBackServiceInvoke = false;
        boolean isStubServiceInvoke = false;
        int port = channel.getLocalAddress().getPort();
        String path = inv.getAttachments().get(PATH_KEY);

        // if it's callback service on client side
        // 本地存根
        isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(STUB_EVENT_KEY));
        if (isStubServiceInvoke) {
            port = channel.getRemoteAddress().getPort();
        }

        //callback
        // 是否是客户端并且不是 本地存根
        isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
        if (isCallBackServiceInvoke) {
            path += "." + inv.getAttachments().get(CALLBACK_SERVICE_KEY);
            inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
        }

        // 计算 service key,格式为 groupName/serviceName:serviceVersion:port。比如:
        //   dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
        // 从 exporterMap 查找与 serviceKey 相对应的 DubboExporter 对象,
        // 服务导出过程中会将 <serviceKey, DubboExporter> 映射关系存储到 exporterMap 集合中
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

        // 获取 Invoker 对象,并返回
        if (exporter == null) {
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
                    ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
        }

        return exporter.getInvoker();
    }

}

invoke方法逻辑在AbstractProxyInvoker中

public Result invoke(Invocation invocation) throws RpcException {
    try {
        // 调用 doInvoke 执行后续的调用,并将调用结果封装到 asyncRpcResult
        Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
        CompletableFuture<Object> future = wrapWithFuture(value, invocation);
        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
        future.whenComplete((obj, t) -> {
            AppResponse result = new AppResponse();
            if (t != null) {
                if (t instanceof CompletionException) {
                    result.setException(t.getCause());
                } else {
                    result.setException(t);
                }
            } else {
                result.setValue(obj);
            }
            asyncRpcResult.complete(result);
        });
        return asyncRpcResult;
    } catch (InvocationTargetException e) {
        if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
            logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
        }
        return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

doInvoke 是一个抽象方法,这个需要由具体的 Invoker 实例实现。Invoker 实例是在运行时通过 JavassistProxyFactory 创建的,创建逻辑如下

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    // 为目标类创建 Wrapper
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    // 创建匿名 Invoker 类对象,并实现 doInvoke 方法
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            // 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

Wrapper 是一个抽象类,其中 invokeMethod 是一个抽象方法。Dubbo 会在运行时通过 Javassist 框架为 Wrapper 生成实现类,并实现 invokeMethod 方法,该方法最终会根据调用信息调用具体的服务。以 DemoServiceImpl 为例,Javassist 为其生成的代理类如下。

public class Wrapper1
extends Wrapper
implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    public String[] getPropertyNames() {
        return pns;
    }

    public boolean hasProperty(String string) {
        return pts.containsKey(string);
    }

    public Class getPropertyType(String string) {
        return (Class)pts.get(string);
    }

    public String[] getMethodNames() {
        return mns;
    }

    public String[] getDeclaredMethodNames() {
        return dmns;
    }

    public void setPropertyValue(Object object, String string, Object object2) {
        try {
            DemoServiceImpl demoServiceImpl = (DemoServiceImpl)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString());
    }

    public Object getPropertyValue(Object object, String string) {
        try {
            DemoServiceImpl demoServiceImpl = (DemoServiceImpl)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString());
    }

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoServiceImpl demoServiceImpl;
        try {
            demoServiceImpl = (DemoServiceImpl)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoServiceImpl.sayHello((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString());
    }
}

2.2.3 服务提供方返回结果

执行完HeaderExchangeHandler#handleRequest中的handler.reply后,会调用channel.send将调用结果返回给服务消费者

HeaderExchangeHandler#handleRequest
-> HeaderExchangeChannel#send
-> NettyChannel#send
-> NioSocketChannel#writeAndFlush

2.2.4 服务消费方接收服务提供方返回

在服务引入过程中,client会在netty bootstrap中添加类型为NettyClientHandler的handler,当消费方收到服务提供方的返回后,就触发NettyClientHandler中的channelRead方法。

NettyClientHandler#channelRead
-> AbstractPeer#received
-> MultiMessageHandler#received
-> HeartbeatHandler#received
-> AllChannelHandler#received
-> ExecutorService#execute(Runnable)
-> DecodeHandler#received
-> HeaderExchangeHandler#received
感觉和服务提供者收到请求差不多。但是,从这里开始执行的是HeaderExchangeHandler#handleResponse。

static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }

public static void received(Channel channel, Response response) {
        received(channel, response, false);
    }

    public static void received(Channel channel, Response response, boolean timeout) {
        try {
            // 在 HeaderExchangeChannel#request,中创建实例时添加到 FUTURES 中
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                Timeout t = future.timeoutCheckTask;
                // 没有超时则取消超时任务
                if (!timeout) {
                    // decrease Time
                    t.cancel();
                }
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

private void doReceived(Response res) {
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
            this.complete(res.getResult());
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }
    }

接下来会调用FutureAdapter

public FutureAdapter(CompletableFuture<AppResponse> future) {
    this.appResponseFuture = future;
    future.whenComplete((appResponse, t) -> {
        if (t != null) {
            if (t instanceof CompletionException) {
                t = t.getCause();
            }
            this.completeExceptionally(t);
        } else {
            if (appResponse.hasException()) {
                this.completeExceptionally(appResponse.getException());
            } else {
                #1
                this.complete((V) appResponse.getValue());
            }
        }
    });
}

从#1代码可以看出来,已经取出来了服务提供方返回的结果。
这里的this.complete就是层层调用complete,将结果不断的向上传递。

系列文章大致的介绍了一下dubbo的一些源码。但是依然有许多的内容没有介绍到,如:编码解码,返回结果的各种适配器,dubbo与spring的相关结合等等。文章中如有错误欢迎指出,一起讨论。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容