Dubbo剖析-并发控制

一、前言

前面讲解了Dubbo的服务降级,本节我们来讲解dubbo中的并发控制,并发控制分为客户端并发控制和服务端并发控制。

二、并发控制

2.1 客户端并发控制

在服务消费方法进行并发控制需要设置actives参数,如下:

<dubbo:reference id="userService" interface="com.test.UserServiceBo"
        group="dubbo" version="1.0.0" timeout="3000" actives="10"/>

设置com.test.UserServiceBo接口中所有方法,每个方法最多同时并发请求10个请求。

也可以使用下面方法设置接口中的单个方法的并发请求个数,如下:


    <dubbo:reference id="userService" interface="com.test.UserServiceBo"
        group="dubbo" version="1.0.0" timeout="3000">
                <dubbo:method name="sayHello" actives="10" />
    </dubbo:reference>

如上设置sayHello方法的并发请求数量最大为10,如果客户端请求该方法并发超过了10则客户端会被阻塞,等客户端并发请求数量少于10的时候,该请求才会被发送到服务提供方服务器。在dubbo中客户端并发控制是使用ActiveLimitFilter过滤器来控制的,代码如下:

public class ActiveLimitFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        //获取设置的acvites的值,默认为0
        int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
        //获取当前方法目前并发请求数量
        RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
        if (max > 0) {//说明设置了actives变量
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            int active = count.getActive();
            //如果该方法并发请求数量大于设置值,则挂起当前线程。
            if (active >= max) {
                synchronized (count) {
                    while ((active = count.getActive()) >= max) {
                        try {
                            count.wait(remain);
                        } catch (InterruptedException e) {
                        }
                        //如果等待时间超时,则抛出异常
                        long elapsed = System.currentTimeMillis() - start;
                        remain = timeout - elapsed;
                        if (remain <= 0) {
                            throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                    + invoker.getInterface().getName() + ", method: "
                                    + invocation.getMethodName() + ", elapsed: " + elapsed
                                    + ", timeout: " + timeout + ". concurrent invokes: " + active
                                    + ". max concurrent invoke limit: " + max);
                        }
                    }
                }
            }
        }
        //没有限流时候,正常调用
        try {
            long begin = System.currentTimeMillis();
            RpcStatus.beginCount(url, methodName);
            try {
                Result result = invoker.invoke(invocation);
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
                return result;
            } catch (RuntimeException t) {
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                throw t;
            }
        } finally {
            if (max > 0) {
                synchronized (count) {
                    count.notify();
                }
            }
        }
    }

}

可知客户端并发控制,是如果当并发量达到指定值后,当前客户端请求线程会被挂起,如果在等待超时期间并发请求量少了,那么阻塞的线程会被激活,然后发送请求到服务提供方,如果等待超时了,则直接抛出异常,这时候服务根本都没有发送到服务提供方服务器。

2.2 服务端并发控制

在服务提供方进行并发控制需要设置executes参数,如下:

        <dubbo:service interface="com.test.UserServiceBo" ref="userService"
            group="dubbo"  version="1.0.0" timeout="3000" executes="10"/>

设置com.test.UserServiceBo接口中所有方法,每个方法最多同时并发处理10个请求,这里并发是指同时在处理10个请求。

也可以使用下面方法设置接口中的单个方法的并发处理个数,如下:



        <dubbo:service interface="com.test.UserServiceBo" ref="userService"
            group="dubbo" version="1.0.0" timeout="3000" >
            <dubbo:method name="sayHello" executes="10" />
        </dubbo:service>

如上设置sayHello方法的并发处理数量为10.

需要注意的是,服务提供方设置并发数量后,如果同时请求数量大于了设置的executes的值,则会抛出异常,而不是像消费端设置actives时候,会等待。服务提供方并发控制是使用ExecuteLimitFilter过滤器实现的,ExecuteLimitFilter代码如下:

    public class ExecuteLimitFilter implements Filter {

        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            URL url = invoker.getUrl();
            //默认不设置executes时候,其值为0
            int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
            if (max > 0) {//max>0说明设置了executes值
                RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
                //可知如果并发处理数量大于设置的值,会抛出异常
                executesLimit = count.getSemaphore(max);
                if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                    throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
                }
            }
            ...
            try {//没有限流时候,激活filter链
                Result result = invoker.invoke(invocation);
                return result;
            } catch (Throwable t) {
             ...
            } finally {
               ...
            }
        }
    }

所以当使用executes参数时候要注意,当并发量过大时候,多余的请求会失败。

三、总结

本节我们讲解了dubbo中客户端并发控制和服务端并发控制。另外另外想系统学dubbo的单击我 ,想学并发的童鞋可以 单击我

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,087评论 19 139
  • 听,见真知 《逻辑思维》 微信群管理 1.微信弱关系群人员多、情况复杂,最好采用比较保守的策略:不冒险,不冒进。 ...
    风月潇湘阅读 509评论 0 0
  • 作者|伏枥老马 最近,在网上看到一则视频,光天化日,朗朗乾坤,一个算卦的竟然在路边,手插进一个女子的衣服里,表情肃...
    道讯阅读 3,296评论 8 8
  • 中医让我们“盲目”乐观,一高兴病就好了!西医叫我们“莫名”悲哀,一难过,病越来越严重…… ——《滴水斋悟语》
    则贤阅读 170评论 0 2
  • 又到一年高考季,记得那时候什么也没有除了梦想。无奈凡考试就各种不适,。后来才知道这就是心理学中所谓的考试焦虑。 ...
    千古东方阅读 1,021评论 1 8