Dubbo源码分析----容错

先看下Dubbo官方的一张图


image.png

Cluster是容错的核心,官方的说法是

Cluster 将 Directory 中的多个 Invoker 伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个

即Cluster是对外暴露的一个接口,内部返回一个集群版的Invoker,通过不同的容错策略,对从Directory中获取的invoker有不同的调用方式

下面看下代码实现

AvailableCluster

public class AvailableCluster implements Cluster {
    
    public static final String NAME = "available";

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        
        return new AbstractClusterInvoker<T>(directory) {
            public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
                for (Invoker<T> invoker : invokers) {
                    if (invoker.isAvailable()) {
                        return invoker.invoke(invocation);
                    }
                }
                throw new RpcException("No provider available in " + invokers);
            }
        };
        
    }

}

这个策略很简单,就是从List<Invoker<T>>中获取一个可用的Invoker来进行调用

BroadcastCluster

public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        RpcContext.getContext().setInvokers((List)invokers);
        RpcException exception = null;
        Result result = null;
        for (Invoker<T> invoker: invokers) {
            try {
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
            } catch (Throwable e) {
                exception = e;
            }
        }
        if (exception != null) {
            throw exception;
        }
        return result;
    }

}

看名字就知道大概的意思,广播调用。看代码也很简单,遍历List<Invoker<T>>进行调用,如果有一个出现异常则抛出异常

FailbackCluster

public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();

    public FailbackClusterInvoker(Directory<T> directory){
        super(directory);
    }

    private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
        if (retryFuture == null) {
            synchronized (this) {
                if (retryFuture == null) {
                    retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

                        public void run() {
                            // 收集统计信息
                            try {
                                retryFailed();
                            } catch (Throwable t) { // 防御性容错
                                logger.error("Unexpected error occur at collect statistic", t);
                            }
                        }
                    }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
                }
            }
        }
        failed.put(invocation, router);
    }

    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            addFailed(invocation, this);
            return new RpcResult(); // ignore
        }
    }

}

到这里就和上面的几种方式不一样了,上面的都是直接对List<Invoker<T>>进行操作,而这里首先需要通过负载均衡策略获取到一个Invoker,然后才进行调用,这个方法是普遍的,那么出错怎么处理才是核心。
这里出错的时候,会调用addFailed方法
首先addFailed方法,使用了double-check的方式来初始化retryFuture,保证其是单例的。如果是已经初始化的,直接放入map中等待定时重试,如何重试那么要看retryFailed方法

    void retryFailed() {
        if (failed.size() == 0) {
            return;
        }
        for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(
                                                                                                                         failed).entrySet()) {
            Invocation invocation = entry.getKey();
            Invoker<?> invoker = entry.getValue();
            try {
                invoker.invoke(invocation);
                failed.remove(invocation);
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
            }
        }
    }

重试逻辑也很简单,从map中拿出失败的Invoker进行调用,成功则从failed中移除

FailfastCluster

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T>{
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            if (e instanceof RpcException && ((RpcException)e).isBiz()) { // biz exception.
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
        }
    }

这个策略也很简单,从名字也可以猜出是什么功能:快速失败。
从代码中看,选出一个Invoker进行调用,如果失败,那么不重试

FailoverCluster

类上的注释:

失败转移,当出现失败,重试其它服务器,通常用于读操作,但重试会带来更长延迟。
看到注释,有几个问题:

  1. 重试其他服务器,这个其他服务器是随机挑选的吗?
  2. 如果所有服务器都失败,还会继续重试吗?
  3. 会重试几次呢?

我们看下代码具体是怎么实现的:

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        checkInvokers(copyinvokers, invocation);
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // 已经调用过的Invoker集合.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //重试时,进行重新选择,避免重试时invoker列表已发生变化.
            //注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
            if (i > 0) {
                        //进行到这里,证明第一次已经失败
                checkWheatherDestoried();
                copyinvokers = list(invocation);
                //重新检查一下
                checkInvokers(copyinvokers, invocation);
            }
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List)invoked);
            try {
                Result result = invoker.invoke(invocation);
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(...);
    }

}

首先,从url中获取重试次数,在这个基础上+1,进行len次调用。
调用的过程和其他策略,都是先使用LoadBalance策略选出一个Invoker进行调用,但是有没有注意到,select方法在其他策略里传入的是null,这里传入的是List,这会导致什么不同的结果,那么需要看下select的实现

    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.size() == 0)
            return null;
        String methodName = invocation == null ? "" : invocation.getMethodName();
        
        boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY) ;
        {
            //ignore overloaded method
            if ( stickyInvoker != null && !invokers.contains(stickyInvoker) ){
                stickyInvoker = null;
            }
            //ignore cucurrent problem
            if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))){
                if (availablecheck && stickyInvoker.isAvailable()){
                    return stickyInvoker;
                }
            }
        }
        Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);
        
        if (sticky){
            stickyInvoker = invoker;
        }
        return invoker;
    }

select不是核心所在,而是sticky这个参数的实现,有兴趣的可以研究一下,我们主要看核心方法doselect

    private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.size() == 0)
            return null;
        if (invokers.size() == 1)// 如果只有一个invoker,那么没法负载均衡了,只能选这个了
            return invokers.get(0);
        // 如果只有两个invoker,退化成轮循
        // 如果有两个Invoker,那么不需要进行复杂的负载均衡计算,这个不行就选另外一个
        if (invokers.size() == 2 && selected != null && selected.size() > 0) {
            return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
        }
        // 使用LB策略选出一个Invoker
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
        
        //如果 selected中包含(优先判断) 或者 不可用&&availablecheck=true 则重试.
        if( (selected != null && selected.contains(invoker))
                ||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){
            try{
                // 重新选择
                Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                if(rinvoker != null){
                    invoker =  rinvoker;
                }else{
                    //看下第一次选的位置,如果不是最后,选+1位置.
                    int index = invokers.indexOf(invoker);
                    try{
                        //最后在避免碰撞
                        invoker = index <invokers.size()-1?invokers.get(index+1) :invoker;
                    }catch (Exception e) {
                        logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e);
                    }
                }
            }catch (Throwable t){
                logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t);
            }
        }
        return invoker;
    } 

通过LB选出一个Invoker之后,会判断改Invoker是否符合条件,不符合条件会进行重新选择,如果选出的不为空那么直接返回,如果重新选择之后还是返回null,那么如果原来选中的这个Invoker是List<Invoker<T>>最后一个元素,那么还是使用这个Invoker,否则在该Invoker后找一个Invoker(这里为什么是List<Invoker<T>>最后一个元素那么还是选回原来的Invoker,而不是另外的元素,例如第一个,或者),是看下reselect的实现

    private Invoker<T> reselect(LoadBalance loadbalance,Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected ,boolean availablecheck)
            throws RpcException {
        
        List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size()>1?(invokers.size()-1):invokers.size());
        
        //先从非select中选
        if( availablecheck ){ //选isAvailable 的非select
            for(Invoker<T> invoker : invokers){
                if(invoker.isAvailable()){
                    if(selected ==null || !selected.contains(invoker)){
                        reselectInvokers.add(invoker);
                    }
                }
            }
            if(reselectInvokers.size()>0){
                return  loadbalance.select(reselectInvokers, getUrl(), invocation);
            }
        }else{ //选全部非select
            for(Invoker<T> invoker : invokers){
                if(selected ==null || !selected.contains(invoker)){
                    reselectInvokers.add(invoker);
                }
            }
            if(reselectInvokers.size()>0){
                return  loadbalance.select(reselectInvokers, getUrl(), invocation);
            }
        }
        //到了这里,证明 reselectInvokers为空,那么从已选的列表中选择
        {
            if(selected != null){
                for(Invoker<T> invoker : selected){
                    if((invoker.isAvailable()) //优先选available 
                            && !reselectInvokers.contains(invoker)){
                        reselectInvokers.add(invoker);
                    }
                }
            }
            if(reselectInvokers.size()>0){
                return  loadbalance.select(reselectInvokers, getUrl(), invocation);
            }
        }
        return null;
    }

总的分3个分支

  1. 选择不在selected中的Invoker
  2. 选择不在selected中的Invoker且可用的invoker
  3. 上面选择出来的invokers为空,那么从已选的列表中选择

如果上面3种情况都不能返回一个invoker,那么才会执行这部操作

invoker = index <invokers.size()-1?invokers.get(index+1) :invoker;

那么到这里,整个选择流程就结束了,上面几个问题也有了答案

  1. 重试其他的服务器,这个不是随机挑选的,需要根据LB策略,提供方是否可用等策略进行判断,重试过的在下一次重试的时候,基本不会从这个重试过的服务器选择(看到代码中,某些情况还是会选择已经选择过的)
  2. 重试有一定的次数,次数为retries参数+1,如果都失败了,那么只能抛出异常了
  3. 重试有一定的次数,次数为retries参数+1

另外,重试的时候有个注意点

      if (i > 0) {
                   //进行到这里,证明第一次已经失败
          checkWheatherDestoried();
          copyinvokers = list(invocation);
          checkInvokers(copyinvokers, invocation);
      }

由于invoker会动态变化(例如某个服务器挂了,或者某个服务器被禁用了,甚至整个Dubbo实例已经关闭了),而在一开始的时候拿到的invokers是当时的可用的invoker列表,所以可能存在某个invoker已经不可用了,那么需要检查一下,然后从Directory中获取最新的Invoker列表

FailsafeCluster

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T>{

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult(); // ignore
        }
    }
}

这个比较简单,失败了记录日志....

ForkingCluster

这也是一种比较有意思的容错策略,先看下官方描述

并行调用,只要一个成功即返回,通常用于实时性要求较高的操作,但需要浪费更多服务资源

看了描述,有几个问题

  1. 并行调用是把所有invoker拿来并行调用吗?
  2. 如何进行并行调用?
  3. 有一个成功即返回,这个怎么做?
  4. 如果有失败的怎么处理?

带着问题去看源码

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T>{

    private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true)); 
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        final List<Invoker<T>> selected;
        final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
        final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        //如果并行数大于invoker的数量或小于0,那么直接用并行调用所有invoker
        // 因为如果并行为4,而invoker数量为3,那么其实最大并行量也只有3
        if (forks <= 0 || forks >= invokers.size()) {
            selected = invokers;
        } else {
            // 如果并行数大于invoker的数量,那么就需要挑选invoker了,例如并行为4,invoker数量为5,那么从5个挑4个进行并行调用
            // 这里和failover类似,通过select选出一个invoker,然后放到selected里,保证不重复
            selected = new ArrayList<Invoker<T>>();//并行调用的invoker集合
            for (int i = 0; i < forks; i++) {
                //在invoker列表(排除selected)后,如果没有选够,则存在重复循环问题.见select实现.
                Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                if(!selected.contains(invoker)){//防止重复添加invoker
                    selected.add(invoker);
                }
            }
        }
        RpcContext.getContext().setInvokers((List)selected);
        // 记录失败次数
        final AtomicInteger count = new AtomicInteger();
        //返回结果放到队列中
        final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
        // 每个invoker调用都使用一个线程调用
        for (final Invoker<T> invoker : selected) {
            executor.execute(new Runnable() {
                public void run() {
                    try { 
                        Result result = invoker.invoke(invocation);
                        ref.offer(result);
                    } catch(Throwable e) {
                        int value = count.incrementAndGet();
                        if (value >= selected.size()) {//当所有的invoker都调用失败则把异常加入到队列中
                            ref.offer(e);
                        }
                    }
                }
            });
        }
        try {
            // 从队列中获取结果,该过程阻塞
            Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
            if (ret instanceof Throwable) {// 当所有
                Throwable e = (Throwable) ret;
                throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
            }
            return (Result) ret;
        } catch (InterruptedException e) {
            throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
        }
    }
}

看到源码,答案就出来了

  1. 如果并行数大于invoker数或者小于0,那么拿全部invoker进行调用
  2. 每个invoker使用一个线程执行
  3. 每个invoker调用后,会返回结果,结果会放到队列中,主线程会使用poll从队列获取值,只要有一个线程从invoker中获取到数据,那么就返回结果
  4. 当所有的invoker都失败了,那么队列就放的是异常,而不是结果,主线程poll会判断该返回值,如果是异常则抛出
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容