上周六,我负责的业务在凌晨00-04点的支付全部失败了。
结果一查,MD,晚上银行维护,下游支付系统没有挂维护公告,在此期间一直请求维护中的银行,当然所有返回就是失败了,有种欲哭无泪的感觉,锅让业务来背。。。
为了杜绝在此出现这种大面积批量的支付失败情况发生,保障系统的健壮性。我需要个在集中性异常的时候可以终止请求,当服务恢复,恢复请求。
我想了一些方式,最后,觉得熔断器比较适合干这种事情。
他山之石
- 防雪崩利器:熔断器 Hystrix 的原理与使用:https://segmentfault.com/a/1190000005988895
- 设计模式--状态模式(分布式中间件熔断器Java实现): https://www.cnblogs.com/jager/p/6253166.html
- 状态模式-State Pattern:https://quanke.gitbooks.io/design-pattern-java/%E7%8A%B6%E6%80%81%E6%A8%A1%E5%BC%8F-State%20Pattern.html
- 状态模式:http://www.runoob.com/design-pattern/state-pattern.html
状态模式
在状态模式中,我们创建表示各种状态的对象和一个行为随着状态对象改变而改变的 context 对象。
我们已一个开关为例
/**
* User: Rudy Tan
* Date: 2018/9/22
*/
public class Main{
public static void main(String[] args){
Context context = new Context();
context.state = new CloseState();
context.switchState();
context.switchState();
context.switchState();
context.switchState();
context.switchState();
}
}
/**
* 状态的抽象
*/
interface State{
void switchState(Context context);
}
/**
* 状态上下文
*/
class Context{
public State state;
void switchState(){
state.switchState(this);
}
}
/**
* 开状态
**/
class OpenState implements State{
public void switchState(Context context) {
System.out.println("当前状态:开");
context.state = new CloseState();
}
}
/**
* 关状态
**/
class CloseState implements State{
public void switchState(Context context) {
System.out.println("当前状态:关");
context.state = new OpenState();
}
}
在每一种状态下,context不必关心每一种状态下的行为。交给每一种状态自己处理。
熔断器基本原理
熔断器是当依赖的服务已经出现故障时,为了保证自身服务的正常运行不再访问依赖的服务,防止雪崩效应
熔断器本身就是一个状态机。
- 关闭状态:熔断器的初始化状态,该状态下允许请求通过。当失败超过阀值,转入打开状态,
- 打开状态:熔断状态,该状态下不允许请求通过,当进入该状态经过一段时间,进入半开状态。
- 半开状态:在半开状态期间,允许部分请求通过,在半开期间,观察失败状态是否超过阀值。如果没有超过进入关闭状态,如果超过了进入关闭状态。如此往复。
之前,查了一些资料,网上所有的资料几乎都是针对Hystrix的。这个只是针对分布式系统的接口请求,并不能运用于我们的系统中,因此这种情况下,根据原理自己实现了一个基本的分布式熔断器,数值与计数器存放在redis中,因为redis的操作客户端不一样,我就以本地熔断器为例,讲解熔断器实现。
希望我的文章能对于理解熔断器,以及需要熔断器的人有所帮助。
简单的本地熔断器实现
一个基本的本地熔断器。
对外暴露接口
熔断器对外暴露接口
/**
* 熔断器接口
*/
public interface CircuitBreaker {
/**
* 重置熔断器
*/
void reset();
/**
* 是否允许通过熔断器
*/
boolean canPassCheck();
/**
* 统计失败次数
*/
void countFailNum();
}
熔断器状态对外暴露接口
/**
* 熔断器状态
*/
public interface CBState {
/**
* 获取当前状态名称
*/
String getStateName();
/**
* 检查以及校验当前状态是否需要扭转
*/
void checkAndSwitchState(AbstractCircuitBreaker cb);
/**
* 是否允许通过熔断器
*/
boolean canPassCheck(AbstractCircuitBreaker cb);
/**
* 统计失败次数
*/
void countFailNum(AbstractCircuitBreaker cb);
}
三种状态
关闭状态实现:
package com.hirudy.cb.state;
import com.hirudy.cb.cb.AbstractCircuitBreaker;
import java.util.concurrent.atomic.AtomicInteger;
/**
* User: Rudy Tan
* Date: 2018/9/21
*
* 熔断器-关闭状态
*/
public class CloseCBState implements CBState {
/**
* 进入当前状态的初始化时间
*/
private long stateTime = System.currentTimeMillis();
/**
* 关闭状态,失败计数器,以及失败计数器初始化时间
*/
private AtomicInteger failNum = new AtomicInteger(0);
private long failNumClearTime = System.currentTimeMillis();
public String getStateName() {
// 获取当前状态名称
return this.getClass().getSimpleName();
}
public void checkAndSwitchState(AbstractCircuitBreaker cb) {
// 阀值判断,如果失败到达阀值,切换状态到打开状态
long maxFailNum = Long.valueOf(cb.thresholdFailRateForClose.split("/")[0]);
if (failNum.get() >= maxFailNum){
cb.setState(new OpenCBState());
}
}
public boolean canPassCheck(AbstractCircuitBreaker cb) {
// 关闭状态,请求都应该允许通过
return true;
}
public void countFailNum(AbstractCircuitBreaker cb) {
// 检查计数器是否过期了,否则重新计数
long period = Long.valueOf(cb.thresholdFailRateForClose.split("/")[1]) * 1000;
long now = System.currentTimeMillis();
if (failNumClearTime + period <= now){
failNum.set(0);
}
// 失败计数
failNum.incrementAndGet();
// 检查是否切换状态
checkAndSwitchState(cb);
}
}
打开状态
package com.hirudy.cb.state;
import com.hirudy.cb.cb.AbstractCircuitBreaker;
/**
* User: Rudy Tan
* Date: 2018/9/21
*
* 熔断器-打开状态
*/
public class OpenCBState implements CBState {
/**
* 进入当前状态的初始化时间
*/
private long stateTime = System.currentTimeMillis();
public String getStateName() {
// 获取当前状态名称
return this.getClass().getSimpleName();
}
public void checkAndSwitchState(AbstractCircuitBreaker cb) {
// 打开状态,检查等待时间是否已到,如果到了就切换到半开状态
long now = System.currentTimeMillis();
long idleTime = cb.thresholdIdleTimeForOpen * 1000L;
if (stateTime + idleTime <= now){
cb.setState(new HalfOpenCBState());
}
}
public boolean canPassCheck(AbstractCircuitBreaker cb) {
// 检测状态
checkAndSwitchState(cb);
return false;
}
public void countFailNum(AbstractCircuitBreaker cb) {
// nothing
}
}
半开状态
package com.hirudy.cb.state;
import com.hirudy.cb.cb.AbstractCircuitBreaker;
import java.util.concurrent.atomic.AtomicInteger;
/**
* User: Rudy Tan
* Date: 2018/9/21
*
* 熔断器-半开状态
*/
public class HalfOpenCBState implements CBState {
/**
* 进入当前状态的初始化时间
*/
private long stateTime = System.currentTimeMillis();
/**
* 半开状态,失败计数器
*/
private AtomicInteger failNum = new AtomicInteger(0);
/**
* 半开状态,允许通过的计数器
*/
private AtomicInteger passNum = new AtomicInteger(0);
public String getStateName() {
// 获取当前状态名称
return this.getClass().getSimpleName();
}
public void checkAndSwitchState(AbstractCircuitBreaker cb) {
// 判断半开时间是否结束
long idleTime = Long.valueOf(cb.thresholdPassRateForHalfOpen.split("/")[1]) * 1000L;
long now = System.currentTimeMillis();
if (stateTime + idleTime <= now){
// 如果半开状态已结束,失败次数是否超过了阀值
int maxFailNum = cb.thresholdFailNumForHalfOpen;
if (failNum.get() >= maxFailNum){
// 失败超过阀值,认为服务没有恢复,重新进入熔断打开状态
cb.setState(new OpenCBState());
}else {
// 没超过,认为服务恢复,进入熔断关闭状态
cb.setState(new CloseCBState());
}
}
}
public boolean canPassCheck(AbstractCircuitBreaker cb) {
// 检查是否切换状态
checkAndSwitchState(cb);
// 超过了阀值,不再放量
int maxPassNum = Integer.valueOf(cb.thresholdPassRateForHalfOpen.split("/")[0]);
if (passNum.get() > maxPassNum){
return false;
}
// 检测是否超过了阀值
if (passNum.incrementAndGet() <= maxPassNum){
return true;
}
return false;
}
public void countFailNum(AbstractCircuitBreaker cb) {
// 失败计数
failNum.incrementAndGet();
// 检查是否切换状态
checkAndSwitchState(cb);
}
}
熔断器
抽象熔断器
package com.hirudy.cb.cb;
import com.hirudy.cb.state.CBState;
import com.hirudy.cb.state.CloseCBState;
/**
* User: Rudy Tan
* Date: 2018/9/21
*
* 基础熔断器
*/
public abstract class AbstractCircuitBreaker implements CircuitBreaker {
/**
* 熔断器当前状态
*/
private volatile CBState state = new CloseCBState();
/**
* 在熔断器关闭的情况下,在多少秒内失败多少次进入,熔断打开状态(默认10分钟内,失败10次进入打开状态)
*/
public String thresholdFailRateForClose = "10/600";
/**
* 在熔断器打开的情况下,熔断多少秒进入半开状态,(默认熔断30分钟)
*/
public int thresholdIdleTimeForOpen = 1800;
/**
* 在熔断器半开的情况下, 在多少秒内放多少次请求,去试探(默认10分钟内,放10次请求)
*/
public String thresholdPassRateForHalfOpen = "10/600";
/**
* 在熔断器半开的情况下, 试探期间,如果有超过多少次失败的,重新进入熔断打开状态,否者进入熔断关闭状态。
*/
public int thresholdFailNumForHalfOpen = 1;
public CBState getState() {
return state;
}
public void setState(CBState state) {
// 当前状态不能切换为当前状态
CBState currentState = getState();
if (currentState.getStateName().equals(state.getStateName())){
return;
}
// 多线程环境加锁
synchronized (this){
// 二次判断
currentState = getState();
if (currentState.getStateName().equals(state.getStateName())){
return;
}
// 更新状态
this.state = state;
System.out.println("熔断器状态转移:" + currentState.getStateName() + "->" + state.getStateName());
}
}
}
本地熔断器
package com.hirudy.cb.cb;
import com.hirudy.cb.state.CloseCBState;
/**
* User: Rudy Tan
* Date: 2018/9/22
*
* 本地熔断器(把它当成了工厂了)
*/
public class LocalCircuitBreaker extends AbstractCircuitBreaker {
public LocalCircuitBreaker(String failRateForClose,
int idleTimeForOpen,
String passRateForHalfOpen, int failNumForHalfOpen){
this.thresholdFailRateForClose = failRateForClose;
this.thresholdIdleTimeForOpen = idleTimeForOpen;
this.thresholdPassRateForHalfOpen = passRateForHalfOpen;
this.thresholdFailNumForHalfOpen = failNumForHalfOpen;
}
public void reset() {
this.setState(new CloseCBState());
}
public boolean canPassCheck() {
return getState().canPassCheck(this);
}
public void countFailNum() {
getState().countFailNum(this);
}
}
测试例子
import com.hirudy.cb.cb.CircuitBreaker;
import com.hirudy.cb.cb.LocalCircuitBreaker;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
/**
* User: Rudy Tan
* Date: 2018/8/27
*/
public class App {
public static void main(String[] args) throws InterruptedException {
final int maxNum = 200;
final CountDownLatch countDownLatch = new CountDownLatch(maxNum);
final CircuitBreaker circuitBreaker = new LocalCircuitBreaker("5/20", 10, "5/10", 2);
for (int i=0; i < maxNum; i++){
new Thread(new Runnable() {
public void run() {
// 模拟随机请求
try {
Thread.sleep(new Random().nextInt(20) * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try{
// 过熔断器
if (circuitBreaker.canPassCheck()){
// do something
System.out.println("正常业务逻辑操作");
// 模拟后期的服务恢复状态
if (countDownLatch.getCount() >= maxNum/2){
// 模拟随机失败
if (new Random().nextInt(2) == 1){
throw new Exception("mock error");
}
}
} else {
System.out.println("拦截业务逻辑操作");
}
}catch (Exception e){
System.out.println("业务执行失败了");
// 熔断器计数器
circuitBreaker.countFailNum();
}
countDownLatch.countDown();
}
}).start();
// 模拟随机请求
try {
Thread.sleep(new Random().nextInt(5) * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
countDownLatch.await();
System.out.println("end");
}
}