DUBBO线程模型和调度策略

DUBBO线程模型

线程模型

从官方描述来看dubbo线程模型支持业务线程和I/O线程分离,并且提供5种不同的调度策略。

调度策略

拿Netty组件为例(Netty4x),在NettyServer的构造方法中通过ChannelHandlers#wrap方法设置MultiMessageHandler,HeartbeatHandler并通过SPI扩展选择调度策略。

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); // 线程派发
    }

ChannelHandlers#wrapInternal

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        // 选择调度策略 默认是all
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url))); //
    }

接下来看下NettyServer#doOpen方法 ,主要设置Netty的boss线程池数量为1,worker线程池(也就是I/O线程)为cpu核心数+1和向Netty中注测Handler用于消息的编解码和处理。

 protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();
        // 多线程模型
        // boss线程池,负责和消费者建立新的连接
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        // worker线程池,负责连接的数据交换
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) // nagele 算法
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)// TIME_WAIT
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //内存池
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder()) //设置编解码器
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind 端口
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

可以看出,如果我们在一个JVM进程只暴露一个Dubbo服务端口,那么一个JVM进程只会有一个NettyServer实例,也会只有一个NettyHandler实例 。从上面代码也可以看出,Dubbo在Netty的Pipeline中只注册了三个Handler,而Dubbo内部也定义了一个ChannelHandler接口用来将和Channel相关的处理串起来,而第一个ChannelHandler就是由NettyHandler来调用的。有趣的是NettyServer本身也是一个ChannelHandler。当Dubbo将Spring容器中的服务实例做了动态代理的处理后,就会通过NettyServer#doOpen来暴露服务端口,再接着将服务注册到注册中心。这些步骤做完后,Dubbo的消费者就可以来和提供者建立连接了,当然是消费者来主动建立连接,而提供者在初始化连接后会调用NettyHandler#channelActive方法来创建一个NettyChannel:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.debug("channelActive <" + NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()) + ">" + " channle <" + ctx.channel());
        //获取或者创建一个NettyChannel
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            if (channel != null) {
                // <ip:port, channel>
                channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
            }
            // 这里的 handler就是NettyServer
            handler.connected(channel);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }

与Netty和Dubbo都有自己的ChannelHandler一样,Netty和Dubbo也有着自己的Channel。该方法最后会调用NettyServer#connected方法来检查新添加channel后是否会超出提供者配置的accepts配置,如果超出,则直接打印错误日志并关闭该Channel,这样的话消费者端自然会收到连接中断的异常信息,详细可以见AbstractServer#connected方法。

public void connected(Channel ch) throws RemotingException {
        // If the server has entered the shutdown process, reject any new connection
        if (this.isClosing() || this.isClosed()) {
            logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
            ch.close();
            return;
        }

        Collection<Channel> channels = getChannels();
        //大于accepts的tcp连接直接关闭
        if (accepts > 0 && channels.size() > accepts) { 
            logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
            ch.close();
            return;
        }
        super.connected(ch);
    }

注意的是在dubbo中消费者和提供者默认只建立一个TCP长连接,为了增加消费者调用服务提供者的吞吐量, 可以在消费方增加如下配置:

<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" connections="20"/>

而作为提供者可以使用accepts控制长连接的数量防止连接数量过多,配置如下:

<dubbo:protocol name="dubbo" port="20880" accepts="10"/>

当连接建立完成后,消费者就可以请求提供者的服务了,当请求到来,提供者这边会依次经过如下Handler的处理:

--->NettyServerHandler#channelRead接收请求消息

--- >AbstractPeer#received:如果服务已经关闭,则返回,否则调用下一个Handler来处理。

--->MultiMessageHandler#received:如果是批量请求,则依次对请求调用下一个Handler来处理。

​ --->HeartbeatHandler#received: 处理心跳消息。

​ --->AllChannelHandler#received:该Dubbo的Handler非常重要,因为从这里是IO线程池和业务线程池的隔离。

​ --->DecodeHandler#received: 消息解码

​ --->HeaderExchangeHandler #received :消息处理

​ --->DubboProtocol : 远程调用

看下AllChannelHandler#received:

public void received(Channel channel, Object message) throws RemotingException {
        // 获取业务线程池
        ExecutorService cexecutor = getExecutorService();
        try {
            // 使用线程池处理消息
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
         throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

这里对execute进行了异常捕获,这是因为I/O线程池是无界的,但业务线程池可能是有界的,所以进行execute提交可能会遇到RejectedExecutionException异常 。

那么这里是如何获取到业务线程池的那?实际上WrappedChannelHandler是xxxChannelHandlerd的装饰类,根据dubbo SPI可以知道,获取AllChannelHandler会首先实例化WrappedChannelHandler。

public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        // 获取业务线程池
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }

根据上面的代码可以看出dubbo业务线程池是在WrappedChannelHandler实例化的时候获取的。

接下来我们要看下dubbo的业务线程池模型,先上一个官方描述:

线程池扩展
  • FixedThreadPool:
public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // 线程池名称DubboServerHanler-server:port
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 缺省线程数量200
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        // 任务队列类型
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

缺省情况下使用200个线程和SynchronousQueue这意味着如果如果线程池所有线程都在工作再有新任务会直接拒绝。

  • CachedThreadPool:

    public class CachedThreadPool implements ThreadPool {
    
        @Override
        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            // 核心线程数量 缺省为0
            int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
            // 最大线程数量 缺省为Integer.MAX_VALUE
            int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
            // queue 缺省为0
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            // 空闲线程存活时间
            int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
            return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>() :
                            (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    }
    

    缓存线程池,可以看出如果提交任务的速度大于maxThreads将会不断创建线程,极端条件下将会耗尽CPU和内存资源。在突发大流量进入时不适合使用。

  • LimitedThreadPool:

    public class  LimitedThreadPool implements ThreadPool {
    
        @Override
        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            // 缺省核心线程数量为0
            int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
            // 缺省最大线程数量200
            int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
            // 任务队列缺省0
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>() :
                            (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    
    }
    

    不配置的话和FixedThreadPool没有区别

  • EagerThreadPool :

    public class EagerThreadPool implements ThreadPool {
    
        @Override
        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            // 0
            int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
            // Integer.MAX_VALUE
            int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
            // 0
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            // 60s
            int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
    
            // init queue and executor
            // 初始任务队列为1
            TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
            EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                    threads,
                    alive,
                    TimeUnit.MILLISECONDS,
                    taskQueue,
                    new NamedInternalThreadFactory(name, true),
                    new AbortPolicyWithReport(name, url));
            taskQueue.setExecutor(executor);
            return executor;
        }
    }
    

    EagerThreadPoolExecutor:

    public void execute(Runnable command) {
            if (command == null) {
                throw new NullPointerException();
            }
            // do not increment in method beforeExecute!
            //已提交任务数量
            submittedTaskCount.incrementAndGet();
            try {
                super.execute(command);
            } catch (RejectedExecutionException rx) { //大于最大线程数被拒绝任务 重新添加到任务队列
                // retry to offer the task into queue.
                final TaskQueue queue = (TaskQueue) super.getQueue();
                try {
                    if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                        submittedTaskCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.", rx);
                    }
                } catch (InterruptedException x) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } catch (Throwable t) {
                // decrease any way
                submittedTaskCount.decrementAndGet();
                throw t;
            }
        }
    

    TaskQueue:

    public boolean offer(Runnable runnable) {
            if (executor == null) {
                throw new RejectedExecutionException("The task queue does not have executor!");
            }
            // 获取当前线程池中的线程数量
            int currentPoolThreadSize = executor.getPoolSize();
            // have free worker. put task into queue to let the worker deal with task.
            // 如果已经提交的任务数量小于当前线程池中线程数量(不是很理解这里的操作)
            if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
                return super.offer(runnable);
            }
    
            // return false to let executor create new worker.
            //当前线程数小于最大线程程数直接创建新worker
            if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
                return false;
            }
    
            // currentPoolThreadSize >= max
            return super.offer(runnable);
        }
    

    优先创建Worker线程池。在任务数量大于corePoolSize但是小于maximumPoolSize时,优先创建Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比于cached:cached在任务数量超过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)

根据以上的代码分析,如果消费者的请求过快很有可能导致服务提供者业务线程池抛出RejectedExecutionException异常。这个异常是duboo的采用的线程拒绝策略AbortPolicyWithReport#rejectedExecution抛出的,并且会被反馈到消费端,此时简单的解决办法就是将提供者的服务调用线程池数目调大点,例如如下配置:

<dubbo:provider threads="500"/>
或
<dubbo:protocol name="dubbo" port="20882" accepts="10" threads="500"/>

我们为了保证模块内的主要服务有线程可用(防止次要服务抢占过多服务调用线程),可以对次要服务进行并发限制,例如:

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" executes="100"/>

回过头来再看下dubbo得Dispatcher 策略默认是all,实际上比较好的处理方式是I/O线程和业务线程分离,所以采取message是比较好得配置。并且如果采用all如果使用的dubo版本比较低很有可能会触发dubbo的bug。

原因请参见:全部请求都使用业务线程池的问题

在dubbo重新维护之后这个bug已经被修复:

public void received(Channel channel, Object message) throws RemotingException {
        // 获取业务线程池
        ExecutorService cexecutor = getExecutorService();
        try {
            // 使用线程池处理消息
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

如果是RejectedExecutionException异常直接返回给消费者。

建议的配置是:

<dubbo:protocol name="dubbo" port="8888" threads="500" dispatcher="message" />
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,258评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,335评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,225评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,126评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,140评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,098评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,018评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,857评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,298评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,518评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,400评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,993评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,638评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,661评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容