SOFABolt 源码分析20 - Serializer 序列化机制设计

image.png

类组成

  • Serializer 序列化器
  • Serializer 定义了序列化接口,提供了一个默认实现 HessianSerializer,我们可以通过模仿 HessianSerializer 实现 Serializer 接口来提供自己的序列化方式
  • SerializerManager 是 Serializer 实现类的管理器,通过一个 Serializer[] 存储各种序列化器,数组索引下标 index 就是 Serializer 的 key,例如 HessianSerializer 的 index = 1
  • CustomSerializer 序列化器
  • CustomSerializer 定义了自定义序列化接口,提供了一个适配器 DefaultCustomSerializer,使得用户可以近实现自己需要的方法,例如仅实现 request 的 serializeHeader 和 deserializeHeader
  • CustomSerializerManager 是 CustomSerializer 实现类的管理器,通过一个 Map<String, CustomSerializer> 和 Map<CommandCode, CustomSerializer> 来存储各种自定义序列化器,前者 key 为请求数据的全类名,后者 key 为 CommandCode
  • 各种命令
  • RemotingCommand 提供了命令接口,定义了四个序列化方法与获取全局序列化方式的方法(继承自 Serializable,从 Codec 编解码的分析中可以得到,如果一个对象想被 Bolt 进行编码,必须实现 Serializable 接口)
  • RpcCommand 指定了默认的全局序列化器(hessian2),并提供了三个 byte[] 数组,用于存储序列化后的相应内容,同时提供了 InvokeContext 对象,使得用户可以使用 InvokeContext 中的内容做一些逻辑;同时提供了 serialize() 、deserialize() 和 deserialize(long mask) 三种模板,前者做全序列化、中间做全反序列化、后者根据传入的 RpcDeserializeLevel 的值,决定做下列三者之一
  • 仅序列化 clazzName
  • 序列化 clazzName + header
  • 全序列化 clazzName + header + content
  • RpcRequestCommand 存储了真实的业务数据(clazzName、header、content),并提供了 customSerializer 对象(该对象通过 CustomSerializerManager 进行获取),并提供了三种类型业务数据的序列化和反序列化实现
  • RpcResponseCommand 与 RpcRequestCommand 类似
  • 调用入口
  • 当发起请求时,例如 invokeSync() 时,RpcRemoting 会先对请求数据进行序列化,之后编码发送
  • 当收到请求时,对请求消息进行解码,然后 RpcRequestProcessor 会对解码后的请求数据进行精细的反序列化;
  • 处理请求完成之后,RpcRequestProcessor 会对响应消息进行序列化,之后编码发送
  • 收到响应消息后,对响应消息进行解码,然后会在 RpcInvokeCallbackListener 或者 RpcResponseResolver 中对解码后的响应消息进行反序列化

三种业务数据序列化

  • clazzName:因为只是 String 与 byte[] 互转,所以与 CustomSerializer 和 Serializer 无关
  • header:仅仅用在 CustomSerializer 存在时
  • content:当 CustomSerializer 存在时,首先使用 CustomSerializer 进行反序列化 content,如果序列化失败,再使用 Serializer 进行反序列化 content

一、Serializer 序列化器扩展

// 1. 实现 Serializer
public class MySerializer implements Serializer {
     @Override
     public byte[] serialize(Object obj) throws CodecException {
         ...
     }

     @Override
     public <T> T deserialize(byte[] data, String classOfT) throws CodecException {
         ...
     }
}
// 2. 注册
public static final byte mySerializer = 2;
SerializerManager.addSerializer(mySerializer, new MySerializer());

编写并注册号序列化器之后,现在可以有两种方式进行选择:

  • 全局设置:
  • 调用级别的设置
// 全局设置
System.setProperty(Configs.SERIALIZER, String.valueOf(MySerializer.mySerializer)); // 或者 -Dbolt.serializer=2

// 调用级别的设置
InvokeContext invokeContext = new InvokeContext();
invokeContext.put(InvokeContext.BOLT_CUSTOM_SERIALIZER, MySerializer.mySerializer);

二、CustomSerializer 序列化器扩展

使用姿势见 SOFABolt 源码分析10 - 精细的线程模型的设计 中的“2.4 设置 UserProcessor 自定义线程池选择器”。简单来讲就是两步:

  1. 继承 DefaultCustomSerializer 实现自定义序列化器 MyCustomHeaderSerializer
  2. 将 MyCustomHeaderSerializer 注册到 CustomSerializerManager 中

三、源码分析

3.1 Serializer 序列化器

========================== Serializer ==========================
public interface Serializer {
    byte[] serialize(final Object obj) throws CodecException;
    <T> T deserialize(final byte[] data, String classOfT) throws CodecException;
}

========================== SerializerManager ==========================
public class SerializerManager {
    // 序列化器集合
    private static Serializer[] serializers = new Serializer[5];
    // 序列化器下标
    public static final byte    Hessian2    = 1;
    //public static final byte    Json        = 2;

    static {
        addSerializer(Hessian2, new HessianSerializer());
    }

    public static Serializer getSerializer(int idx) {
        return serializers[idx];
    }

    public static void addSerializer(int idx, Serializer serializer) {
        if (serializers.length <= idx) {
            // 扩容 - 创建新数组
            Serializer[] newSerializers = new Serializer[idx + 5];
            // 将老数组的内容拷贝到新数组
            System.arraycopy(serializers, 0, newSerializers, 0, serializers.length);
            serializers = newSerializers;
        }
        serializers[idx] = serializer;
    }
}

3.2 CustomSerializer 序列化器

========================== CustomSerializer ==========================
public interface CustomSerializer {
    <T extends RequestCommand> boolean serializeHeader(T request, InvokeContext invokeContext);
    <T extends RequestCommand> boolean deserializeHeader(T request);
    <T extends RequestCommand> boolean serializeContent(T request, InvokeContext invokeContext);
    <T extends RequestCommand> boolean deserializeContent(T request);

    <T extends ResponseCommand> boolean serializeHeader(T response);
    <T extends ResponseCommand> boolean deserializeHeader(T response, InvokeContext invokeContext);
    <T extends ResponseCommand> boolean serializeContent(T response);
    <T extends ResponseCommand> boolean deserializeContent(T response, InvokeContext invokeContext);
}

========================== DefaultCustomSerializer ==========================
public class DefaultCustomSerializer implements CustomSerializer {
    @Override
    public <T extends RequestCommand> boolean serializeHeader(T request, InvokeContext invokeContext) {
        return false;
    }

    @Override
    public <T extends ResponseCommand> boolean serializeHeader(T response) {
        return false;
    }

    @Override
    public <T extends RequestCommand> boolean deserializeHeader(T request) {
        return false;
    }

    @Override
    public <T extends ResponseCommand> boolean deserializeHeader(T response, InvokeContext invokeContext) {
        return false;
    }

    @Override
    public <T extends RequestCommand> boolean serializeContent(T request, InvokeContext invokeContext) {
        return false;
    }

    @Override
    public <T extends ResponseCommand> boolean serializeContent(T response) {
        return false;
    }

    @Override
    public <T extends RequestCommand> boolean deserializeContent(T request) {
        return false;
    }

    @Override
    public <T extends ResponseCommand> boolean deserializeContent(T response, InvokeContext invokeContext) {
        return false;
    }
}

========================== CustomSerializerManager ==========================
public class CustomSerializerManager {
    /** For rpc,key = clazzName */
    private static ConcurrentHashMap<String, CustomSerializer>        classCustomSerializer   = new ConcurrentHashMap<String, CustomSerializer>();
    /** For user defined command,key = CommandCode */
    private static ConcurrentHashMap<CommandCode, CustomSerializer> commandCustomSerializer = new ConcurrentHashMap<CommandCode, CustomSerializer>();

    public static void registerCustomSerializer(String className, CustomSerializer serializer) {
        CustomSerializer prevSerializer = classCustomSerializer.putIfAbsent(className, serializer);
        // 只能注册一次,不可修改
        if (prevSerializer != null) {
            throw new RuntimeException();
        }
    }

    public static CustomSerializer getCustomSerializer(String className) {
        if (!classCustomSerializer.isEmpty()) {
            return classCustomSerializer.get(className);
        }
        return null;
    }

    public static void registerCustomSerializer(CommandCode code, CustomSerializer serializer) {
        CustomSerializer prevSerializer = commandCustomSerializer.putIfAbsent(code, serializer);
        if (prevSerializer != null) {
            throw new RuntimeException();
        }
    }

    public static CustomSerializer getCustomSerializer(CommandCode code) {
        if (!commandCustomSerializer.isEmpty()) {
            return commandCustomSerializer.get(code);
        }
        return null;
    }

    public static void clear() {
        classCustomSerializer.clear();
        commandCustomSerializer.clear();
    }
}
  • DefaultCustomSerializer 是 CustomSerializer 一个适配器,所有的方法都返回 false,使得用户可以仅实现自己需要的方法,例如仅实现 request 的 serializeHeader 和 deserializeHeader
  • Map<String, CustomSerializer> 通常用在 rpc 中;Map<CommandCode, CustomSerializer> 通常用在用户自定义 CommandCode 时。

3.3 各种命令

========================== RemotingCommand ==========================
public interface RemotingCommand extends Serializable {
    ...
    // 可用于 CustomSerializer
    InvokeContext getInvokeContext();

    // 获取序列化器的key(即数组下标)
    byte getSerializer();

    // Serialize all parts of remoting command
    void serialize();

    // Deserialize all parts of remoting command
    void deserialize();

    // Serialize content of remoting command
    void serializeContent(InvokeContext invokeContext);

    // Deserialize content of remoting command
    void deserializeContent(InvokeContext invokeContext);
}

========================== RpcCommand ==========================
public abstract class RpcCommand implements RemotingCommand {
    // 请求 / 响应 / 心跳
    private CommandCode       cmdCode;
    // 序列化器:默认为 hessian2
    private byte              serializer       = ConfigManager.serializer;
    /** The length of clazz */
    private short             clazzLength      = 0;
    private short             headerLength     = 0;
    private int               contentLength    = 0;
    // clazzName: String <-> byte[]
    private byte[]            clazz;
    // header: Object <-> byte[]
    private byte[]            header;
    // content: Object <-> byte[]
    private byte[]            content;
    // 调用上下文:其内的属性可能会用于自定义序列化器,但是 invokeContext 本身不会传到对端
    private InvokeContext     invokeContext;

    // 全部序列化
    @Override
    public void serialize() throws SerializationException {
        this.serializeClazz();
        this.serializeHeader(this.invokeContext);
        this.serializeContent(this.invokeContext);
    }

    // 全部反序列化
    @Override
    public void deserialize() throws DeserializationException {
        this.deserializeClazz();
        this.deserializeHeader(this.invokeContext);
        this.deserializeContent(this.invokeContext);
    }

    /**
     * Deserialize according to mask.
     * <ol>
     *     <li>If mask <= {@link RpcDeserializeLevel#DESERIALIZE_CLAZZ}, only deserialize clazz - only one part.</li>
     *     <li>If mask <= {@link RpcDeserializeLevel#DESERIALIZE_HEADER}, deserialize clazz and header - two parts.</li>
     *     <li>If mask <= {@link RpcDeserializeLevel#DESERIALIZE_ALL}, deserialize clazz, header and content - all three parts.</li>
     * </ol>
     */
    public void deserialize(long mask) throws DeserializationException {
        if (mask <= RpcDeserializeLevel.DESERIALIZE_CLAZZ) {
            // 仅反序列化 clazzName
            this.deserializeClazz();
        } else if (mask <= RpcDeserializeLevel.DESERIALIZE_HEADER) {
            // 仅反序列化 clazzName + header
            this.deserializeClazz();
            this.deserializeHeader(this.getInvokeContext());
        } else if (mask <= RpcDeserializeLevel.DESERIALIZE_ALL) {
            // 反序列化 clazzName + header + content
            this.deserialize();
        }
    }

    // Serialize content class.
    public void serializeClazz() {
    }

    // Deserialize the content class.
    public void deserializeClazz() {
    }

    // Serialize the header.
    public void serializeHeader(InvokeContext invokeContext) {
    }

    // Deserialize the header.
    public void deserializeHeader(InvokeContext invokeContext) {
    }

    // Serialize the content.
    @Override
    public void serializeContent(InvokeContext invokeContext) {
    }

    // Deserialize the content.
    @Override
    public void deserializeContent(InvokeContext invokeContext) {
    }

    @Override
    public byte getSerializer() {
        return serializer;
    }

    public void setSerializer(byte serializer) {
        this.serializer = serializer;
    }
}

========================== RpcRequestCommand ==========================
public class RpcRequestCommand extends RequestCommand {
    // 请求对象类型
    private String            requestClass;
    // 请求头
    private Object            requestHeader;
    // 真正的请求对象
    private Object            requestObject;
    // 自定义序列化器
    private CustomSerializer  customSerializer;

    @Override
    public void serializeClazz() {
        if (this.requestClass != null) {
            // 直接 String -> byte[]
            byte[] clz = this.requestClass.getBytes(Configs.DEFAULT_CHARSET);
            this.setClazz(clz);
        }
    }

    @Override
    public void deserializeClazz() {
        // this.getRequestClass() != null 表示已经反序列化过了 - 避免重复反序列化
        if (this.getClazz() != null && this.getRequestClass() == null) {
            // 直接 byte[] -> String
            this.setRequestClass(new String(this.getClazz(), Configs.DEFAULT_CHARSET));
        }
    }

    // 如果 customSerializer 存在,才会做 serializeHeader
    @Override
    public void serializeHeader(InvokeContext invokeContext) {
        // 如果 customSerializer 存在,才会做 serializeHeader
        if (this.getCustomSerializer() != null) {
            // 执行 customSerializer 自行实现的逻辑(此处就可以使用 invokeContext 做一些逻辑了)
            this.getCustomSerializer().serializeHeader(this, invokeContext);
        }
    }

    @Override
    public void deserializeHeader(InvokeContext invokeContext) {
        // this.getRequestHeader() != null 表示已经反序列化过了 - 避免重复反序列化
        if (this.getHeader() != null && this.getRequestHeader() == null) {
            if (this.getCustomSerializer() != null) {
                // 执行 customSerializer 自行实现的逻辑
                this.getCustomSerializer().deserializeHeader(this);
            }
        }
    }

    @Override
    public void serializeContent(InvokeContext invokeContext) {
        if (this.requestObject != null) {
            // 如果 customSerializer 存在,使用 customSerializer 自行实现的逻辑做 content 的序列化,如果失败,使用 Serializer 的序列化器
            if (this.getCustomSerializer() != null && this.getCustomSerializer().serializeContent(this, invokeContext)) {
                return;
            }
            // 如果 customSerializer 不存在或序列化失败,使用 RpcCommand.serializer 属性获取指定的序列化器,然后通过该序列化器做序列化操作
            // RpcCommand.serializer 默认为1,即 hessian2 序列化器,可以通过 -Dbolt.serializer=1 来指定(前提是需要将指定的序列化器注册到 SerializerManager 中)- 这种是全局的;
            // 可以通过 invokeContext.put(InvokeContext.BOLT_CUSTOM_SERIALIZER, 序列化器序号来指定) - 这种可以为每一次的调用指定动态序列化器
            this.setContent(SerializerManager.getSerializer(this.getSerializer()).serialize(this.requestObject));
        }
    }

    @Override
    public void deserializeContent(InvokeContext invokeContext) throws DeserializationException {
        if (this.getRequestObject() == null) {
            // 如果 customSerializer 存在,使用 customSerializer 自行实现的逻辑做反序列化
            if (this.getCustomSerializer() != null && this.getCustomSerializer().deserializeContent(this)) {
                return;
            }
            // 如果 customSerializer 不存在或反序列化失败,使用 RpcCommand.serializer 属性获取指定的序列化器,然后通过该序列化器做反序列化操作
            if (this.getContent() != null) {
                this.setRequestObject(SerializerManager.getSerializer(this.getSerializer()).deserialize(this.getContent(), this.requestClass));
            }
        }
    }

    // 获取自定义序列化器
    public CustomSerializer getCustomSerializer() {
        // 如果有了,直接返回
        if (this.customSerializer != null) {
            return customSerializer;
        }
        // 先根据请求数据的全类名获取 CustomSerializer;如果获取不到,再根据 CommandCode 获取
        if (this.requestClass != null) {
            this.customSerializer = CustomSerializerManager.getCustomSerializer(this.requestClass);
        }
        if (this.customSerializer == null) {
            this.customSerializer = CustomSerializerManager.getCustomSerializer(this.getCmdCode());
        }
        return this.customSerializer;
    }
}

========================== RpcResponseCommand ==========================
public class RpcResponseCommand extends ResponseCommand {
    // 响应对象类型
    private String            responseClass;
    // 响应头
    private Object            responseHeader;
    // 真正的响应对象
    private Object            responseObject;
    // 自定义序列化器
    private CustomSerializer  customSerializer;

    @Override
    public void serializeClazz() {
        if (this.getResponseClass() != null) {
            // 直接 String -> byte[]
            byte[] clz = this.getResponseClass().getBytes(Configs.DEFAULT_CHARSET);
            this.setClazz(clz);
        }
    }

    @Override
    public void deserializeClazz() {
        // this.getResponseClass() != null 表示已经反序列化过了 - 避免重复反序列化
        if (this.getClazz() != null && this.getResponseClass() == null) {
            this.setResponseClass(new String(this.getClazz(), Configs.DEFAULT_CHARSET));
        }
    }

    // 如果 customSerializer 存在,才会做 serializeHeader
    @Override
    public void serializeHeader(InvokeContext invokeContext) {
        if (this.getCustomSerializer() != null) {
            // 如果 customSerializer 存在,才会做 serializeHeader
            this.getCustomSerializer().serializeHeader(this);
        }
    }

    @Override
    public void deserializeHeader(InvokeContext invokeContext) {
        // this.getResponseHeader() != null 表示已经反序列化过了 - 避免重复反序列化
        if (this.getHeader() != null && this.getResponseHeader() == null) {
            if (this.getCustomSerializer() != null) {
                this.getCustomSerializer().deserializeHeader(this, invokeContext);
            }
        }
    }

    @Override
    public void serializeContent(InvokeContext invokeContext) throws SerializationException {
        if (this.getResponseObject() != null) {
            if (this.getCustomSerializer() != null && this.getCustomSerializer().serializeContent(this)) {
                return;
            }

            this.setContent(SerializerManager.getSerializer(this.getSerializer()).serialize(this.responseObject));
        }
    }

    @Override
    public void deserializeContent(InvokeContext invokeContext) throws DeserializationException {
        if (this.getResponseObject() == null) {
            if (this.getCustomSerializer() != null && this.getCustomSerializer().deserializeContent(this, invokeContext)) {
                return;
            }
            if (this.getContent() != null) {
                this.setResponseObject(SerializerManager.getSerializer(this.getSerializer()).deserialize(this.getContent(), this.responseClass));
            }
        }
    }

    public CustomSerializer getCustomSerializer() {
        if (this.customSerializer != null) {
            return customSerializer;
        }
        if (this.responseClass != null) {
            this.customSerializer = CustomSerializerManager.getCustomSerializer(this.responseClass);
        }
        if (this.customSerializer == null) {
            this.customSerializer = CustomSerializerManager.getCustomSerializer(this.getCmdCode());
        }
        return this.customSerializer;
    }
}

3.4 调用入口

========================== RpcRemoting ==========================
public abstract class RpcRemoting extends BaseRemoting {
    public Object invokeSync(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) {
        // 创建请求命令(序列化)
        RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
        ...
        ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand,
            timeoutMillis);
        responseCommand.setInvokeContext(invokeContext);
        // 解析响应(反序列化)
        Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand, RemotingUtil.parseRemoteAddress(conn.getChannel()));
        return responseObject;
    }

    protected RemotingCommand toRemotingCommand(Object request, Connection conn, InvokeContext invokeContext, int timeoutMillis) {
        // 创建 RpcRequestCommand
        RpcRequestCommand command = this.getCommandFactory().createRequestCommand(request);

        if (null != invokeContext) {
            // 设置调用级别的 Serializer
            Object clientCustomSerializer = invokeContext.get(InvokeContext.BOLT_CUSTOM_SERIALIZER);
            if (null != clientCustomSerializer) {
                command.setSerializer((Byte) clientCustomSerializer);
            }
        }
        command.setRequestClass(request.getClass().getName());
        command.setInvokeContext(invokeContext);
        // 请求的序列化
        command.serialize();
        return command;
    }
}

========================== RpcRequestProcessor ==========================
public class RpcRequestProcessor extends AbstractRemotingProcessor<RpcRequestCommand> {
    @Override
    public void process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor) {
        // 首先反序列化 clazzName,因为需要 clazzName 来获取 UserProcessor,如果处理 clazzName 的 UserProcessor 不存在,则直接返回错误
        if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_CLAZZ)) {
            return;
        }
        // 根据clazz获取UserProcessor
        UserProcessor userProcessor = ctx.getUserProcessor(cmd.getRequestClass());
        ...

        // 如果指定在IO线程处理请求,则直接反序列化全部,创建ProcessTask,直接执行
        if (userProcessor.processInIOThread()) {
            if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_ALL)) {
                return;
            }
            // process in io thread
            new ProcessTask(ctx, cmd).run();
            return;// end
        }

        // 如果指定不是在IO线程处理请求,则先获取线程池,创建ProcessTask,在新的线程池执行
        Executor executor;
        // 看是否配置了 UserProcessor.executorSelector,即线程池选择器,
        // 如果配置了:则需要反序列化出 header,因为 executorSelector 需要根据 header 去选择 executor;content 在异步线程池进行反序列化
        // 如果没有配置:则 header 和 content 都在选出的异步线程池进行反序列化
        if (null == userProcessor.getExecutorSelector()) {
            executor = userProcessor.getExecutor();
        } else {
            if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_HEADER)) {
                return;
            }
            executor = userProcessor.getExecutorSelector().select(cmd.getRequestClass(),
                cmd.getRequestHeader());
        }
        ...
        executor.execute(new ProcessTask(ctx, cmd));
    }

    @Override
    public void doProcess(RemotingContext ctx, RpcRequestCommand cmd) {
        ...
        // 反序列化全部
        if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_ALL)) {
            return;
        }
        dispatchToUserProcessor(ctx, cmd);
    }

    public void sendResponseIfNecessary(RemotingContext ctx, byte type, RemotingCommand response) {
        final int id = response.getId();
        if (type != RpcCommandType.REQUEST_ONEWAY) {
            RemotingCommand serializedResponse = response;
            // 响应序列化
            response.serialize();
            // Netty 发送响应
            ctx.writeAndFlush(serializedResponse);
        }
    }
    
    private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
        ... 消息处理
        // 发送响应(创建 RpcResponseCommand,指定序列化器为请求的序列化器)
        sendResponseIfNecessary(ctx, type, this.getCommandFactory().createResponse(responseObject, cmd));
    }
    
    private boolean deserializeRequestCommand(RemotingContext ctx, RpcRequestCommand cmd, int level) {
        // 根据传入的序列化级别来序列化不同的内容
        cmd.deserialize(level);
    }
    
    class ProcessTask implements Runnable {
        @Override
        public void run() {
            RpcRequestProcessor.this.doProcess(ctx, msg);
        }
    }
}

public class RpcCommandFactory implements CommandFactory {
    @Override
    public RpcRequestCommand createRequestCommand(Object requestObject) {
        return new RpcRequestCommand(requestObject);
    }

    @Override
    public RpcResponseCommand createResponse(Object responseObject, RemotingCommand requestCmd) {
        RpcResponseCommand response = new RpcResponseCommand(requestCmd.getId(), responseObject);
        if (null != responseObject) {
            response.setResponseClass(responseObject.getClass().getName());
        } else {
            response.setResponseClass(null);
        }
        // 响应与请求用的是同一种序列化器
        response.setSerializer(requestCmd.getSerializer());
        ...
        return response;
    }
}
========================== RpcInvokeCallbackListener ==========================
public class RpcInvokeCallbackListener implements InvokeCallbackListener {
    @Override
    public void onResponse(InvokeFuture future) {
        InvokeCallback callback = future.getInvokeCallback();
        if (callback != null) {
            CallbackTask task = new CallbackTask(this.getRemoteAddress(), future);
            if (callback.getExecutor() != null) {
                callback.getExecutor().execute(task);
            } else {
                task.run();
            }
        }
    }

    class CallbackTask implements Runnable {
        @Override
        public void run() {
            InvokeCallback callback = future.getInvokeCallback();
            // 设置响应
            ResponseCommand response = (ResponseCommand) future.waitResponse(0);
            response.setInvokeContext(future.getInvokeContext());
            RpcResponseCommand rpcResponse = (RpcResponseCommand) response;
            // 反序列化
            response.deserialize();
            // 回调
            callback.onResponse(rpcResponse.getResponseObject());
        } // end of run
    }
}

========================== RpcResponseResolver ==========================
public class RpcResponseResolver {
    public static Object resolveResponseObject(ResponseCommand responseCommand, String addr) {
        return toResponseObject(responseCommand);
    }

    private static Object toResponseObject(ResponseCommand responseCommand) {
        RpcResponseCommand response = (RpcResponseCommand) responseCommand;
        // 响应的反序列化
        response.deserialize();
        return response.getResponseObject();
    }
}


©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,701评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,649评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,037评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,994评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,018评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,796评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,481评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,370评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,868评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,014评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,153评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,832评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,494评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,039评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,437评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,131评论 2 356

推荐阅读更多精彩内容