dubbo之心跳机制

在网络传输中,怎么确保通道连接的可用性是一个很重要的问题,简单的说,在网络通信中有客户端和服务端,一个负责发送请求,一个负责接收请求,在保证连接有效性的背景下,这两个物体扮演了什么角色,心跳机制能有效的保证连接的可用性,那它的机制是什么,下文中将会详细讲解。

网络层的可用性

首先讲一下TCP,在dubbo中的通信是基于TCP的,TCP本身并没有长短连接的区别,在短连接中,每次通信时,都会创建Socket,当该次通信结束后,就会调用socket.close();而在长连接中,每次通信完毕后,不会关闭连接,这样就可以做到连接的复用,长连接的好处是省去了创建连接时的耗时。那么如何确保连接的有效性呢,在TCP中用到了KeepAlive机制,keepalive并不是TCP协议的一部分,但是大多数操作系统都实现了这个机制,在一定时间内,在链路上如果没有数据传送的情况下,TCP层将会发送相应的keepalive探针来确定连接可用性,探测失败后重试10次(tcp_keepalive_probes),每次间隔时间为75s(tcp_keepalive_intvl),所有探测失败后,才认为当前连接已经不可用了。

KeepAlive机制是在网络层保证了连接的可用性,但在应用层我们认为这还是不够的。

  • KeepAlive的报活机制只有在链路空闲的情况下才会起作用,假如此时有数据发送,且物理链路已经不通,操作系统这边的链路状态还是E STABLISHED,这时会发生TCP重传机制,要知道默认的TCP超时重传,指数退避算法也是一个相当长的过程。
  • KeepAlive本身是面向网络的,并不是面向于应用的,可能是由于本身GC问题,系统load高等情况,但网络依然是通的,此时,应用已经失去了活性,所以连接自然认为是不可用的。

应用层的连接可用性:心跳机制

如何理解应用层的心跳?简单的说,就是客户端会开启一个定时任务,定时对已经建立连接的对端应用发送请求,服务端则需要特殊处理该请求,返回响应。如果心跳持续多次没有收到响应,客户端会认为连接不可用,主动断开连接。

客户端如何得知请求失败了?

在失败的场景下,服务端是不会返回响应的,所以只能在客户端自身上设计了。
当客户端发起一个RPC请求时,会设置一个超时时间client_timeout,同时它也会开启一个延迟的client_timeout的定时器。当接收到正常响应时,会移除该定时器;而当计时器倒计时完毕后,还没有被移除,则会认为请求超时,构造一个失败的响应传递给客户端。

连接建立时创建定时器

HeaderExchangeClient类

 public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        // 创建信息交换通道
        this.channel = new HeaderExchangeChannel(client);
        // 获得dubbo版本
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        //获得心跳周期配置,如果没有配置,并且dubbo是1.0版本的,则这只为1分钟,否则设置为0
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        // 获得心跳超时配置,默认是心跳周期的三倍
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
                 
        if (needHeartbeat) {
            // 开启心跳
          long tickDuration = calculateLeastDuration(heartbeat);
          heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-heartbeat", true) , tickDuration, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
          startHeartbeatTimer();
        }
    }

创建了一个HashedWheelTimer开启心跳检测,这是 Netty 所提供的一个经典的时间轮定时器实现。

HeaderExchangeServer也同时开启了定时器,代码逻辑和上述差不多。

开启两个定时任务

private void startHeartbeatTimer() {
           long heartbeatTick = calculateLeastDuration(heartbeat); 
   long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
   HeartbeatTimerTask heartBeatTimerTask =new  HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
   ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);
    
  heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS); 
  heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}

在该方法中主要开启了两个定时器

  • HeartbeatTimerTask 主要是定时发送心跳请求
  • ReconnectTimerTask 主要是心跳失败后处理重连,断连的逻辑

旧版的心跳处理HeartBeatTask类

final class HeartBeatTask implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);

    /**
     * 通道管理
     */
    private ChannelProvider channelProvider;

    /**
     * 心跳间隔 单位:ms
     */
    private int heartbeat;

    /**
     * 心跳超时时间 单位:ms
     */
    private int heartbeatTimeout;

    HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) {
        this.channelProvider = provider;
        this.heartbeat = heartbeat;
        this.heartbeatTimeout = heartbeatTimeout;
    }

    @Override
    public void run() {
        try {
            long now = System.currentTimeMillis();
            // 遍历所有通道
            for (Channel channel : channelProvider.getChannels()) {
                // 如果通道关闭了,则跳过
                if (channel.isClosed()) {
                    continue;
                }
                try {
                    // 最后一次接收到消息的时间戳
                    Long lastRead = (Long) channel.getAttribute(
                            HeaderExchangeHandler.KEY_READ_TIMESTAMP);
                    // 最后一次发送消息的时间戳
                    Long lastWrite = (Long) channel.getAttribute(
                            HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
                    // 如果最后一次接收或者发送消息到时间到现在的时间间隔超过了心跳间隔时间
                    if ((lastRead != null && now - lastRead > heartbeat)
                            || (lastWrite != null && now - lastWrite > heartbeat)) {
                        // 创建一个request
                        Request req = new Request();
                        // 设置版本号
                        req.setVersion(Version.getProtocolVersion());
                        // 设置需要得到响应
                        req.setTwoWay(true);
                        // 设置事件类型,为心跳事件
                        req.setEvent(Request.HEARTBEAT_EVENT);
                        // 发送心跳请求
                        channel.send(req);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                                    + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
                        }
                    }
                    // 如果最后一次接收消息的时间到现在已经超过了超时时间
                    if (lastRead != null && now - lastRead > heartbeatTimeout) {
                        logger.warn("Close channel " + channel
                                + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
                        // 如果该通道是客户端,也就是请求的服务器挂掉了,客户端尝试重连服务器
                        if (channel instanceof Client) {
                            try {
                                // 重新连接服务器
                                ((Client) channel).reconnect();
                            } catch (Exception e) {
                                //do nothing
                            }
                        } else {
                            // 如果不是客户端,也就是是服务端返回响应给客户端,但是客户端挂掉了,则服务端关闭客户端连接
                            channel.close();
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
                }
            }
        } catch (Throwable t) {
            logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
        }
    }

    interface ChannelProvider {
        // 获得所有的通道集合,需要心跳的通道数组
        Collection<Channel> getChannels();
    }

}

它首先遍历所有的Channel,在服务端对用的是所有客户端连接,在客户端对应的是服务端连接,判断当前TCP连接是否空闲,如果空闲就发送心跳报文,判断是否空闲,根据Channel是否有读或写来决定,比如一分钟内没有读或写就发送心跳报文,然后是处理超时的问题,处理客户端超时重新建立TCP连接,目前的策略是检查是否在3分钟内都没有成功接受或发送报文,如果在服务端检测则就会主动关闭远程客户端连接。

新版本的心跳机制

定时任务一: 发送心跳请求

在新版本下,去除了HeartBeatTask类,添加了HeartbeatTimerTask和ReconnectTimerTask类

public class HeartbeatTimerTask extends AbstractTimerTask {

    private static final Logger logger = LoggerFactory.getLogger(HeartbeatTimerTask.class);

    private final int heartbeat;

    HeartbeatTimerTask(ChannelProvider channelProvider, Long heartbeatTick, int heartbeat) {
        super(channelProvider, heartbeatTick);
        this.heartbeat = heartbeat;
    }

    @Override
    protected void doTask(Channel channel) {
        try {
            Long lastRead = lastRead(channel);
            Long lastWrite = lastWrite(channel);
            if ((lastRead != null && now() - lastRead > heartbeat)
                    || (lastWrite != null && now() - lastWrite > heartbeat)) {
                Request req = new Request();
                req.setVersion(Version.getProtocolVersion());
                req.setTwoWay(true);
                req.setEvent(Request.HEARTBEAT_EVENT);
                channel.send(req);
                if (logger.isDebugEnabled()) {
                    logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                            + ", cause: The channel has no data-transmission exceeds a heartbeat period: "
                            + heartbeat + "ms");
                }
            }
        } catch (Throwable t) {
            logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
        }
    }
}

Dubbo采取的是双向心跳设计,即服务端会向客户端发送心跳,客户端也会向服务端发送心跳,接收的一方更新lastread字段,发送的一方更新lastWrite字段,超过心跳间隙的时间,便发送心跳请求给对端。

定时任务二: 处理重连和断连

public class ReconnectTimerTask extends AbstractTimerTask {

    private static final Logger logger = LoggerFactory.getLogger(ReconnectTimerTask.class);

    private final int idleTimeout;

    public ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int idleTimeout) {
        super(channelProvider, heartbeatTimeoutTick);
        this.idleTimeout = idleTimeout;
    }

    @Override
    protected void doTask(Channel channel) {
        try {
            Long lastRead = lastRead(channel);
            Long now = now();

            // Rely on reconnect timer to reconnect when AbstractClient.doConnect fails to init the connection
            if (!channel.isConnected()) {
                try {
                    logger.info("Initial connection to " + channel);
                    ((Client) channel).reconnect();
                } catch (Exception e) {
                    logger.error("Fail to connect to " + channel, e);
                }
            // check pong at client
            } else if (lastRead != null && now - lastRead > idleTimeout) {
                logger.warn("Reconnect to channel " + channel + ", because heartbeat read idle time out: "
                        + idleTimeout + "ms");
                try {
                    ((Client) channel).reconnect();
                } catch (Exception e) {
                    logger.error(channel + "reconnect failed during idle time.", e);
                }
            }
        } catch (Throwable t) {
            logger.warn("Exception when reconnect to remote channel " + channel.getRemoteAddress(), t);
        }
    }
}

不同类型处理机制不同,当超过设置的心跳总时间后,客户端选择的是重新连接,服务端是选择直接断开连接。

心跳改进方案

Netty对空闲连接的检测提供了天然的支持,使用IdleStateHandler可以很方便的实现空闲检测逻辑。

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit){}
  • readerIdleTime: 读超时的时间
  • writerIdleTime: 写超时的时间
  • allIdleTime: 所有类型的超时时间
    客户端和服务端配置
    客户端:
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ch.pipeline().addLast("clientIdleHandler", new IdleStateHandler(60, 0, 0));
    }
});

服务端:

serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ch.pipeline().addLast("serverIdleHandler",new IdleStateHandler(0, 0, 200));
    }
}

从上面看出,客户端配置了read超时为60s,服务端配置了write/read超时未200s,

空闲超时逻辑-客户端

对于空闲超时的处理逻辑,客户端和服务端是不同的,首先来看客户端的:

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        // send heartbeat
        sendHeartBeat();
    } else {
        super.userEventTriggered(ctx, evt);
    }
}

检测到空闲超时后,采取的行为是向服务端发送心跳包,

public void sendHeartBeat() {
    Invocation invocation = new Invocation();
    invocation.setInvocationType(InvocationType.HEART_BEAT);
    channel.writeAndFlush(invocation).addListener(new CallbackFuture() {
        @Override
        public void callback(Future future) {
            RPCResult result = future.get();
            //超时 或者 写失败
            if (result.isError()) {
                channel.addFailedHeartBeatTimes();
                if (channel.getFailedHeartBeatTimes() >= channel.getMaxHeartBeatFailedTimes()) {
                    channel.reconnect();
                }
            } else {
                channel.clearHeartBeatFailedTimes();
            }
        }
    });
}

构造一个心跳包发送到服务端,接受响应结果

  • 响应成功,清除请求失败标记
  • 响应失败,心跳失败标记+1,如果超过配置的失败次数,则重新连接

空闲超时逻辑 - 服务端

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        channel.close();
    } else {
        super.userEventTriggered(ctx, evt);
    }
}

服务端直接关闭连接。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容