本节介绍 Dubbo 十层架构中的 Cluster 层。首先来看 Cluster 层包含的组件及其调用关系:Cluster、ClusterInvoker、Directory、Router、LoadBalance。
- 首先使用 Cluster(eg. FailoverCluster) 创建一个具体的 ClusterInvoker(eg. FailoverClusterInvoker);(
Cluster 是 ClusterInvoker 的抽象工厂
)- 之后调用 ClusterInvoker#list 从 Directory(eg. RegistryDirectory)中根据方法名获取具体的 Invoker(eg. DubboInvoker(filtered))列表,然后 Directory 会调用其包含的 List<Router> 进行过滤(此时可实现读写分离等);
- 之后调用 ClusterInvoker#select:根据 SPI 机制获取 LoadBalance 实例,之后使用 LoadBalance 实例从 2 中过滤出的 List<Invoker> 中选出最终的一个 Invoker;
- 最后调用 ClusterInvoker#doInvoke:调用 3 中选出的 Invoker#invoke(...),发起远程调用。
一、Cluster 接口
Dubbo 提供了八种不同姿势的 Cluster(分别实现了不同的集群容错策略):
- FailoverCluster(默认):当请求失败时(如果是抛出业务异常 bizException,则直接抛出异常 RpcException,不再重试),否则重试其他服务器。
- 可以通过
retries = "2"
来设置重试次数,重试次数为retries + 1
。- 使用场景:读操作或幂等的写操作。
- FailfastCluster:当请求失败时,直接抛出 RpcException。
- 使用场景:非幂等操作
- FailbackCluster:当请求失败时,会记录在失败列表中,并由一个定时线程池进行定时重试(每5s执行一次重试),如果重试成功,则从失败列表中删除,否则,记录error日志,下一个5s继续重试(如果一直失败,会一直重试)。
- 使用场景:异步或最终一致性,eg. 通知服务。
- FailsafeCluster:当请求失败时,会忽略异常。即不关心响应结果的成功与否。
- 使用场景:eg. 写日志。
- AvailableCluster:遍历所有节点,找到第一个可用节点,直接请求并返回响应;如果没有可用节点,则抛出异常。(
不会对请求做负载均衡
,对于DubboInvoker
来讲,可用 = Invoker 没有被 destroy && 至少有一个 NettyClient 处于已连接状态 && 不是只读)- ForkingCluster:同时并行调用多个相同的服务(使用异步线程池),此时主线程等待返回结果(默认等待1s,可通过 timeout 参数进行配置),只要其中一个返回,则立即返回结果;如果全失败了,则抛出异常;如果 timeout 内没返回响应结果,则抛出 timeout 异常
- 通过配置
forks = "2"
(默认为2) 来指定并行调用的服务数量- 通过配置
timeout="5000"
(默认为1000,即1s)来指定主线程等待并行异步线程返回结果的时间- 使用场景:对实时性要求极高。
- BroadcastCluster:广播调用所有可用的服务,
任意一个节点报错则报错
。- MergeableCluster:自动把对多个节点请求的结果进行合并。
Dubbo 还提供了一种 ClusterWrapper(MockClusterWrapper),其包含一个具体的 Cluster(eg. FailoverCluster),通过 AOP 的方式实现了 mock 操作。
@SPI(FailoverCluster.NAME) // 默认 FailoverCluster
public interface Cluster {
/**
* 创建 ClusterInvoker,该 ClusterInvoker 持有 Directory 实例。
* 实际上就是将 Directory 中的多个 Invoker 封装成了一个 Invoker(ClusterInvoker)。
*
* Cluster 是 ClusterInvoker 的抽象工厂。
*/
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;
}
Cluster 是 ClusterInvoker 的抽象工厂
,用于创建 ClusterInvoker,该 ClusterInvoker 持有 Directory 实例,实际上就是将 Directory 中的多个 Invoker 封装成了一个 Invoker(ClusterInvoker)。
以 FailoverCluster 和 FailfastCluster 为例。
public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}
public class FailfastCluster implements Cluster {
public final static String NAME = "failfast";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailfastClusterInvoker<T>(directory);
}
}
二、ClusterInvoker
针对每一种 Cluster,Dubbo 都提供了对应的 ClusterInvoker(该 ClusterInvoker 由对应的 Cluster 来创建),其中 MockClusterInvoker 和 MergeableClusterInvoker 较为特殊,直接实现了 Invoker 接口;其他的 ClusterInvoker 都继承与模板基类 AbstractClusterInvoker,复写了其中的 doInvoker(...) 方法。
MockClusterInvoker 和 MergeableClusterInvoker 较为特殊,后续分析。
AbstractClusterInvoker 模板基类
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
protected final Directory<T> directory;
...
public AbstractClusterInvoker(Directory<T> directory) {
this(directory, directory.getUrl());
}
...
@Override
public Result invoke(Invocation invocation) throws RpcException {
...
LoadBalance loadbalance = null;
...
/**
* 1. 从 Directory(eg. RegistryDirectory)中根据方法名获取具体的 Invoker(eg. DubboInvoker(filtered))列表
* 然后 Directory 会调用其包含的 List<Router> 进行过滤
*/
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && !invokers.isEmpty()) {
/**
* 2. 根据 SPI 机制获取 LoadBalance 实例
*/
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), "loadbalance", "random"));
}
...
/**
* 3. 调用子类实现负载均衡选择和rpc调用
*/
return doInvoke(invocation, invokers, loadbalance);
}
/**
* 从 Directory(eg. RegistryDirectory)中根据方法名获取具体的 Invoker(eg. DubboInvoker(filtered))列表,
* 然后 Directory 会调用其包含的 List<Router> 进行过滤
*/
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
return directory.list(invocation);
}
/**
* 调用子类实现负载均衡选择和rpc调用
*/
protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
LoadBalance loadbalance) throws RpcException;
/**
* 选择一个 Invoker。
* a) 首先, 使用 loadbalance 选择一个 Invoker。如果该 Invoker 之前已经被选择了(在 selected List 中)或者该 Invoker 不可用(eg. DubboInvoker,netty 没有处于连接状态),则进行重新选择操作(reselect);否则返回直接返回该 Invoker;
* b) 重新选择操作(reselect):
* 1. 使用 lb 从可用的且之前没有被选择的Invoker列表中选择一个,如果成功,直接返回,否则;
* 2. 使用 lb 从可用的且之前被选择的Invoker列表中选择一个,如果成功,直接返回
* c) 如果 reselect 还没有成功的话,则从 invokers(被 List<Router> 过滤过的)列表中选出 a) 中的选出的 Invoker 的下一个 Invoker
*/
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
// 1. 处理 sticky 逻辑
...
// 2. 真正的进行选择
Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
...
return invoker;
}
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
...
// 如果只有一个 Invoker,直接返回,否则进行选择操作
if (invokers.size() == 1) {
return invokers.get(0);
}
if (loadbalance == null) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("random");
}
// 1. 使用 loadbalance 进行选择
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
// 2. 如果该 Invoker 之前已经被选择了(在 selected List 中)或者该 Invoker 不可用(eg. DubboInvoker,netty 没有处于连接状态),则进行重新选择操作(reselect(...))
if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
// 2.1. 进行重新选择,如果选择成功,则直接返回;否则获取当前选出的 Invoker 的下一个 Invoker
Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if (rinvoker != null) {
invoker = rinvoker;
} else {
int index = invokers.indexOf(invoker);
invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
}
}
return invoker;
}
/**
* 重新选择操作(reselect):
* 1. 使用 lb 从可用的且之前没有被选择的Invoker列表中选择一个,如果成功,直接返回,否则;
* 2. 使用 lb 从可用的且之前被选择的Invoker列表中选择一个,如果成功,直接返回
*/
private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {
...
}
...
}
选择部分的流程相对复杂,流程图如下:
1、FailoverClusterInvoker
三种使用姿势
<!-- 1. 所有的消费者使用 FailoverCluster,且重试次数为6次(retries+1) -->
<dubbo:consumer cluster="failover" retries="5"/>
<!-- 2. 指定的消费者的所有方法使用 FailoverCluster,且重试次数为6次(retries+1) -->
<dubbo:reference ... cluster="failover" retries="5"/>
<!-- 3. 指定的消费者使用 FailoverCluster,且指定方法 sayHello 的重试次数为5次(retries+1),其余方法使用默认值 -->
<dubbo:reference ... cluster="failover">
<dubbo:method name="sayHello" retries="5"/>
</dubbo:reference>
- retries 默认为2,即重试3次
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
...
RpcException le = null; // last exception.
@Override
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
// 默认重试3次(retries+1)
int len = getUrl().getMethodParameter(invocation.getMethodName(), "retries", 2) + 1;
...
// 已选择列表
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
for (int i = 0; i < len; i++) {
// 每一次重新选择都要重置 copyinvokers:重新从 Directory 根据方法名进行 Invoker(filtered) 的获取。
// 因为有可能此时提供者列表已经发生了变化,提供者列表发生变化,Directory 也会更新其缓存的 Invoker(filtered) 列表。
if (i > 0) {
copyinvokers = list(invocation);
}
// 1. 调用父类 AbstractClusterInvoker 的选择机制进行选择
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
// 2. 将选择出来的 Invoker 添加到已选择列表中,供下一次选择时使用
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
// 3. 发起 rpc 调用,如果抛出异常(业务异常,直接抛出,不再重试),否则,catch 住,进行下一个 Invoker 的重试
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
}
}
throw new RpcException(...le.getCause());
}
}
2、FailbackClusterInvoker
public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
/**
* 重试线程池
*/
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedInternalThreadFactory("failback-cluster-timer", true));
/**
* 调用失败列表
*/
private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
private volatile ScheduledFuture<?> retryFuture;
...
private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
// 1. 双重检测创建并启动定时重试任务
if (retryFuture == null) {
synchronized (this) {
if (retryFuture == null) {
retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
retryFailed();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at collect statistic", t);
}
}
}, 5 * 1000, 5 * 1000, TimeUnit.MILLISECONDS);
}
}
}
// 2. 将失败的调用加入到失败列表中
failed.put(invocation, router);
}
void retryFailed() {
if (failed.size() == 0) {
return;
}
for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(failed).entrySet()) {
Invocation invocation = entry.getKey();
Invoker<?> invoker = entry.getValue();
try {
// 对于失败的 Invoker,进行重试:如果重试成功,则从失败列表中删除,否则,记录error日志,下一个5s继续重试(如果一直失败,会一直重试)
invoker.invoke(invocation);
failed.remove(invocation);
} catch (Throwable e) {
logger.error(...);
}
}
}
@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
// 1. 调用父类 AbstractClusterInvoker 的选择机制进行选择
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
// 2. 发起 rpc 调用,如果失败,添加到失败列表,后续进行定时重试。返回空 RpcResult
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ", e);
// 如果失败,添加到失败列表,后续进行定时重试
addFailed(invocation, this);
return new RpcResult(); // ignore
}
}
}
3、FailfastClusterInvoker
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
...
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 1. 调用父类 AbstractClusterInvoker 的选择机制进行选择
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
// 2. 发起 rpc 调用,如果失败,直接抛出异常
return invoker.invoke(invocation);
} catch (Throwable e) {
...
throw new RpcException(...);
}
}
}
4、FailsafeClusterInvoker
public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
...
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
// 1. 调用父类 AbstractClusterInvoker 的选择机制进行选择
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
// 2. 发起 rpc 调用,如果失败,直接忽略,返回空的 RpcResult
return invoker.invoke(invocation);
} catch (Throwable e) {
return new RpcResult(); // ignore
}
}
}
5、AvailableClusterInvoker
public class AvailableClusterInvoker<T> extends AbstractClusterInvoker<T> {
...
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 找出第一个可用的 Invoker,直接发起 rpc 调用(没有负载均衡)
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
return invoker.invoke(invocation);
}
}
throw new RpcException("No provider available in " + invokers);
}
}
对于 DubboInvoker,可用的逻辑如下:
@Override
public boolean isAvailable() {
// AbstractInvoker:Invoker 没有被 destroy,则可用
if (!super.isAvailable()) {
return false;
}
// 只要有一个 NettyClient 处于已连接状态 && 不是只读,则可用
for (ExchangeClient client : clients) {
if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)) {
return true;
}
}
return false;
}
6、ForkingClusterInvoker
两种使用姿势
<!-- 1. 所有的消费者使用ForkingCluster,且同时并发调用3个Invoker(forks),并且等待结果返回时间为5s -->
<dubbo:consumer cluster="forking" forks="3" timeout="5000"/>
<!-- 2. 指定的消费者使用ForkingCluster,且同时并发调用3个Invoker(forks),并且等待结果返回时间为5s -->
<dubbo:reference ... cluster="forking" forks="3" timeout="5000"/>
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
/** 异步并行调用线程池 */
private final ExecutorService executor = Executors.newCachedThreadPool(new NamedInternalThreadFactory("forking-cluster-timer", true));
...
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> selected; // 选择多少个Invoker进行调用
int forks = getUrl().getParameter("forks", 2);
int timeout = getUrl().getParameter("timeout", 1000);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers; // 选择全部Invoker进行调用
} else {
selected = new ArrayList<Invoker<T>>();
// 1. 多次调用父类 AbstractClusterInvoker 的选择机制进行 Invoker 的选择,并添加到 List<Invoker<T>> selected 中
for (int i = 0; i < forks; i++) {
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) { // 去重操作
selected.add(invoker);
}
}
}
...
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
for (final Invoker<T> invoker : selected) {
executor.execute(new Runnable() {
public void run() {
try {
// 2. 使用线程池并行发起多次调用,并将调用结果存储到ref阻塞队列中;如果全部调用都失败了,则将失败异常也存储到ref中
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
// 全部调用都失败了
if (value >= selected.size()) {
ref.offer(e);
}
}
}
});
}
// 3. 主线程从阻塞队列弹出第一个对象(阻塞等待1s,即1s内如果调用无法完成,则抛出 timeout 异常;如果1s内有任一个调用返回,则直接处理),如果得到的是异常,则直接抛出,否则,返回调用结果
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
throw new RpcException(...ret...);
}
return (Result) ret;
}
}
7、BroadcastClusterInvoker
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
...
@Override
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
RpcException exception = null;
Result result = null;
// 对每一个节点进行调用,只要有一个失败,直接抛出异常
for (Invoker<T> invoker : invokers) {
try {
result = invoker.invoke(invocation);
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e);
}
}
if (exception != null) {
throw exception;
}
return result;
}
}
8、MergeableCluster
在合并器 merger 部分进行分析。
9、MockClusterInvoker
在服务降级 mock 部分进行分析。