Dubbo源码分析(八)集群容错机制

前言

在上一章节,我们曾提到这样一个问题:
当调用服务失败后,我们怎么处理当前的请求?抛出异常亦或是重试?

为了解决这个问题,Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker。集群 Cluster 用途是将多个服务提供者合并为一个 Cluster Invoker,并将这个 Invoker 暴露给服务消费者。这样一来,服务消费者只需通过这个 Invoker 进行远程调用即可,至于具体调用哪个服务提供者,以及调用失败后如何处理等问题,现在都交给集群模块去处理。

一、合并

在服务引用的过程中,我们最终会将一个或多个服务提供者Invoker封装成服务目录对象,但最后还要将它合并转换成Cluster Invoker对象。
Invoker invoker = cluster.join(directory);

这里的cluster就是扩展点自适应类,在Dubbo中默认是Failover,所以上面代码会调用到:

public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }
}

上面的代码很简单,所以最后的Invoker对象指向的是FailoverClusterInvoker实例。它也是一个Invoker,它继承了抽象的AbstractClusterInvoker

我们看下AbstractClusterInvoker类中的invoke方法。

public abstract class AbstractClusterInvoker<T> implements Invoker<T> {

    
    public Result invoke(final Invocation invocation) throws RpcException {
      
        LoadBalance loadbalance = null;
        //调用服务目录,获取所有的服务提供者Invoker对象
        List<Invoker<T>> invokers = directory.list(invocation);
        if (invokers != null && !invokers.isEmpty()) {
            //加载负载均衡组件
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).
                getExtension(invokers.get(0).getUrl().
                getMethodParameter(invocation.getMethodName(), "loadbalance", "random"));
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        //调用子类实现 ,不同的集群容错机制
        return doInvoke(invocation, invokers, loadbalance);
    }
}

以上代码也很简单,我们分为三个步骤来看

  • 调用服务目录,获取所有的服务提供者列表
  • 加载负载均衡组件
  • 调用子类实现,转发请求

关于负载均衡我们后续再深入了解,这是只知道它负责从多个Invoker中选取一个返回就行。

二、集群容错策略

Dubbo为我们提供了多种集群容错机制。主要如下:

  • Failover Cluster - 失败自动切换

FailoverClusterInvoker在调用失败时,会自动切换 Invoker 进行重试。默认配置下,Dubbo 会使用这个类作为缺省 Cluster Invoker。

  • Failfast Cluster - 快速失败

FailfastClusterInvoker 只会进行一次调用,失败后立即抛出异常。

  • Failsafe Cluster - 失败安全

FailsafeClusterInvoker 当调用过程中出现异常时,仅会打印异常,而不会抛出异常。

  • Failback Cluster - 失败自动恢复

FailbackClusterInvoker 会在调用失败后,返回一个空结果给服务提供者。并通过定时任务对失败的调用进行重传,适合执行消息通知等操作。

  • Forking Cluster - 并行调用多个服务提供者

ForkingClusterInvoker 会在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。ForkingClusterInvoker 的应用场景是在一些对实时性要求比较高读操作(注意是读操作,并行写操作可能不安全)下使用,但这将会耗费更多的资源。

  • BroadcastClusterInvoker - 广播

BroadcastClusterInvoker 会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常。该类通常用于通知所有提供者更新缓存或日志等本地资源信息。

三、自动切换

FailoverClusterInvoker 在调用失败时,会自动切换 Invoker 进行重试。我们重点看它的doInvoke方法。

public Result doInvoke(Invocation invocation, 
        final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

    List<Invoker<T>> copyinvokers = invokers;
    //检查invokers是否为空
    checkInvokers(copyinvokers, invocation);
    //获取重试次数 这里默认是3次
    int len = getUrl().getMethodParameter(invocation.getMethodName(), "retries",2) + 1;
    if (len <= 0) {
        len = 1;
    }
    //异常信息对象
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
    Set<String> providers = new HashSet<String>(len);
    
    //循环调用 失败重试len次
    for (int i = 0; i < len; i++) {
        if (i > 0) {
            checkWhetherDestroyed();
            //重新获取服务提供者列表
            copyinvokers = list(invocation);
            //再次检查
            checkInvokers(copyinvokers, invocation);
        }
        //通过loadbalance选取一个Invoker
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            //调用服务
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("");
            }
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    //重试失败 
    throw new RpcException("");
}

我们可以看到,它的重点是invoker的调用是在一个循环方法中。只要不return,就会一直调用,重试 len 次。我们总结下它的过程:

  • 检查invokers是否为空
  • 获取重试次数,默认为3
  • 进入循环
  • 如果是重试,再次获取服务提供者列表,并校验
  • 选取Invoker,并调用
  • 无异常,返回结果,循环结束
  • 捕获到异常,继续循环调用直至重试最大次数

四、快速失败

FailfastClusterInvoker就很简单了,它只会进行一次调用,失败后立即抛出异常。

public Result doInvoke(Invocation invocation, 
        List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        
    checkInvokers(invokers, invocation);
    Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
    try {
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        if (e instanceof RpcException && ((RpcException) e).isBiz()) {
            throw (RpcException) e;
        }
        throw new RpcException("....");
    }
}

五、失败安全

FailsafeClusterInvoker跟上面这个差异不大,它调用失败后并不抛出异常。而是打印异常信息并返回一个空的结果对象。

public Result doInvoke(Invocation invocation, 
    List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    
    try {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failsafe ignore exception: " + e.getMessage(), e);
        return new RpcResult();
    }
}

六、自动恢复

FailbackClusterInvoker 会在调用失败后,也是打印异常信息并返回一个空的结果对象,但是还没结束,它还会偷偷开启一个定时任务,再次去调用。

protected Result doInvoke(Invocation invocation, 
        List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    
    try {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failback to invoke method " + invocation.getMethodName() + ", 
            wait for retry in background. Ignored exception: "
                + e.getMessage() + ", ", e);
        //添加失败信息
        addFailed(invocation, this);
        return new RpcResult();
    }
}

我们可以看到,调用失败后,除了打印异常信息和返回空结果对象之外,还有一个方法addFailed 它就是开启定时任务的地方。

1、开启定时任务

首先,定义一个包含2个线程的线程池对象。

Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true));

然后,延迟5秒后,每隔5秒调用retryFailed方法,直到调用成功。

private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
    if (retryFuture == null) {
        synchronized (this) {
            if (retryFuture == null) {
                retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
                    public void run() {
                        try {
                            //重试方法
                            retryFailed();
                        } catch (Throwable t) { 
                            logger.error("Unexpected error occur at collect statistic", t);
                        }
                    }
                }, 5000, 5000, TimeUnit.MILLISECONDS);
            }
        }
    }
    //ConcurrentHashMap 添加失败任务
    failed.put(invocation, router);
}

最后,我们需要注意failed.put(invocation, router); 它将当前失败的任务添加到failed,它是一个ConcurrentHashMap对象。

2、重试

重试的逻辑也不复杂,从failed对象中获取失败的记录,调用即可。

void retryFailed() {
    
    //如果为空,说明已经没有了失败的任务
    if (failed.size() == 0) {
        return;
    }
    //遍历failed,对失败的调用进行重试
    Set<Entry<Invocation, AbstractClusterInvoker<?>>> failedSet = failed.entrySet();     
    for (Entry<Invocation, AbstractClusterInvoker<?>> entry : failedSet) {
        Invocation invocation = entry.getKey();
        Invoker<?> invoker = entry.getValue();
        try {
            // 再次进行调用
            invoker.invoke(invocation);
            // 调用成功后,从 failed 中移除 invoker
            failed.remove(invocation);
        } catch (Throwable e) {
            logger.error("......", e);
        }
    }
}

如上代码,其中的重点是调用成功后,要将invocation移除。当再次调用到这个方法,开头的条件判断成立,就直接返回,不再继续调用。

3、问题

实际上,这套自动恢复的机制是有点小问题的。只要有一次调用失败,就会开启定时任务不断重试调用,直至成功。但问题是,即便重试调用成功后,定时任务并不会关闭,会持续的调用retryFailed方法。虽然这个方法有个判断,会直接返回。

如果服务调用失败次数多了之后,就会有大量的线程以5s的间隔,不断调用这个方法。

这句话不严谨。当时笔者是新开的消费者端项目,才看到有大量的新建线程;但如果是同一个服务中,自始至终就是一开始创建的2个线程在运行。不过空跑的情况依然存在。

笔者建议,如果有此类需求,不要直接用Dubbo中的这个Cluster。最好利用SPI机制重写一个方法来实现。

七、并行调用

ForkingClusterInvoker 会在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。

public Result doInvoke(final Invocation invocation, 
        List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    
    final List<Invoker<T>> selected;
    //获取最大并行数 默认为2
    final int forks = getUrl().getParameter("forks", 2);
    //超时时间
    final int timeout = getUrl().getParameter("timeout", 1000);
    if (forks <= 0 || forks >= invokers.size()) {
        selected = invokers;
    } else {
        selected = new ArrayList<Invoker<T>>();
        //选择Invoker 并添加到selected
        for (int i = 0; i < forks; i++) {
            Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
            if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
                selected.add(invoker);
            }
        }
    }
    RpcContext.getContext().setInvokers((List) selected);
    final AtomicInteger count = new AtomicInteger();
    //阻塞队列 先进先出
    final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
    for (final Invoker<T> invoker : selected) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    //调用服务 将结果放入队列
                    Result result = invoker.invoke(invocation);
                    ref.offer(result);
                } catch (Throwable e) {
                    //如果异常调用次数大于等于最大并行数
                    int value = count.incrementAndGet();
                    if (value >= selected.size()) {
                        ref.offer(e);
                    }
                }
            }
        });
    }
    try {
        //从队列中获取结果
        Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
        if (ret instanceof Throwable) {
            Throwable e = (Throwable) ret;
            throw new RpcException("....");
        }
        return (Result) ret;
    } catch (InterruptedException e) {
        throw new RpcException(e.getMessage(), e);
    }
}

以上代码的重点就是阻塞队列LinkedBlockingQueue。如果有结果放入,poll方法会立即返回,完成整个调用。我们再总结下整体流程:

  • 获取最大并行数,默认为2;获取超时时间
  • 选择Invoker,并添加到selected
  • 通过newCachedThreadPool创建多个线程,调用服务。
  • 正常返回后,将结果offer到队列。此时调用流程结束,返回正常信息。
  • 调用服务异常后,判断异常次数是否大于等于最大并行数,条件成立则将异常信息offer到队列,此时调用流程结束,返回异常信息。

八、广播

BroadcastClusterInvoker 会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常。

public Result doInvoke(final Invocation invocation, 
        List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    
    checkInvokers(invokers, invocation);
    RpcContext.getContext().setInvokers((List) invokers);
    RpcException exception = null;
    Result result = null;
    
    //循环调用服务
    for (Invoker<T> invoker : invokers) {
        try {
            result = invoker.invoke(invocation);
        } catch (RpcException e) {
            exception = e;
            logger.warn(e.getMessage(), e);
        } catch (Throwable e) {
            exception = new RpcException(e.getMessage(), e);
            logger.warn(e.getMessage(), e);
        }
    }
    //异常
    if (exception != null) {
        throw exception;
    }
    return result;
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容