部署服务
启动两个provider,一个consumer。其中一个provider修改配置文件端口为20881打包成jar并运行,idea中中运行另一个provider(20880)和cunsumer。
- 客户端代码
public class DemoAction {
private DemoService demoService;
public void setDemoService(DemoService demoService) {
this.demoService = demoService;
}
public void start() throws Exception {
for (int i = 0; i < Integer.MAX_VALUE; i ++) {
try {
String hello = demoService.sayHello("world" + i);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + hello);
} catch (Exception e) {
e.printStackTrace();
}
Thread.sleep(2000);
}
}
}
- 服务端代码
public class DemoServiceImpl implements DemoService {
public String sayHello(String name) {
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
return "Hello " + name + ", response form provider: " + RpcContext.getContext().getLocalAddress();
}
}
打包provider
修改provider的配置
在dubbo-master 根目录执行
E:\dubbo\dubbo-master>mvn clean compile package install -Dmaven.test.skip=true
打包完成后dubbo-demo-provider\target
目录下有dubbo-demo-provider-2.5.4-SNAPSHOT-assembly.tar.gz
解压运行其dubbo-demo-provider-2.5.4-SNAPSHOT\bin\start.bat
启动服务
部署dubb-admin
dubbo-admin是dubbo进行服务治理的一个web控制台,可以动态变更zk上的配置,来控制provider、consumer的行为。部署只需要修改一下/WEB-INF/dubbo.properties
下的配置文件,再打包成dubbo-admin.war直接部署就可以了。
dubbo.registry.address=zookeeper://192.168.99.100:2181
dubbo.admin.root.password=root
dubbo.admin.guest.password=guest
目录服务、路由、负载均衡、集群容错代码执行流程
代码的入口是MockClusterInvoker.invoke
,从这里开始会依次进行容错或屏蔽、路由策略、负载均衡等来处理从RegistryDirectory中invoker列表,最终找到一个合适的invoker
demoService.sayHello("world" + i)
-->InvokerInvocationHandler.invoke
-->invoker.invoke
-->RpcInvocation//所有请求参数都会转换为RpcInvocation
-->MockClusterInvoker.invoke //1.进入集群
-->invoker.invoke(invocation)
-->AbstractClusterInvoker.invoke
-->list(invocation)
-->directory.list//2.进入目录查找 从this.methodInvokerMap里面查找一个Invoker
-->AbstractDirectory.list
-->doList(invocation)
-->RegistryDirectory.doList// 从this.methodInvokerMap里面查找一个Invoker
-->router.route //3.进入路由
-->MockInvokersSelector.route
-->getNormalInvokers
-->ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("roundrobin")
-->doInvoke
-->FailoverClusterInvoker.doInvoke
-->select//4.进入负载均衡
-->AbstractClusterInvoker.select
-->doselect
-->loadbalance.select
-->AbstractLoadBalance.select
-->doSelect
-->RoundRobinLoadBalance.doSelect
-->invokers.get(currentSequence % length)//取模轮循
-->Result result = invoker.invoke(invocation)
服务调用流程
代码执行流程
- 动态代理:当调用provider的demoService.sayHello服务的时候,这个动态代理类的InvocationHandler.invoke方法
public class DemoAction {
private DemoService demoService;
public void setDemoService(DemoService demoService) {
this.demoService = demoService;
}
public void start() throws Exception {
for (int i = 0; i < Integer.MAX_VALUE; i ++) {
try {
String hello = demoService.sayHello("world" + i);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + hello);
} catch (Exception e) {
e.printStackTrace();
}
Thread.sleep(2000);
}
}
}
- 进入Cluster处理
将所有参数都封装程RpcInvocation,调用clusterinoker的invoke,第一层包装的是MockClusterInvoker
#com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler#invoke
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
......
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
//调用远端服务 所有请求参数都封装成RpcInvocation
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
#com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker#invoke
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")){
//no mock 不需要mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//fail-mock
try {
result = this.invoker.invoke(invocation);
}catch (RpcException e) {
result = doMockInvoke(invocation, e);
}
}
return result;
}
- 进入目录服务
目录服务获取当前方法的invoker列表,根据路由规则筛选invoker,然后进行负载均衡策略再次筛选出一个invoker,最后执行该invoker的调用
#com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance;
//2.进入目录查找 从this.methodInvokerMap里面查找一个Invoker 然后进入路由
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && invokers.size() > 0) {
//负载均衡器roundrobin
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
//进入集群容错负载均衡 FailoverClusterInvoker
return doInvoke(invocation, invokers, loadbalance);
}
invoker列表
Directory目录服务
- StaticDirectory
静态目录服务,他的Invoker是固定的。 - RegistryDirectory
注册目录服务,他的Invoker集合数据来源于zk注册中心的,他实现了NotifyListener接口,并且实现回调notify(List<URL> urls),整个过程有一个重要的map变量,methodInvokerMap(它是数据的来源;同时也是notify的重要操作对象,重点是写操作。key为方法名,value为invoker对象)
RegistryDirectory主要用于维护注册中心的动态配置,zk服务变更的时候会触发其notify方法,然后调用refreshInvoker重新刷新methodInvokerMap,实现了配置的动态变更,通过doList根据invocation中的方法名获取最新的invoker。而且我们在管理后台的配置基本上都是直接操作zk的configurators,变更configurators后consumer会收到通知,
会合并configurators的配置、consumer自己的配置、provider的配置来调整methodInvokerMap中invoker的url,进行调用的时候会根据url参数动态选择路由器、负载、集群容错等
#com.alibaba.dubbo.registry.integration.RegistryDirectory#doList
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "Forbid consumer " + NetUtils.getLocalHost() + " access service " + getInterface().getName() + " from registry " + getUrl().getAddress() + " use dubbo version " + Version.getVersion() + ", Please check registry access list (whitelist/blacklist).");
}
List<Invoker<T>> invokers = null;
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
//sayHello
String methodName = RpcUtils.getMethodName(invocation);
Object[] args = RpcUtils.getArguments(invocation);
if(args != null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) {
invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // 可根据第一个参数枚举路由
}
if(invokers == null) {
invokers = localMethodInvokerMap.get(methodName);
}
if(invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if(invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
......
// providers
//刷新invoker
refreshInvoker(invokerUrls);
}
private void refreshInvoker(List<URL> invokerUrls){
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // 禁止访问
this.methodInvokerMap = null; // 置空列表
destroyAllInvokers(); // 关闭所有Invoker
} else {Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 将URL列表转成Invoker列表
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表
// state change
//如果计算错误,则不进行处理.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));
return ;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
//刷新invoker
this.urlInvokerMap = newUrlInvokerMap;
}
}
路由
负责从多个invoker中按路由规则选出子集,如应用隔离、读写分离、或灰度发布等,路由规则可以从dubbo-admin后台动态配置
#com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory#list
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed){
throw new RpcException("Directory already destroyed .url: "+ getUrl());
}
//查找invoker列表
List<Invoker<T>> invokers = doList(invocation);
//3.进入路由
List<Router> localRouters = this.routers; // local reference
if (localRouters != null && localRouters.size() > 0) {
for (Router router: localRouters){
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
}
return invokers;
}
使用路由规则灰度发布
使用dubbo-admin控制台可以动态新增路由规则,进行灰度发布。启动dubbo-admin.war(修改配置文件/WEB-INF/dubbo.properties中zk地址,用户名root密码root)
- 发布流程
provider 169.254.23.23:20880 169.254.23.23:20881
- 发布20880,切断20881访问流量,然后进行服务的发布。
- 20880发布成功后,恢复 20880的流量,
- 切断20880,继续发布20881
-
新建路由规则
-
启动路由规则消费者不会再访问20880,这时可以发布20881,等发布完成之后在切换流量,完成另一台的发布
路由规则有哪些实现类?
ConditionRouter:条件路由,后台管理的路由配置都是条件路由。
ScriptRouter:脚本路由
MockInvokersSelector:默认使用
启动路由规则,它触发了那些动作?
- 什么时候加入ConditionRouter?
默认情况只有MockInvokersSelector这个路由,新增路由规则的时候zk的配置会变更,然后触发consumer的RegistryDirectory.notify方法取到配置后重新维护路由规则,添加路由
#com.alibaba.dubbo.registry.integration.RegistryDirectory#notify
public synchronized void notify(List<URL> urls) {
.....
if (routerUrls != null && routerUrls.size() >0 ){
List<Router> routers = toRouters(routerUrls);
if(routers != null){ // null - do nothing
setRouters(routers);
}
}
......
}
#com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory#setRouters
protected void setRouters(List<Router> routers){
.....
// append mock invoker selector
routers.add(new MockInvokersSelector());
Collections.sort(routers);
this.routers = routers;
}
- ConditionRouter是怎么过滤的?
在添加完路由规则后会重新刷新invoker列表,这时候根据过滤规则过滤服务提供者,然后更新newMethodInvokerMap
private void refreshInvoker(List<URL> invokerUrls){
.....
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表
......
}
private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
......
newMethodInvokerMap.put(Constants.ANY_VALUE, invokersList);
if (serviceMethods != null && serviceMethods.length > 0) {
for (String method : serviceMethods) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
if (methodInvokers == null || methodInvokers.size() == 0) {
methodInvokers = invokersList;
}
//路由规则路过滤
newMethodInvokerMap.put(method, route(methodInvokers, method));
}
}
}
#com.alibaba.dubbo.rpc.cluster.router.condition.ConditionRouter#route
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
if (! matchWhen(url)) {
return invokers;
}
for (Invoker<T> invoker : invokers) {
if (matchThen(invoker.getUrl(), url)) {
result.add(invoker);
}
}
......
}
因为过滤规则有两个条件部分provider,consumer,所以除了在配置变更的时候统一过滤服务提供者,还会在consumer每次发起调用的时候根据consumer的条件动态过滤invoker
#com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory#list
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
......
for (Router router: localRouters){
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
}
......
}
负载均衡
经过路由规则过滤后的invokers,需要通过负载均衡算法选择其中一个invoker进行RPC调用
#com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker#doselect
private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
......
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
......
return invoker;
}
负载均衡算法
RandomLoadBalance
dubbo默认配置。随机,按权重设置随机概率。在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。RoundRobinLoadBalance
轮循,按公约后的权重设置轮询比率。存在慢的提供者累计请求问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有的请求都卡在调到第二台上。LeastActivieLoadBalance
最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。使慢的提供者收到更少的请求,因为越慢的提供者的调用前后计数差会越大。ConsistentHashLoadBalance
一致性Hash,相同请求参数的请求总是发到同一提供者。当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
动态修改负载均衡策略
可以针对服务提供者的某个方法或者整个服务提供者修改负载均衡策略
admin向zk修改之后会通知consumer这个动态配置configurators节点已经变更了,然后consumer会合并configurators的配置、consumer自己的配置、provider的配置调整invoker的url,进行调用的时候会根据url参数动态选择对应的负载均衡器
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
......
URL url = mergeUrl(providerUrl);
......}
/**
* 合并url参数 顺序为override > -D >Consumer > Provider
* @param providerUrl
* @param overrides
* @return
*/
private URL mergeUrl(URL providerUrl){
providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // 合并消费端参数
List<Configurator> localConfigurators = this.configurators; // local reference
if (localConfigurators != null && localConfigurators.size() > 0) {
for (Configurator configurator : localConfigurators) {
providerUrl = configurator.configure(providerUrl);
}
}
......
}
服务降级
什么是服务开关
先讲一下开关的由来,例如淘宝在11月11日做促销活动,在交易下单环节,可能需要调用A、B、C三个接口来完成,但是其实A和B是必须的, C只是附加的功能(例如在下单的时候做一下推荐,或push消息),可有可无,在平时系统没有压力,容量充足的情况下,调用下没问题,但是在类似店庆之类的大促环节, 系统已经满负荷了,这时候其实完全可以不去调用C接口,怎么实现这个呢? 改代码?
什么是服务降级
服务降级,当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务和页面有策略的降级(执行固定的逻辑),以此释放服务器资源以保证核心任务的正常运行。
dubbo如何实现服务降级?
容错
当系统出现非业务异常(比如并发数太高导致超时,网络异常等)时,不对该接口进行处理。(不可知)
mock=fail:return null屏蔽
在大促,促销活动的可预知情况下,例如双11活动。采用直接屏蔽接口访问。(可知)
mock=force:return "hhh"
如何使用
当consumer调用provider不通的时候,consumer直接超时报错了
-
新增容错动态配置后,超时异常直接返回为null
-
设置屏蔽,无论是否超时直接返回null,不会执行服务端的代码
屏蔽降级原理
- 降级
在调用provider发生异常后,会执行MockClusterInvoker.doMockInvoke
方法,该方法会构造一个MockInvoker
并调用器invoke方法,在invoke中解析配置的容错值并封装成RpcResult返回。 - 屏蔽
直接执行MockClusterInvoker.doMockInvoke
,不会去调用provider
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
//判断是否有容错或者屏蔽
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")){
//no mock 1.进入集群 FailoverClusterInvoker
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
//屏蔽
if (logger.isWarnEnabled()) {
logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//容错mock
//fail-mock
try {
result = this.invoker.invoke(invocation);
}catch (RpcException e) {
if (e.isBiz()) {
throw e;
} else {
if (logger.isWarnEnabled()) {
logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
}
//集群容错获取mock值 mock=fail:return null
result = doMockInvoke(invocation, e);
}
}
}
return result;
}
private Result doMockInvoke(Invocation invocation,RpcException e){
......
minvoker = (Invoker<T>) new MockInvoker(directory.getUrl());
result = minvoker.invoke(invocation);
......
}
#com.alibaba.dubbo.rpc.support.MockInvoker
public Result invoke(Invocation invocation) throws RpcException {
......
Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
Object value = parseMockValue(mock, returnTypes);
return new RpcResult(value);
}
集群容错
Failover Cluster
失败自动切换,当出现失败,重试其它服务器 。通常用于读操作,但重试会带来更长延迟。可通过retries="2"
来设置重试次数(不含第一次)。Failfast Cluster
快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。Failsafe Cluster
失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。Failback Cluster
失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。Forking Cluster
并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过forks="2"
来设置最大并行数。Broadcast Cluster
广播调用所有提供者,逐个调用,任意一台报错则报错 [2]。通常用于通知所有提供者更新缓存或日志等本地资源信息
总结
- Invoker
它是一个可执行的对象,能够根据方法的名称、参数得到相应的执行结果。
它里面有一个很重要的方法 Result invoke(Invocation invocation),
Invocation是包含了需要执行的方法和参数等重要信息,目前它只有2个实现类RpcInvocation MockInvocation.它有3种类型的Invoker- 本地执行类的Invoker
server端:要执行 demoService.sayHello,就通过InjvmExporter来进行反射执行demoService.sayHello就可以了。 - 远程通信类的Invoker
client端:要执行 demoService.sayHello,它封装了DubboInvoker进行远程通信,发送要执行的接口给server端。
server端:采用了AbstractProxyInvoker执行了DemoServiceImpl.sayHello,然后将执行结果返回发送给client. - 多个远程通信执行类的Invoker聚合成集群版的Invoker
client端:要执行 demoService.sayHello,就要通过AbstractClusterInvoker来进行负载均衡,DubboInvoker进行远程通信,发送要执行的接口给server端。
server端:采用了AbstractProxyInvoker执行了DemoServiceImpl.sayHello,然后将执行结果返回发送给client.
- 本地执行类的Invoker