SOFABolt 源码分析16 - 上下文机制的设计

SOFABolt 中存在四种上下文 context

  • InvokeContext:调用上下文,用于端内隐式传参,并可以通过自定义序列化器将 InvokeContext 内存储的参数自定义的序列化传递给对端(注意:InvokeContext 本身是不会传递给对端的)
  • RemotingContext:Remoting 层的上下文,程序内部使用
  • BizContext:业务上下文,提供给用户程序使用,封装了 RemotingContext,防止直接将 RemotingContext 暴露给用户
  • AsyncContext:存储存根信息,用于 AsyncUserProcessor 异步返回响应

一、InvokeContext

1.1 使用姿势

SOFABolt 源码分析10 - 精细的线程模型的设计 的 “2.4 设置 UserProcessor 自定义线程池选择器”

1.2 源码分析

public class InvokeContext {
    // ~~~ invoke context keys of client side
    public final static String                CLIENT_LOCAL_IP        = "bolt.client.local.ip";
    public final static String                CLIENT_LOCAL_PORT      = "bolt.client.local.port";
    public final static String                CLIENT_REMOTE_IP       = "bolt.client.remote.ip";
    public final static String                CLIENT_REMOTE_PORT     = "bolt.client.remote.port";
    /** time consumed during connection creating, this is a timespan */
    public final static String                CLIENT_CONN_CREATETIME = "bolt.client.conn.createtime";

    // ~~~ invoke context keys of server side
    public final static String                SERVER_LOCAL_IP        = "bolt.server.local.ip";
    public final static String                SERVER_LOCAL_PORT      = "bolt.server.local.port";
    public final static String                SERVER_REMOTE_IP       = "bolt.server.remote.ip";
    public final static String                SERVER_REMOTE_PORT     = "bolt.server.remote.port";

    // ~~~ invoke context keys of client and server side
    public final static String                BOLT_INVOKE_REQUEST_ID = "bolt.invoke.request.id";
    /** 时间段:请求达到解码器 ~ 请求即将被处理(与处理完成) */
    public final static String                BOLT_PROCESS_WAIT_TIME = "bolt.invoke.wait.time";
    public final static String                BOLT_CUSTOM_SERIALIZER = "bolt.invoke.custom.serializer";
    public final static String                BOLT_CRC_SWITCH        = "bolt.invoke.crc.switch";

    public final static int                   INITIAL_SIZE           = 8;

    /** context */
    private ConcurrentHashMap<String, Object> context;

    public InvokeContext() {
        this.context = new ConcurrentHashMap<String, Object>(INITIAL_SIZE);
    }
}

注意:

  • InvokeContext 内部实际上就是一个 map,而不是像 RpcInvokeContext 一样,内部是一个 ThreadLocal(RpcInvokeContext 是 SOFARPC 的上下文)
  • 由于 InvokeContext 内部只是一个 map,所以 InvokeContext 本身不能进行“隐式传递”,InvokeContext 本身需要作为接口的参数进行传递才行,所以四种调用模式的三种调用链路方式都提供了带有 InvokeContext 参数的方法,eg. invokeSync(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis)
  • InvokeContext 不会传递给对端,但是其中的内容可以通过自定义序列化器的方式传递给对端,使用姿势见 SOFABolt 源码分析10 - 精细的线程模型的设计

设置建连消耗时间(仅客户端,服务端不建连)

    public Object invokeSync(Url url, Object request, InvokeContext invokeContext, int timeoutMillis) {
        final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);
        this.connectionManager.check(conn);
        return this.invokeSync(conn, request, invokeContext, timeoutMillis);
    }

    protected Connection getConnectionAndInitInvokeContext(Url url, InvokeContext invokeContext) {
        // 记录开始时间
        long start = System.currentTimeMillis();
        Connection conn;
        try {
            // 建连
            conn = this.connectionManager.getAndCreateIfAbsent(url);
        } finally {
            if (null != invokeContext) {
                // 记录建连时间(客户端,服务端不建连)
                invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATETIME, (System.currentTimeMillis() - start));
            }
        }
        return conn;
    }

设置客户端自定义序列化器 + crc 开关 + 四要素(ip/port)+ requestID

    public Object invokeSync(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) {
        // 创建请求:会根据 invokeContext 配置 - 设置客户端自定义序列化器 + crc 开关
        RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
        // 预处理 InvokeContext:设置四要素(IP/PORT) + 请求ID
        preProcessInvokeContext(invokeContext, requestCommand, conn);
        // 发起请求
        ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand, timeoutMillis);
        // 将 invokeContext 设置到 responseCommand,可以让用户使用
        responseCommand.setInvokeContext(invokeContext);

        Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand,
            RemotingUtil.parseRemoteAddress(conn.getChannel()));
        return responseObject;
    }

    protected RemotingCommand toRemotingCommand(Object request, Connection conn, InvokeContext invokeContext, int timeoutMillis) {
        RpcRequestCommand command = this.getCommandFactory().createRequestCommand(request);

        if (null != invokeContext) {
            // 设置客户端自定义序列化器
            Object clientCustomSerializer = invokeContext.get(InvokeContext.BOLT_CUSTOM_SERIALIZER);
            if (null != clientCustomSerializer) {
                    command.setSerializer((Byte) clientCustomSerializer);
            }

            // 是否开启 crc 开关
            // enable crc by default, user can disable by set invoke context `false` for key `InvokeContext.BOLT_CRC_SWITCH`
            Boolean crcSwitch = invokeContext.get(InvokeContext.BOLT_CRC_SWITCH, ProtocolSwitch.CRC_SWITCH_DEFAULT_VALUE);
            if (null != crcSwitch && crcSwitch) {
                command.setProtocolSwitch(ProtocolSwitch.create(new int[] { ProtocolSwitch.CRC_SWITCH_INDEX }));
            }
        } else {
            // enable crc by default, if there is no invoke context.
            command.setProtocolSwitch(ProtocolSwitch.create(new int[] { ProtocolSwitch.CRC_SWITCH_INDEX }));
        }
        command.setTimeout(timeoutMillis);
        command.setRequestClass(request.getClass().getName());
        // 设置 invokeContext 到 RpcRequestCommand 中,后续在自定义序列化器的序列化 header 和 body 的过程中,
        // 可以自定义的从 invokeContext 中序列化信息到对端
        command.setInvokeContext(invokeContext);
        command.serialize();
        logDebugInfo(command);
        return command;
    }

============================= RpcClientRemoting =============================
    protected void preProcessInvokeContext(InvokeContext invokeContext, RemotingCommand cmd, Connection connection) {
        if (null != invokeContext) {
            // 设置四要素
            invokeContext.putIfAbsent(InvokeContext.CLIENT_LOCAL_IP, RemotingUtil.parseLocalIP(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.CLIENT_LOCAL_PORT, RemotingUtil.parseLocalPort(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.CLIENT_REMOTE_IP, RemotingUtil.parseRemoteIP(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.CLIENT_REMOTE_PORT, RemotingUtil.parseRemotePort(connection.getChannel()));
            // 设置 requestID
            invokeContext.putIfAbsent(InvokeContext.BOLT_INVOKE_REQUEST_ID, cmd.getId());
        }
    }

============================= RpcServerRemoting =============================
    protected void preProcessInvokeContext(InvokeContext invokeContext, RemotingCommand cmd, Connection connection) {
        if (null != invokeContext) {
            // 设置四要素
            invokeContext.putIfAbsent(InvokeContext.SERVER_REMOTE_IP, RemotingUtil.parseRemoteIP(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.SERVER_REMOTE_PORT, RemotingUtil.parseRemotePort(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.SERVER_LOCAL_IP, RemotingUtil.parseLocalIP(connection.getChannel()));
            invokeContext.putIfAbsent(InvokeContext.SERVER_LOCAL_PORT, RemotingUtil.parseLocalPort(connection.getChannel()));
            // 设置 requestID
            invokeContext.putIfAbsent(InvokeContext.BOLT_INVOKE_REQUEST_ID, cmd.getId());
        }
    }

关于序列化的东西,在《序列化设计》部分分析。

统计“消息到达解码器 ~ 消息即将被业务逻辑处理器处理” 之间的时间

============================= RpcRequestProcessor =============================
    public void doProcess(final RemotingContext ctx, RpcRequestCommand cmd) throws Exception {
        long currentTimestamp = System.currentTimeMillis();
        // 预处理 RemotingContext
        preProcessRemotingContext(ctx, cmd, currentTimestamp);
        ...
        dispatchToUserProcessor(ctx, cmd);
    }

    private void preProcessRemotingContext(RemotingContext ctx, RpcRequestCommand cmd,
                                           long currentTimestamp) {
        ...
        ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_WAIT_TIME, currentTimestamp - cmd.getArriveTime());
    }

BizContext 获取 InvokeContext

public class DefaultBizContext implements BizContext {
    private RemotingContext remotingCtx;
    public InvokeContext getInvokeContext() {
        return this.remotingCtx.getInvokeContext();
    }
}

二、RemotingContext

/**
 * Wrap the ChannelHandlerContext.
 */
public class RemotingContext {
    // netty ChannelHandlerContext
    private ChannelHandlerContext                       channelContext;
    // 是否是服务端
    private boolean                                     serverSide     = false;
    /** whether need handle request timeout, if true, request will be discarded. The default value is true */
    private boolean                                     timeoutDiscard = true;
    /** request arrive time stamp */
    private long                                        arriveTimestamp;
    /** request timeout setting by invoke side */
    private int                                         timeout;
    /** rpc command type:REQUEST / RESPONSE / REQUEST_ONEWAY */
    private int                                         rpcCommandType;
    // 用户业务逻辑处理器
    private ConcurrentHashMap<String, UserProcessor<?>> userProcessors;
    // 调用上下文,主要会统计“消息到达解码器 ~ 消息即将被业务逻辑处理器处理” 之间的时间
    private InvokeContext                               invokeContext;

    public ChannelFuture writeAndFlush(RemotingCommand msg) {
        return this.channelContext.writeAndFlush(msg);
    }

    // whether this request already timeout: oneway 没有请求超时的概念
    public boolean isRequestTimeout() {
        if (this.timeout > 0 && (this.rpcCommandType != RpcCommandType.REQUEST_ONEWAY)
            && (System.currentTimeMillis() - this.arriveTimestamp) > this.timeout) {
            return true;
        }
        return false;
    }

    public UserProcessor<?> getUserProcessor(String className) {
        return StringUtils.isBlank(className) ? null : this.userProcessors.get(className);
    }

    public Connection getConnection() {
        return ConnectionUtil.getConnectionFromChannel(channelContext.channel());
    }
}

================== ConnectionUtil ==================
public class ConnectionUtil {
    public static Connection getConnectionFromChannel(Channel channel) {
         // 从 channel 的附属属性中获取 Connection
        Attribute<Connection> connAttr = channel.attr(Connection.CONNECTION);
        Connection connection = connAttr.get();
        return connection;
    }
}

RemotingContext 作用:

  • 包含 userProcessors 映射:用于在处理消息流程中选择业务逻辑处理
  • 包装 ChannelHandlerContext:用于在处理消息结束后或者异常后向对端发送消息
  • 包装 InvokeContext:用于存放添加服务端链路调用上下文

注意:BizContext 会包含 RemotingContext,但是不会提供 public 的 getRemotingContext 方法,但是会提供 getInvokeContext 方法。

使用链路

================== RpcHandler ==================
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ...
        protocol.getCommandHandler().handleCommand(new RemotingContext(ctx, new InvokeContext(), serverSide, userProcessors), msg);
    }

================== RpcRequestProcessor ==================
    public void process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor) {
        // 从 RemotingContext 获取 UserProcessor
        UserProcessor userProcessor = ctx.getUserProcessor(cmd.getRequestClass());
        // set timeout check state from user's processor
        ctx.setTimeoutDiscard(userProcessor.timeoutDiscard());

        // use the final executor dispatch process task
        executor.execute(new ProcessTask(ctx, cmd));
    }

    private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
        // 从 RemotingContext 获取 UserProcessor
        UserProcessor processor = ctx.getUserProcessor(cmd.getRequestClass());
        if (processor instanceof AsyncUserProcessor) {
            // processor.preHandleRequest:使用 BizContext 包装 RemotingContext,避免 RemotingContext 直接暴露给用户(因为 RemotingContext 包含 ChannelHandlerContext,可直接发送消息给对端)
            // 创建 RpcAsyncContext 存根:包装 RemotingContext,内部使用其做一步发送操作
            processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), new RpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
        } else {
            // processor.preHandleRequest:使用 BizContext 包装 RemotingContext
            Object responseObject = processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), cmd.getRequestObject());
            // 使用 ctx.writeAndFlush(serializedResponse) 发送响应
            sendResponseIfNecessary(ctx, type, this.getCommandFactory().createResponse(responseObject, cmd));
        }
    }

三、BizContext

使用姿势见 SOFABolt 源码分析15 - 双工通信机制的设计 中的“1.1、基于 addr 链路模式”

public class DefaultBizContext implements BizContext {
    // 包裹 RemotingContext
    private RemotingContext remotingCtx;

    // protect 方式,只有其子类可以访问
    protected RemotingContext getRemotingCtx() {
        return this.remotingCtx;
    }

    @Override
    public String getRemoteAddress() {
        if (null != this.remotingCtx) {
            ChannelHandlerContext channelCtx = this.remotingCtx.getChannelContext();
            Channel channel = channelCtx.channel();
            if (null != channel) {
                return RemotingUtil.parseRemoteAddress(channel);
            }
        }
        return "UNKNOWN_ADDRESS";
    }
    
    ...
  
    // 这里也存储了 Connection,可以用于服务端向客户端直接发起调用
    @Override
    public Connection getConnection() {
        if (null != this.remotingCtx) {
            return this.remotingCtx.getConnection();
        }
        return null;
    }

    @Override
    public boolean isRequestTimeout() {
        return this.remotingCtx.isRequestTimeout();
    }
    
    ...
 
    @Override
    public InvokeContext getInvokeContext() {
        return this.remotingCtx.getInvokeContext();
    }
}

BizContext 是直接给用户程序使用的,而 RemotingContext 是程序内部使用的

    public BizContext preHandleRequest(RemotingContext remotingCtx, T request) {
        return new DefaultBizContext(remotingCtx);
    }

四、AsyncContext

public class RpcAsyncContext implements AsyncContext {
    /** remoting context */
    private RemotingContext     ctx;
    // rpc request command: 
    // 1. 会根据请求中的 type 是否是 oneway 来决定是否向对端发送数据
    // 2. 会将 RpcRequestCommand 中的 requestID 设置给响应
    private RpcRequestCommand   cmd;

    private RpcRequestProcessor processor;

    /** is response sent already */
    private AtomicBoolean       isResponseSentAlready = new AtomicBoolean();

    // 创造响应,发送消息(发送还是使用 RemotingContext)
    @Override
    public void sendResponse(Object responseObject) {
        if (isResponseSentAlready.compareAndSet(false, true)) {
            processor.sendResponseIfNecessary(this.ctx, cmd.getType(), processor.getCommandFactory().createResponse(responseObject, this.cmd));
        } else {
            throw new IllegalStateException("Should not send rpc response repeatedly!");
        }
    }
}
private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
    UserProcessor processor = ctx.getUserProcessor(cmd.getRequestClass());
    if (processor instanceof AsyncUserProcessor) {
        // yi'bu'chu'li
        processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), new RpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,701评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,649评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,037评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,994评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,018评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,796评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,481评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,370评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,868评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,014评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,153评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,832评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,494评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,039评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,437评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,131评论 2 356

推荐阅读更多精彩内容