如上图所示,SOFABolt 中与心跳机制相关的为绿色的类。
类组成
- 心跳命令
- HeartbeatCommand:心跳请求命令
- HeartbeatAckCommand:心跳响应命令
- RpcHeartBeatProcessor:心跳命令处理器,处理 HeartbeatCommand 和 HeartbeatAckCommand 两种心跳请求
- IdleStateHandler:Netty 提供的空闲检测处理器,在指定的时间内没有读或写请求,那么发布一个 IdleStateEvent 事件
- ServerIdleHandler:服务端 IdleStateEvent 事件处理器,进行服务端空闲逻辑处理(SOFABolt 中服务端直接关闭连接)
- HeartbeatHandler:客户端 IdleStateEvent 事件处理器,进行客户端空闲逻辑处理(SOFABolt 中客户端使用 RpcHeartbeatTrigger 进行空闲处理)
- RpcHeartbeatTrigger:客户端真正的空闲处理器,也叫心跳触发器
ConnectionHeartbeatManager:用于开启或关闭 Connection 的心跳逻辑
注意:CommandHandler 和 HeartbeatTrigger 实例都属于 Protocol 实现类的一个属性,是在创建 Protocol 实现类的时候创建的。
客户端基本流程
- 在 15s 内没有读或者写事件,IdleStateHandler 就会发布一个 IdleStateEvent 事件
- HeartbeatHandler 进行该事件的处理:
- 首先从当前 Channel 的附属属性中获取相关的 ProtocolCode
- 再从 ProtocolManager 中获取 ProtocolCode 的 Protocol 实现类
- 再从 Protocol 实现类获取 HeartbeatTrigger 实例,最终调用该实例进行 IdleStateEvent 的处理
- HeartbeatTrigger 处理 IdleStateEvent 事件
- 首先从当前 Channel 的附属属性中获取已经发送心跳但是没有接收到响应的次数 heartbeatTimes,如果 heartbeatTimes 已经大于 3 次,则直接关闭连接,否则
- 从当前 Channel 的附属属性中获取心跳开关,如果关闭了心跳,则直接返回,表示对 IdleStateEvent 不做任何处理;如果开启了心跳
- 创建心跳请求命令 HeartbeatCommand + 创建本次请求的 InvokeFuture 对象 + 将 InvokeFuture 对象加入到当前的 Connection 中
InvokeFuture 中会设置心跳响应回调函数:当接收到了正常的心跳响应后,将 heartbeatTimes 置为 0;否则,将该连接的heartbeatTimes+1
- 使用 Netty 发送 HeartbeatCommand 到服务端
- 设置超时任务(1s内没有接收到心跳响应,则直接返回超时失败响应,实现快速失败)
服务端基本流程
- 在 90s 内没有读或者写事件,IdleStateHandler 就会发布一个 IdleStateEvent 事件(如果客户端还正常,那么在 90s 内,会发送至少 6 次心跳,那么服务端将不会触发 IdleStateEvent 事件)
- ServerIdleHandler 进行该事件的处理:直接关闭连接
心跳处理流程
- 心跳请求的处理:服务端接收到 HeartbeatCommand 后,构造心跳响应 HeartbeatAckCommand,之后使用 Netty 返回 HeartbeatAckCommand 给客户端
- 心跳响应的处理:客户端接收到 HeartbeatAckCommand 后,设置心跳响应消息到 InvokeFuture + 取消超时任务 + 执行 InvokeFuture 中的回调方法
注意
- 只有客户端会主动发送心跳请求;但是双端都会开启空闲检测
- 心跳除了上述应用端提供的这种之外,还有 tcp 提供的 keepAlive
一、客户端启动设置
============================== AbstractConnectionFactory ==============================
// 客户端心跳处理器
private final ChannelHandler heartbeatHandler = new HeartbeatHandler();
public void init(final ConnectionEventHandler connectionEventHandler) {
...
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel channel) {
...
// 心跳检测开关 -Dbolt.tcp.heartbeat.switch=true
boolean idleSwitch = ConfigManager.tcp_idle_switch();
if (idleSwitch) {
// 读或者写空闲,默认为 15s -Dbolt.tcp.heartbeat.interval=15000
pipeline.addLast("idleStateHandler", new IdleStateHandler(ConfigManager.tcp_idle(), ConfigManager.tcp_idle(), 0, TimeUnit.MILLISECONDS));
// 心跳处理器
pipeline.addLast("heartbeatHandler", heartbeatHandler);
}
...
}
});
}
============================== HeartbeatHandler ==============================
@Sharable
public class HeartbeatHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
ProtocolCode protocolCode = ctx.channel().attr(Connection.PROTOCOL).get();
Protocol protocol = ProtocolManager.getProtocol(protocolCode);
// 调用 HeartbeatTrigger 做真正的心跳处理业务
protocol.getHeartbeatTrigger().heartbeatTriggered(ctx);
} else {
super.userEventTriggered(ctx, evt);
}
}
}
二、服务端启动设置
============================== RpcServer ==============================
protected void doInit() {
...
// 开启心跳检测开关
final boolean idleSwitch = ConfigManager.tcp_idle_switch();
// 服务端的心跳空闲时间(默认90s)-Dbolt.tcp.server.idle.interval=90000
final int idleTime = ConfigManager.tcp_server_idle();
// 服务端心跳处理器(直接关闭连接)
final ChannelHandler serverIdleHandler = new ServerIdleHandler();
...
this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel channel) {
...
if (idleSwitch) {
// 空闲检测处理器
pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime, TimeUnit.MILLISECONDS));
// 服务端心跳处理器
pipeline.addLast("serverIdleHandler", serverIdleHandler);
}
...
}
}
============================== ServerIdleHandler ==============================
@Sharable
public class ServerIdleHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
// 关闭连接
ctx.close();
} else {
super.userEventTriggered(ctx, evt);
}
}
}
三、心跳触发器 HeartbeatTrigger(客户端)
public interface HeartbeatTrigger {
void heartbeatTriggered(final ChannelHandlerContext ctx) throws Exception;
}
public class RpcHeartbeatTrigger implements HeartbeatTrigger {
// max trigger times,心跳最多多少次没响应,则关闭连接,默认为3
// -Dbolt.tcp.heartbeat.maxtimes=3
public static final Integer maxCount = ConfigManager.tcp_idle_maxtimes();
// 心跳响应返回的超时时间,发送请求后1s内没有接收到响应就触发超时逻辑
private static final long heartbeatTimeoutMillis = 1000;
public void heartbeatTriggered(final ChannelHandlerContext ctx) throws Exception {
// 已经心跳的次数,默认为0
Integer heartbeatTimes = ctx.channel().attr(Connection.HEARTBEAT_COUNT).get();
final Connection conn = ctx.channel().attr(Connection.CONNECTION).get();
// 心跳次数已经超过3次,直接关闭连接
if (heartbeatTimes >= maxCount) {
conn.close();
} else {
// 检测该连接的心跳开关是否打开(只针对当前的 Connection 实例,不是全局的)
boolean heartbeatSwitch = ctx.channel().attr(Connection.HEARTBEAT_SWITCH).get();
if (!heartbeatSwitch) {
return;
}
// 创建心跳命令
final HeartbeatCommand heartbeat = new HeartbeatCommand();
// 创建 InvokeFuture
final InvokeFuture future = new DefaultInvokeFuture(heartbeat.getId(),
new InvokeCallbackListener() {
@Override
public void onResponse(InvokeFuture future) {
ResponseCommand response;
// 获取响应
response = (ResponseCommand) future.waitResponse(0);
if (response != null && response.getResponseStatus() == ResponseStatus.SUCCESS) {
// 接收到正常心跳响应,将该连接的已心跳次数置为0
ctx.channel().attr(Connection.HEARTBEAT_COUNT).set(0);
} else {
// 接收到错误的心跳响应,将该连接的已心跳次数+1
Integer times = ctx.channel().attr(Connection.HEARTBEAT_COUNT).get();
ctx.channel().attr(Connection.HEARTBEAT_COUNT).set(times + 1);
}
}
}, null, heartbeat.getProtocolCode().getFirstByte(), this.commandFactory);
final int heartbeatId = heartbeat.getId();
// 将 InvokeFuture 加入连接
conn.addInvokeFuture(future);
// 发送 heartbeat
ctx.writeAndFlush(heartbeat);
// 设置超时任务(1s内没有接收到心跳响应,则直接返回超时失败响应,实现快速失败)
TimerHolder.getTimer().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
InvokeFuture future = conn.removeInvokeFuture(heartbeatId);
if (future != null) {
// 构造超时响应
future.putResponse(commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
// 回调
future.tryAsyncExecuteInvokeCallbackAbnormally();
}
}
}, heartbeatTimeoutMillis, TimeUnit.MILLISECONDS);
}
}
}
四、心跳处理器 RpcHeartBeatProcessor
public class RpcHeartBeatProcessor extends AbstractRemotingProcessor {
public void doProcess(final RemotingContext ctx, RemotingCommand msg) {
// 如果是心跳请求
if (msg instanceof HeartbeatCommand) {
final int id = msg.getId();
// 构造心跳响应
HeartbeatAckCommand ack = new HeartbeatAckCommand();
ack.setId(id);
// 发送心跳响应
ctx.writeAndFlush(ack);
} else if (msg instanceof HeartbeatAckCommand) { // 如果是心跳响应
Connection conn = ctx.getChannelContext().channel().attr(Connection.CONNECTION).get();
InvokeFuture future = conn.removeInvokeFuture(msg.getId());
// 设置心跳响应消息
future.putResponse(msg);
// 取消超时任务
future.cancelTimeout();
// 回调
future.executeInvokeCallback();
} else {
throw new RuntimeException("Cannot process command: " + msg.getClass().getName());
}
}
}
注意:心跳的处理和响应消息的处理都是一样的,如果 RemotingProcessor#executor 存在,则使用该线程池执行,否则使用 ProcessorManager#defaultExecutor 执行
五、全局心跳开关和指定连接心跳开关
全局心跳开关
SOFABolt 提供了一个全局开关:在客户端和服务端启动设置的过程中,已经看到了
boolean idleSwitch = ConfigManager.tcp_idle_switch()
该值是通过系统属性进行设置的:-Dbolt.tcp.heartbeat.switch=true
,所以是多实例共享的。全局心跳开关是一个静态开关
指定连接心跳开关
注意:只有当全局开关打开的时候,指定连接心跳开关才起作用。实际上,指定连接心跳开关通常只用来关闭指定 Connection 上发送心跳消息。指定连接心跳开关由 ConnectionHeartbeatManager 提供的两个接口来设定。
指定连接心跳开关是一个动态开关,即运行时开关
public interface ConnectionHeartbeatManager {
void disableHeartbeat(Connection connection);
void enableHeartbeat(Connection connection);
}
public class DefaultConnectionManager implements ConnectionManager, ConnectionHeartbeatManager
public void disableHeartbeat(Connection connection) {
if (null != connection) {
// 向 Connection 中的 Channel 添加附属属性:Connection.HEARTBEAT_SWITCH,后续在 RpcHeartbeatTrigger 中会判断该附属属性,如果是 false,则不再处理 IdleStateEvent,即不再发送心跳请求消息。
connection.getChannel().attr(Connection.HEARTBEAT_SWITCH).set(false);
}
}
public void enableHeartbeat(Connection connection) {
if (null != connection) {
connection.getChannel().attr(Connection.HEARTBEAT_SWITCH).set(true);
}
}
}
============================== 程序使用入口 ==============================
public class RpcClient extends AbstractConfigurableInstance {
public void enableConnHeartbeat(String addr) {
Url url = this.addressParser.parse(addr);
this.enableConnHeartbeat(url);
}
public void enableConnHeartbeat(Url url) {
if (null != url) {
this.connectionManager.enableHeartbeat(this.connectionManager.get(url.getUniqueKey()));
}
}
public void disableConnHeartbeat(String addr) {
Url url = this.addressParser.parse(addr);
this.disableConnHeartbeat(url);
}
public void disableConnHeartbeat(Url url) {
if (null != url) {
this.connectionManager.disableHeartbeat(this.connectionManager.get(url.getUniqueKey()));
}
}
}