SOFABolt 源码分析17 - Heartbeat 心跳机制的设计

image.png

如上图所示,SOFABolt 中与心跳机制相关的为绿色的类。

类组成

  • 心跳命令
  • HeartbeatCommand:心跳请求命令
  • HeartbeatAckCommand:心跳响应命令
  • RpcHeartBeatProcessor:心跳命令处理器,处理 HeartbeatCommand 和 HeartbeatAckCommand 两种心跳请求
  • IdleStateHandler:Netty 提供的空闲检测处理器,在指定的时间内没有读或写请求,那么发布一个 IdleStateEvent 事件
  • ServerIdleHandler:服务端 IdleStateEvent 事件处理器,进行服务端空闲逻辑处理(SOFABolt 中服务端直接关闭连接)
  • HeartbeatHandler:客户端 IdleStateEvent 事件处理器,进行客户端空闲逻辑处理(SOFABolt 中客户端使用 RpcHeartbeatTrigger 进行空闲处理)
  • RpcHeartbeatTrigger:客户端真正的空闲处理器,也叫心跳触发器
  • ConnectionHeartbeatManager:用于开启或关闭 Connection 的心跳逻辑


    image.png

    注意:CommandHandler 和 HeartbeatTrigger 实例都属于 Protocol 实现类的一个属性,是在创建 Protocol 实现类的时候创建的。

客户端基本流程

  1. 在 15s 内没有读或者写事件,IdleStateHandler 就会发布一个 IdleStateEvent 事件
  2. HeartbeatHandler 进行该事件的处理:
  • 首先从当前 Channel 的附属属性中获取相关的 ProtocolCode
  • 再从 ProtocolManager 中获取 ProtocolCode 的 Protocol 实现类
  • 再从 Protocol 实现类获取 HeartbeatTrigger 实例,最终调用该实例进行 IdleStateEvent 的处理
  1. HeartbeatTrigger 处理 IdleStateEvent 事件
  • 首先从当前 Channel 的附属属性中获取已经发送心跳但是没有接收到响应的次数 heartbeatTimes,如果 heartbeatTimes 已经大于 3 次,则直接关闭连接,否则
  • 从当前 Channel 的附属属性中获取心跳开关,如果关闭了心跳,则直接返回,表示对 IdleStateEvent 不做任何处理;如果开启了心跳
  • 创建心跳请求命令 HeartbeatCommand + 创建本次请求的 InvokeFuture 对象 + 将 InvokeFuture 对象加入到当前的 Connection 中

InvokeFuture 中会设置心跳响应回调函数:当接收到了正常的心跳响应后,将 heartbeatTimes 置为 0;否则,将该连接的heartbeatTimes+1

  • 使用 Netty 发送 HeartbeatCommand 到服务端
  • 设置超时任务(1s内没有接收到心跳响应,则直接返回超时失败响应,实现快速失败)

服务端基本流程

  1. 在 90s 内没有读或者写事件,IdleStateHandler 就会发布一个 IdleStateEvent 事件(如果客户端还正常,那么在 90s 内,会发送至少 6 次心跳,那么服务端将不会触发 IdleStateEvent 事件)
  2. ServerIdleHandler 进行该事件的处理:直接关闭连接

心跳处理流程

  • 心跳请求的处理:服务端接收到 HeartbeatCommand 后,构造心跳响应 HeartbeatAckCommand,之后使用 Netty 返回 HeartbeatAckCommand 给客户端
  • 心跳响应的处理:客户端接收到 HeartbeatAckCommand 后,设置心跳响应消息到 InvokeFuture + 取消超时任务 + 执行 InvokeFuture 中的回调方法

注意

  • 只有客户端会主动发送心跳请求;但是双端都会开启空闲检测
  • 心跳除了上述应用端提供的这种之外,还有 tcp 提供的 keepAlive

一、客户端启动设置

============================== AbstractConnectionFactory ==============================
    // 客户端心跳处理器
    private final ChannelHandler heartbeatHandler = new HeartbeatHandler();
    public void init(final ConnectionEventHandler connectionEventHandler) {
        ...
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel channel) {
                ...
                // 心跳检测开关 -Dbolt.tcp.heartbeat.switch=true
                boolean idleSwitch = ConfigManager.tcp_idle_switch();
                if (idleSwitch) {
                    // 读或者写空闲,默认为 15s -Dbolt.tcp.heartbeat.interval=15000
                    pipeline.addLast("idleStateHandler", new IdleStateHandler(ConfigManager.tcp_idle(), ConfigManager.tcp_idle(), 0, TimeUnit.MILLISECONDS));
                    // 心跳处理器
                    pipeline.addLast("heartbeatHandler", heartbeatHandler);
                }
                ...
            }
        });
    }

============================== HeartbeatHandler ==============================
@Sharable
public class HeartbeatHandler extends ChannelDuplexHandler {
    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            ProtocolCode protocolCode = ctx.channel().attr(Connection.PROTOCOL).get();
            Protocol protocol = ProtocolManager.getProtocol(protocolCode);
            // 调用 HeartbeatTrigger 做真正的心跳处理业务
            protocol.getHeartbeatTrigger().heartbeatTriggered(ctx);
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

二、服务端启动设置

============================== RpcServer ==============================
    protected void doInit() {
        ...
        // 开启心跳检测开关
        final boolean idleSwitch = ConfigManager.tcp_idle_switch();
        // 服务端的心跳空闲时间(默认90s)-Dbolt.tcp.server.idle.interval=90000
        final int idleTime = ConfigManager.tcp_server_idle();
        // 服务端心跳处理器(直接关闭连接)
        final ChannelHandler serverIdleHandler = new ServerIdleHandler();
        ...
        this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel channel) {
                ...
                if (idleSwitch) {
                    // 空闲检测处理器
                    pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime, TimeUnit.MILLISECONDS));
                    // 服务端心跳处理器
                    pipeline.addLast("serverIdleHandler", serverIdleHandler);
                }
                ...
            }
    }

============================== ServerIdleHandler ==============================
@Sharable
public class ServerIdleHandler extends ChannelDuplexHandler {
    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            // 关闭连接
            ctx.close();
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

三、心跳触发器 HeartbeatTrigger(客户端)

public interface HeartbeatTrigger {
    void heartbeatTriggered(final ChannelHandlerContext ctx) throws Exception;
}

public class RpcHeartbeatTrigger implements HeartbeatTrigger {
    // max trigger times,心跳最多多少次没响应,则关闭连接,默认为3
    // -Dbolt.tcp.heartbeat.maxtimes=3
    public static final Integer maxCount = ConfigManager.tcp_idle_maxtimes();
    // 心跳响应返回的超时时间,发送请求后1s内没有接收到响应就触发超时逻辑
    private static final long heartbeatTimeoutMillis = 1000;

    public void heartbeatTriggered(final ChannelHandlerContext ctx) throws Exception {
        // 已经心跳的次数,默认为0
        Integer heartbeatTimes = ctx.channel().attr(Connection.HEARTBEAT_COUNT).get();
        final Connection conn = ctx.channel().attr(Connection.CONNECTION).get();
        // 心跳次数已经超过3次,直接关闭连接
        if (heartbeatTimes >= maxCount) {
            conn.close();
        } else {
            // 检测该连接的心跳开关是否打开(只针对当前的 Connection 实例,不是全局的)
            boolean heartbeatSwitch = ctx.channel().attr(Connection.HEARTBEAT_SWITCH).get();
            if (!heartbeatSwitch) {
                return;
            }
            // 创建心跳命令
            final HeartbeatCommand heartbeat = new HeartbeatCommand();
            // 创建 InvokeFuture
            final InvokeFuture future = new DefaultInvokeFuture(heartbeat.getId(),
                new InvokeCallbackListener() {
                    @Override
                    public void onResponse(InvokeFuture future) {
                        ResponseCommand response;
                        // 获取响应
                        response = (ResponseCommand) future.waitResponse(0);
                        if (response != null && response.getResponseStatus() == ResponseStatus.SUCCESS) {
                            // 接收到正常心跳响应,将该连接的已心跳次数置为0
                            ctx.channel().attr(Connection.HEARTBEAT_COUNT).set(0);
                        } else {
                            // 接收到错误的心跳响应,将该连接的已心跳次数+1
                            Integer times = ctx.channel().attr(Connection.HEARTBEAT_COUNT).get();
                            ctx.channel().attr(Connection.HEARTBEAT_COUNT).set(times + 1);
                        }
                    }
                }, null, heartbeat.getProtocolCode().getFirstByte(), this.commandFactory);
            final int heartbeatId = heartbeat.getId();
            // 将 InvokeFuture 加入连接
            conn.addInvokeFuture(future);
            // 发送 heartbeat
            ctx.writeAndFlush(heartbeat);
            // 设置超时任务(1s内没有接收到心跳响应,则直接返回超时失败响应,实现快速失败)
            TimerHolder.getTimer().newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    InvokeFuture future = conn.removeInvokeFuture(heartbeatId);
                    if (future != null) {
                        // 构造超时响应
                        future.putResponse(commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
                        // 回调
                        future.tryAsyncExecuteInvokeCallbackAbnormally();
                    }
                }
            }, heartbeatTimeoutMillis, TimeUnit.MILLISECONDS);
        }
    }
}

四、心跳处理器 RpcHeartBeatProcessor

public class RpcHeartBeatProcessor extends AbstractRemotingProcessor {
    public void doProcess(final RemotingContext ctx, RemotingCommand msg) {
        // 如果是心跳请求
        if (msg instanceof HeartbeatCommand) {
            final int id = msg.getId();
            // 构造心跳响应
            HeartbeatAckCommand ack = new HeartbeatAckCommand();
            ack.setId(id);
            // 发送心跳响应
            ctx.writeAndFlush(ack);
        } else if (msg instanceof HeartbeatAckCommand) { // 如果是心跳响应
            Connection conn = ctx.getChannelContext().channel().attr(Connection.CONNECTION).get();
            InvokeFuture future = conn.removeInvokeFuture(msg.getId());
            // 设置心跳响应消息
            future.putResponse(msg);
            // 取消超时任务
            future.cancelTimeout();
            // 回调
            future.executeInvokeCallback();
        } else {
            throw new RuntimeException("Cannot process command: " + msg.getClass().getName());
        }
    }
}

注意:心跳的处理和响应消息的处理都是一样的,如果 RemotingProcessor#executor 存在,则使用该线程池执行,否则使用 ProcessorManager#defaultExecutor 执行

五、全局心跳开关和指定连接心跳开关

全局心跳开关

SOFABolt 提供了一个全局开关:在客户端和服务端启动设置的过程中,已经看到了 boolean idleSwitch = ConfigManager.tcp_idle_switch()
该值是通过系统属性进行设置的:-Dbolt.tcp.heartbeat.switch=true,所以是多实例共享的。全局心跳开关是一个静态开关

指定连接心跳开关

注意:只有当全局开关打开的时候,指定连接心跳开关才起作用。实际上,指定连接心跳开关通常只用来关闭指定 Connection 上发送心跳消息。指定连接心跳开关由 ConnectionHeartbeatManager 提供的两个接口来设定。指定连接心跳开关是一个动态开关,即运行时开关

public interface ConnectionHeartbeatManager {
    void disableHeartbeat(Connection connection);
    void enableHeartbeat(Connection connection);
}

public class DefaultConnectionManager implements ConnectionManager, ConnectionHeartbeatManager
    public void disableHeartbeat(Connection connection) {
        if (null != connection) {
            // 向 Connection 中的 Channel 添加附属属性:Connection.HEARTBEAT_SWITCH,后续在 RpcHeartbeatTrigger 中会判断该附属属性,如果是 false,则不再处理 IdleStateEvent,即不再发送心跳请求消息。
            connection.getChannel().attr(Connection.HEARTBEAT_SWITCH).set(false);
        }
    }

    public void enableHeartbeat(Connection connection) {
        if (null != connection) {
            connection.getChannel().attr(Connection.HEARTBEAT_SWITCH).set(true);
        }
    }
}

============================== 程序使用入口 ==============================
public class RpcClient extends AbstractConfigurableInstance {
    public void enableConnHeartbeat(String addr) {
        Url url = this.addressParser.parse(addr);
        this.enableConnHeartbeat(url);
    }

    public void enableConnHeartbeat(Url url) {
        if (null != url) {
            this.connectionManager.enableHeartbeat(this.connectionManager.get(url.getUniqueKey()));
        }
    }

    public void disableConnHeartbeat(String addr) {
        Url url = this.addressParser.parse(addr);
        this.disableConnHeartbeat(url);
    }

    public void disableConnHeartbeat(Url url) {
        if (null != url) {
            this.connectionManager.disableHeartbeat(this.connectionManager.get(url.getUniqueKey()));
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容