SOFARPC 源码分析6 - 通信层的设计与实现

服务发布流程

  1. 检查配置参数
  2. 构造器调用链
  3. 提前初始化注册中心
  4. Server 的创建、初始化、注册和启动
  5. 注册配置变化监听器
  6. 注册服务到注册中心
for (ServerConfig serverConfig : serverConfigs) {
    Server server = serverConfig.buildIfAbsent();
    // 注册请求调用器
    server.registerProcessor(providerConfig, providerProxyInvoker);
    if (serverConfig.isAutoStart()) {
        server.start();
    }
}

注意:可以将同一服务发布成多种协议(例如 bolt、rest 等),让调用端可以使用不同的协议调用服务提供方。所以会有一个 ServerConfig 列表。关于发布多协议,http://www.sofastack.tech/sofa-rpc/docs/Publish-And-Reference

image.png

Server 是 SPI 可扩展接口。ServerFactory 是 Server 工厂。

bolt = BoltServer,默认,其包含的 BoltServerProcessor 就是最终的业务逻辑处理器
rest = RestServer
http = Http1Server
h2c = Http2ClearTextServer

设计思想:对于 SPI 实现类的创建使用工厂模式很合适。

一、Server 的创建

========================== ServerConfig ==========================
    private transient volatile Server server;
    // 启动服务
    public synchronized Server buildIfAbsent() {
        if (server != null) {
            return server;
        }
        server = ServerFactory.getServer(this);
        return server;
    }

    // 关闭服务
    public synchronized void destroy() {
        if (server != null) {
            server.destroy();
        }
    }

========================== ServerFactory ==========================
public final class ServerFactory {
    // 全部服务端 {port:Server}
    private final static ConcurrentMap<String, Server> SERVER_MAP = new ConcurrentHashMap<String, Server>();

    // 初始化Server实例
    public synchronized static Server getServer(ServerConfig serverConfig) {
        // 获取 Server
        Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort()));
        if (server == null) {
            // 算下网卡和端口
            resolveServerConfig(serverConfig);
            // SPI 获取 Server,默认是 bolt
            ExtensionClass<Server> ext = ExtensionLoaderFactory.getExtensionLoader(Server.class).getExtensionClass(serverConfig.getProtocol());
            server = ext.getExtInstance();
            server.init(serverConfig);
            SERVER_MAP.put(serverConfig.getPort() + "", server);
        }
        return server;
    }

    // 关闭全部服务端
    public static void destroyAll() {
        for (Map.Entry<String, Server> entry : SERVER_MAP.entrySet()) {
            Server server = entry.getValue();
            server.destroy();
        }
        SERVER_MAP.clear();
    }
}

二、BoltServer

========================== BoltServer ==========================
@Extension("bolt")
public class BoltServer implements Server {
    // Bolt服务端
    protected RemotingServer remotingServer;
    // 服务端配置
    protected ServerConfig serverConfig;
    // BoltServerProcessor
    protected BoltServerProcessor boltServerProcessor;
    // 业务线程池
    protected ThreadPoolExecutor bizThreadPool;
    // Invoker列表:{key:Invoker},key = interface:version[:uniqueId]
    protected Map<String, Invoker> invokerMap = new ConcurrentHashMap<String, Invoker>();

    @Override
    public void init(ServerConfig serverConfig) {
        this.serverConfig = serverConfig;
        // 创建业务线程池
        bizThreadPool = initThreadPool(serverConfig);
        // 创建最终业务逻辑处理器
        boltServerProcessor = new BoltServerProcessor(this);
    }

    protected ThreadPoolExecutor initThreadPool(ServerConfig serverConfig) {
        ThreadPoolExecutor threadPool = BusinessPool.initPool(serverConfig);
        threadPool.setThreadFactory(new NamedThreadFactory("SEV-BOLT-BIZ-" + serverConfig.getPort(), serverConfig.isDaemon()));
        threadPool.setRejectedExecutionHandler(new SofaRejectedExecutionHandler());
        if (serverConfig.isPreStartCore()) { // 创建核心线程,否则核心线程会延迟创建
            threadPool.prestartAllCoreThreads();
        }
        return threadPool;
    }

    @Override
    public void start() {
        // 创建 RemotingServer 并启动
        remotingServer = initRemotingServer();
        remotingServer.start(serverConfig.getBoundHost());
        // 如果启动了 ServerStartedEvent 事件监听功能,则发布 ServerStartedEvent 事件
        if (EventBus.isEnable(ServerStartedEvent.class)) {
            EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
        }
    }

    protected RemotingServer initRemotingServer() {
        // 绑定到端口
        RemotingServer remotingServer = new RpcServer(serverConfig.getPort());
        // 注册业务逻辑处理器 UserProcessor
        remotingServer.registerUserProcessor(boltServerProcessor);
        return remotingServer;
    }

    @Override
    public void stop() {
        // 关闭端口,不关闭线程池
        remotingServer.stop();
        if (EventBus.isEnable(ServerStoppedEvent.class)) {
            EventBus.post(new ServerStoppedEvent(serverConfig));
        }
        remotingServer = null;
    }

    @Override
    public void registerProcessor(ProviderConfig providerConfig, Invoker instance) {
        // key:interface:version[:uniqueId]
        String key = ConfigUniqueNameGenerator.getUniqueName(providerConfig);
        // 1. 缓存Invoker对象
        invokerMap.put(key, instance);
        // 2. 缓存接口的方法 {serviceUniqueName:{方法名#(参数类型列表):Method}}
        for (Method m : providerConfig.getProxyClass().getMethods()) {
            ReflectCache.putOverloadMethodCache(key, m);
        }
    }

    @Override
    public void unRegisterProcessor(ProviderConfig providerConfig, boolean closeIfNoEntry) {
        // 取消缓存Invoker对象
        String key = ConfigUniqueNameGenerator.getUniqueName(providerConfig);
        invokerMap.remove(key);
        // 如果最后一个需要关闭,则关闭
        if (closeIfNoEntry && invokerMap.isEmpty()) {
            stop();
        }
    }

    @Override
    public void destroy() {
        int stopTimeout = serverConfig.getStopTimeout();
        if (stopTimeout > 0) { // 需要等待结束时间
            AtomicInteger count = boltServerProcessor.processingCount;
            // 有正在执行的请求 或者 队列里有请求
            if (count.get() > 0 || bizThreadPool.getQueue().size() > 0) {
                long start = RpcRuntimeContext.now();
                while ((count.get() > 0 || bizThreadPool.getQueue().size() > 0) && RpcRuntimeContext.now() - start < stopTimeout) { // 等待返回结果
                    Thread.sleep(10);
                }
            }
        }

        // 关闭线程池
        bizThreadPool.shutdown();
        // 关闭 Server
        stop();
    }

    @Override
    public void destroy(DestroyHook hook) {
        if (hook != null) {
            hook.preDestroy();
        }
        destroy();
        if (hook != null) {
            hook.postDestroy();
        }
    }

    // 找到服务端Invoker
    public Invoker findInvoker(String serviceName) {
        return invokerMap.get(serviceName);
    }
}

========================== BusinessPool ==========================
public class BusinessPool {
    public static ThreadPoolExecutor initPool(ServerConfig serverConfig) {
        int minPoolSize = serverConfig.getCoreThreads();
        int maxPoolSize = serverConfig.getMaxThreads();
        int queueSize = serverConfig.getQueues();
        int aliveTime = serverConfig.getAliveTime();
        BlockingQueue<Runnable> poolQueue = queueSize > 0 ? new LinkedBlockingQueue<Runnable>(queueSize) : new SynchronousQueue<Runnable>();
        return new ThreadPoolExecutor(minPoolSize, maxPoolSize, aliveTime, TimeUnit.MILLISECONDS, poolQueue);
    }
}

========================== SofaRejectedExecutionHandler ==========================
public class SofaRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        throw new RejectedExecutionException();
    }
}

BoltServer 和其业务逻辑处理器 BoltServerProcessor 体现了 SOFABolt 在 SOFARPC 的使用姿势,是 SOFABolt 的最佳实践之一;关于 destroy 的内容会在《SOFARPC 源码分析 - 优雅停机的设计与实现》中进行分析。

  1. init() 中,创建了业务线程池 bizThreadPool,然后创建了业务逻辑处理器 BoltServerProcessor(这里的 bizThreadPool 会作为 BoltServerProcessor 的默认线程池,当用户没有自定义线程池时,由该线程池进行反序列化和处理业务逻辑)
  2. registerProcessor(ProviderConfig providerConfig, Invoker instance) 中,缓存了服务端调用链对象 ProviderProxyInvoker,key 为 interface:version[:uniqueId];缓存了接口方法于 {serviceUniqueName:{方法名#(参数类型列表):Method}},其中 serviceUniqueName 为 interface:version[:uniqueId]
  3. start() 中,首先初始化了一个 RemotingServer(SOFABolt 的类),为其绑定端口,注册业务逻辑处理器 BoltServerProcessor,然后启动 RemotingServer(此时会启动 Netty 服务端),最后如果启动了 ServerStartedEvent 事件监听功能,则发布 ServerStartedEvent 事件

关于线程池的精细选择(包括自定义线程池等),见 SOFABolt 源码分析10 - 精细的线程模型的设计
关于 SOFABolt 的基本使用,见 SOFABolt 源码分析1 - 最简使用姿势
关于业务逻辑处理器的设计与实现,见 SOFABolt 源码分析9 - UserProcessor 自定义处理器的设计

三、业务逻辑处理器 BoltServerProcessor

public class BoltServerProcessor extends AsyncUserProcessor<SofaRequest> {
    // 提前注册序列化器
    static {
        SofaRpcSerializationRegister.registerCustomSerializer();
    }

    // bolt server, which saved invoker map
    private final BoltServer boltServer;

    public BoltServerProcessor(BoltServer boltServer) {
        this.boltServer = boltServer;
        this.executorSelector = new UserThreadPoolSelector(); // 支持自定义业务线程池
    }

    // 记录当前Server正在处理的调用数量
    // 在 destroy 的时候,会在允许的关闭时间内将 processingCount 的正在处理的调用量和业务线程池内的任务进行尽可能多的处理
    AtomicInteger processingCount = new AtomicInteger(0);

    @Override
    public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, SofaRequest request) {
        // RPC内置上下文
        RpcInternalContext context = RpcInternalContext.getContext();
        context.setProviderSide(true);

        String appName = request.getTargetAppName();
        if (appName == null) {
            // 默认全局appName
            appName = (String) RpcRuntimeContext.get(RpcRuntimeContext.KEY_APPNAME);
        }

        // 是否链路异步化中
        boolean isAsyncChain = false;
        try { // 这个 try-finally 为了保证Context一定被清理
            processingCount.incrementAndGet(); // 统计值加1

            context.setRemoteAddress(bizCtx.getRemoteHost(), bizCtx.getRemotePort()); // 远程地址
            context.setAttachment(RpcConstants.HIDDEN_KEY_ASYNC_CONTEXT, asyncCtx); // 远程返回的通道

            if (RpcInternalContext.isAttachmentEnable()) {
                InvokeContext boltInvokeCtx = bizCtx.getInvokeContext();
                if (boltInvokeCtx != null) {
                    putToContextIfNotNull(boltInvokeCtx, InvokeContext.BOLT_PROCESS_WAIT_TIME,
                            context, RpcConstants.INTERNAL_KEY_PROCESS_WAIT_TIME); // rpc线程池等待时间 Long
                }
            }
            if (EventBus.isEnable(ServerReceiveEvent.class)) {
                EventBus.post(new ServerReceiveEvent(request));
            }

            // 开始处理
            SofaResponse response = null; // 响应,用于返回
            Throwable throwable = null; // 异常,用于记录
            ProviderConfig providerConfig = null;
            String serviceName = request.getTargetServiceUniqueName();

            invoke:
            {
                if (bizCtx.isRequestTimeout()) { // 加上丢弃超时的请求的逻辑
                    throwable = clientTimeoutWhenReceiveRequest(appName, serviceName, bizCtx.getRemoteAddress());
                    break invoke;
                }
                // 查找服务
                Invoker invoker = boltServer.findInvoker(serviceName);
                if (invoker instanceof ProviderProxyInvoker) {
                    providerConfig = ((ProviderProxyInvoker) invoker).getProviderConfig();
                    // 找到服务后,打印服务的appName
                    appName = providerConfig != null ? providerConfig.getAppName() : null;
                }
                // 查找方法
                String methodName = request.getMethodName();
                Method serviceMethod = ReflectCache.getOverloadMethodCache(serviceName, methodName, request.getMethodArgSigs());
                request.setMethod(serviceMethod);

                // 真正调用
                response = doInvoke(serviceName, invoker, request);

                if (bizCtx.isRequestTimeout()) { // 加上丢弃超时的响应的逻辑
                    throwable = clientTimeoutWhenSendResponse(appName, serviceName, bizCtx.getRemoteAddress());
                    break invoke;
                }
            }

            // Response不为空,代表需要返回给客户端
            if (response != null) {
                RpcInvokeContext invokeContext = RpcInvokeContext.peekContext();
                isAsyncChain = CommonUtils.isTrue(invokeContext != null ? (Boolean) invokeContext.remove(RemotingConstants.INVOKE_CTX_IS_ASYNC_CHAIN) : null);
                // 如果是服务端异步代理模式,特殊处理,因为该模式是在业务代码自主异步返回的
                if (!isAsyncChain) {
                    // 其它正常请求
                    try { // 这个try-catch 保证一定要记录tracer
                        asyncCtx.sendResponse(response);
                    } finally {
                        if (EventBus.isEnable(ServerSendEvent.class)) {
                            EventBus.post(new ServerSendEvent(request, response, throwable));
                        }
                    }
                }
            }
        } finally {
            processingCount.decrementAndGet();
            if (!isAsyncChain) {
                if (EventBus.isEnable(ServerEndHandleEvent.class)) {
                    EventBus.post(new ServerEndHandleEvent());
                }
            }
            RpcInvokeContext.removeContext();
            RpcInternalContext.removeAllContext();
        }
    }

    private SofaResponse doInvoke(String serviceName, Invoker invoker, SofaRequest request) throws SofaRpcException {
        return invoker.invoke(request);
    }

    private void putToContextIfNotNull(InvokeContext invokeContext, String oldKey,
                                       RpcInternalContext context, String key) {
        Object value = invokeContext.get(oldKey);
        if (value != null) {
            context.setAttachment(key, value);
        }
    }

    @Override
    public String interest() {
        return SofaRequest.class.getName();
    }

    @Override
    public Executor getExecutor() {
        return boltServer.getBizThreadPool();
    }

    @Override
    public ExecutorSelector getExecutorSelector() {
        return UserThreadPoolManager.hasUserThread() ? executorSelector : null;
    }

    public class UserThreadPoolSelector implements UserProcessor.ExecutorSelector {
        @Override
        public Executor select(String requestClass, Object requestHeader) {
            if (SofaRequest.class.getName().equals(requestClass) && requestHeader != null) {
                Map<String, String> headerMap = (Map<String, String>) requestHeader;
                String service = headerMap.get(RemotingConstants.HEAD_SERVICE);
                if (service == null) {
                    service = headerMap.get(RemotingConstants.HEAD_TARGET_SERVICE);
                }
                if (service != null) {
                    UserThreadPool threadPool = UserThreadPoolManager.getUserThread(service);
                    if (threadPool != null) {
                        Executor executor = threadPool.getExecutor();
                        if (executor != null) {
                            // 存在自定义线程池,且不为空
                            return executor;
                        }
                    }
                }
            }
            return getExecutor();
        }
    }

    @Override
    public boolean timeoutDiscard() {
        // 业务线程自己判断超时请求,不再使用 SOFABolt 的fail-fast功能,后续的超时与否从DefaultBizContext中来判定
        return false;
    }
}
  1. 注册了自定义的序列化器,SOFABolt 自定义序列化器的原理见 SOFABolt 源码分析20 - Serializer 序列化机制设计,SOFARPC 的序列化机制见 《SOFARPC 源码解析 - 序列化机制的设计与实现》
  2. 创建了 UserThreadPoolSelector 实例,支持用户自定义业务逻辑处理器,如果用户没有自定义业务逻辑线程池,则选择在初始化 BoltServer 时,创建的业务逻辑线程池 bizThreadPool,关于自定义线程池的使用见 http://www.sofastack.tech/sofa-rpc/docs/Custom-ThreadPool
UserThreadPool threadPool = new UserThreadPool();
threadPool.setCorePoolSize(10);
threadPool.setMaximumPoolSize(100);
threadPool.setKeepAliveTime(200);
threadPool.setPrestartAllCoreThreads(false);
threadPool.setAllowCoreThreadTimeOut(false);
threadPool.setQueueSize(200);

UserThreadPoolManager.registerUserThread(ConfigUniqueNameGenerator.getUniqueName(providerConfig), threadPool);
  1. 禁用了 SOFABolt 自带的超时 fail-fast 功能,超时在 handleRequest(BizContext bizCtx, AsyncContext asyncCtx, SofaRequest request) 中通过 DefaultBizContext.isRequestTimeout() 进行判断,然后记录日志,就结束了
  2. 业务逻辑的处理逻辑在 handleRequest(BizContext bizCtx, AsyncContext asyncCtx, SofaRequest request)
    (1)记录当前Server正在处理的调用数量,在 destroy 的时候,会在允许的关闭时间内将正在处理的调用量和业务线程池内的任务进行尽可能多的处理
    (2)如果启动了监听 ServerReceiveEvent 事件,则发送一个 ServerReceiveEvent 事件到事件总线
    (3)从 BoltServer 的调用链对象缓存中获取 key 为 interface:version[:uniqueId] 的服务端调用链对象 ProviderProxyInvoker
    (4)从接口方法缓存 {serviceUniqueName:{方法名#(参数类型列表):Method}} 中获取对应的 Method,并封装到 request 对象中
    (5)调用 ProviderProxyInvoker.invoker(request),进行调用链的执行,调用链的最末端是 ProviderInvoker.invoke(SofaRequest)ProviderInvoker.invoke(SofaRequest) 中从 SofaRequest 对象中抽取 Method对象,直接进行 Method.invoke 反射调用
    (6)执行完毕之后,将 Response 发送给调用端
    (7)如果启动了监听 ServerSendEvent 事件,则发送一个 ServerSendEvent 事件到事件总线
    (8)清理操作:当前Server正在处理的调用数量-1;如果启动了监听 ServerEndHandleEvent 事件,则发送一个 ServerEndHandleEvent 事件到事件总线;清空上下文

关于链路异步化 isAsyncChain,在《SOFARPC 源码分析 - 链路异步化的设计与实现》中进行分析

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,639评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,093评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,079评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,329评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,343评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,047评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,645评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,565评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,095评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,201评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,338评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,014评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,701评论 3 332
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,194评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,320评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,685评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,345评论 2 358

推荐阅读更多精彩内容