网络编程框架t-io的编程基本知识介绍

t-io作为目前国内最流行的开源网络编程框架软件,以简单易懂,上手容易而著称,相同的功能比起netty实现起来,要简单的多,代码量也大大减少,如果要使用好t-io,还是要先学习t-io的一些基本知识,这篇文章主要从8个方面介绍了t-io的基础知识。

具体请参考:https://www.wanetech.com/doc/tio/88

t-io收发消息过程

t-io收发消息及处理过程,可以用一张图清晰地表达出来

应用层包:Packet

Packet是用于表述业务数据结构的,我们通过继承Packet来实现自己的业务数据结构,对于各位而言,把Packet看作是一个普通的VO对象即可。

注意:不建议直接使用Packet对象,而是要继承Packet

一个简单的Packet可能长这样

可以结合AioHandler.java理解Packet

单条TCP连接上下文:ChannelContext 

每一个tcp连接的建立都会产生一个ChannelContext对象,这是个抽象类,如果你是用t-io作tcp客户端,那么就是ClientChannelContext,如果你是用tio作tcp服务器,那么就是ServerChannelContext

用户可以把业务数据通过ChannelContext对象和TCP连接关联起来,像下面这样设置属性

ChannelContext.set(String key, Object value)

然后用下面的方式获取属性

ChannelContext.get(String key)

当然最最常用的还是用t-io提供的强到没对手的bind功能,譬如用下面的代码绑定userid

Tio.bindUser(ChannelContext channelContext, String userid)

然后可以通过userid进行操作,示范代码如下

//获取某用户的ChannelContext集合

SetWithLock<ChannelContext> set = Tio.getChannelContextsByUserid(tioConfig, userid);

//给某用户发消息

Tio.sendToUser(TioConfig, userid, Packet)

除了可以绑定userid,t-io还内置了如下绑定API

绑定业务id

Tio.bindBsId(ChannelContext channelContext, String bsId)

绑定token

Tio.bindToken(ChannelContext channelContext, String token)

绑定群组

Tio.bindGroup(ChannelContext channelContext, String group)

ChannelContext对象包含的信息非常多,主要对象见下图


说明

ChannelContext是t-io中非常重要的类,他是业务和连接的沟通桥梁!

服务配置与维护:TioConfig

场景:我们在写TCP Server时,都会先选好一个端口以监听客户端连接,再创建N组线程池来执行相关的任务,譬如发送消息、解码数据包、处理数据包等任务,还要维护客户端连接的各种数据,为了和业务互动,还要把这些客户端连接和各种业务数据绑定起来,譬如把某个客户端绑定到一个群组,绑定到一个userid,绑定到一个token等。

TioConfig就是解决以上场景的:配置线程池、监听端口,维护客户端各种数据等的。

TioConfig是个抽象类

如果你是用tio作tcp客户端,那么你需要创建ClientTioConfig对象

服务器端对应一个ClientTioConfig对象

如果你是用tio作tcp服务器,那么你需要创建ServerTioConfig

一个监听端口对应一个ServerTioConfig ,一个jvm可以监听多个端口,所以一个jvm可以有多个ServerTioConfig对象

TioConfig对象包含的信息非常多,主要对象见下图

如何获取TioConfig对象

见:如何获取TioConfig对象

编码、解码、处理:AioHandler

AioHandler是处理消息的核心接口,它有两个子接口,ClientAioHandler和ServerAioHandler,当用tio作tcp客户端时需要实现ClientAioHandler,当用tio作tcp服务器时需要实现ServerAioHandler,它主要定义了3个方法,见下

package org.tio.core.intf;

import java.nio.ByteBuffer;

import org.tio.core.ChannelContext;

import org.tio.core.TioConfig;

import org.tio.core.exception.AioDecodeException;

/**

*

* @author tanyaowu

* 2017年10月19日 上午9:40:15

*/

public interface AioHandler {

    /**

    * 根据ByteBuffer解码成业务需要的Packet对象.

    * 如果收到的数据不全,导致解码失败,请返回null,在下次消息来时框架层会自动续上前面的收到的数据

    * @param buffer 参与本次希望解码的ByteBuffer

    * @param limit ByteBuffer的limit

    * @param position ByteBuffer的position,不一定是0哦

    * @param readableLength ByteBuffer参与本次解码的有效数据(= limit - position)

    * @param channelContext

    * @return

    * @throws AioDecodeException

    */

    Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException;

    /**

    * 编码

    * @param packet

    * @param tioConfig

    * @param channelContext

    * @return

    * @author: tanyaowu

    */

    ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext);

    /**

    * 处理消息包

    * @param packet

    * @param channelContext

    * @throws Exception

    * @author: tanyaowu

    */

    void handler(Packet packet, ChannelContext channelContext) throws Exception;

}

消息来往监听:AioListener

AioListener是处理消息的核心接口,它有两个子接口:ClientAioListener和ServerAioListener

当用tio作tcp客户端时需要实现ClientAioListener

当用tio作tcp服务器时需要实现ServerAioListener

它主要定义了如下方法

package org.tio.core.intf;

import org.tio.core.ChannelContext;

/**

*

* @author tanyaowu

* 2017年4月1日 上午9:34:08

*/

public interface AioListener {

    /**

    * 建链后触发本方法,注:建链不一定成功,需要关注参数isConnected

    * @param channelContext

    * @param isConnected 是否连接成功,true:表示连接成功,false:表示连接失败

    * @param isReconnect 是否是重连, true: 表示这是重新连接,false: 表示这是第一次连接

    * @throws Exception

    * @author: tanyaowu

    */

    public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception;

    /**

    * 原方法名:onAfterDecoded

    * 解码成功后触发本方法

    * @param channelContext

    * @param packet

    * @param packetSize

    * @throws Exception

    * @author: tanyaowu

    */

    public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception;

    /**

    * 接收到TCP层传过来的数据后

    * @param channelContext

    * @param receivedBytes 本次接收了多少字节

    * @throws Exception

    */

    public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception;

    /**

    * 消息包发送之后触发本方法

    * @param channelContext

    * @param packet

    * @param isSentSuccess true:发送成功,false:发送失败

    * @throws Exception

    * @author tanyaowu

    */

    public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception;

    /**

    * 处理一个消息包后

    * @param channelContext

    * @param packet

    * @param cost 本次处理消息耗时,单位:毫秒

    * @throws Exception

    */

    public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception;

    /**

    * 连接关闭前触发本方法

    * @param channelContext the channelcontext

    * @param throwable the throwable 有可能为空

    * @param remark the remark 有可能为空

    * @param isRemove

    * @author tanyaowu

    * @throws Exception

    */

    public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception;

    /**

    * 连接关闭前后触发本方法

    * 警告:走到这个里面时,很多绑定的业务都已经解绑了,所以这个方法一般是空着不实现的

    * @param channelContext the channelcontext

    * @param throwable the throwable 有可能为空

    * @param remark the remark 有可能为空

    * @param isRemove 是否是删除

    * @throws Exception

    * @author: tanyaowu

    */

//    public void onAfterClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception;

}

服务器端入口:TioServer

这个对象大家稍微了解一下即可,服务器启动时会用到这个对象,简单贴一下它的源代码吧,大家只需要关注它有一个start()方法是用来启动网络服务的即可

package org.tio.server;

import java.io.IOException;

import java.lang.management.ManagementFactory;

import java.lang.management.RuntimeMXBean;

import java.net.InetSocketAddress;

import java.net.StandardSocketOptions;

import java.nio.channels.AsynchronousChannelGroup;

import java.nio.channels.AsynchronousServerSocketChannel;

import java.util.ArrayList;

import java.util.Date;

import java.util.List;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.tio.core.Node;

import org.tio.utils.SysConst;

import org.tio.utils.date.DateUtils;

import org.tio.utils.hutool.StrUtil;

/**

* @author tanyaowu

*

*/

public class TioServer {

    private static Logger log = LoggerFactory.getLogger(TioServer.class);

    private ServerTioConfig serverTioConfig;

    private AsynchronousServerSocketChannel serverSocketChannel;

    private AsynchronousChannelGroup channelGroup = null;

    private Node serverNode;

    private boolean isWaitingStop = false;

    /**

    *

    * @param serverTioConfig

    *

    * @author tanyaowu

    * 2017年1月2日 下午5:53:06

    *

    */

    public TioServer(ServerTioConfig serverTioConfig) {

        super();

        this.serverTioConfig = serverTioConfig;

    }

    /**

    * @return the serverTioConfig

    */

    public ServerTioConfig getServerTioConfig() {

        return serverTioConfig;

    }

    /**

    * @return the serverNode

    */

    public Node getServerNode() {

        return serverNode;

    }

    /**

    * @return the serverSocketChannel

    */

    public AsynchronousServerSocketChannel getServerSocketChannel() {

        return serverSocketChannel;

    }

    /**

    * @return the isWaitingStop

    */

    public boolean isWaitingStop() {

        return isWaitingStop;

    }

    /**

    * @param serverTioConfig the serverTioConfig to set

    */

    public void setServerTioConfig(ServerTioConfig serverTioConfig) {

        this.serverTioConfig = serverTioConfig;

    }

    /**

    * @param isWaitingStop the isWaitingStop to set

    */

    public void setWaitingStop(boolean isWaitingStop) {

        this.isWaitingStop = isWaitingStop;

    }

    public void start(String serverIp, int serverPort) throws IOException {

        long start = System.currentTimeMillis();

        this.serverNode = new Node(serverIp, serverPort);

        channelGroup = AsynchronousChannelGroup.withThreadPool(serverTioConfig.groupExecutor);

        serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);

        serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);

        serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 64 * 1024);

        InetSocketAddress listenAddress = null;

        if (StrUtil.isBlank(serverIp)) {

            listenAddress = new InetSocketAddress(serverPort);

        } else {

            listenAddress = new InetSocketAddress(serverIp, serverPort);

        }

        serverSocketChannel.bind(listenAddress, 0);

        AcceptCompletionHandler acceptCompletionHandler = serverTioConfig.getAcceptCompletionHandler();

        serverSocketChannel.accept(this, acceptCompletionHandler);

        serverTioConfig.startTime = System.currentTimeMillis();

        //下面这段代码有点无聊,写得随意,纯粹是为了打印好看些

        String baseStr = "|----------------------------------------------------------------------------------------|";

        int baseLen = baseStr.length();

        StackTraceElement[] ses = Thread.currentThread().getStackTrace();

        StackTraceElement se = ses[ses.length - 1];

        int xxLen = 18;

        int aaLen = baseLen - 3;

        List<String> infoList = new ArrayList<>();

        infoList.add(StrUtil.fillAfter("Tio gitee address", ' ', xxLen) + "| " + SysConst.TIO_URL_GITEE);

        infoList.add(StrUtil.fillAfter("Tio site address", ' ', xxLen) + "| " + SysConst.TIO_URL_SITE);

        infoList.add(StrUtil.fillAfter("Tio version", ' ', xxLen) + "| " + SysConst.TIO_CORE_VERSION);

        infoList.add(StrUtil.fillAfter("-", '-', aaLen));

        infoList.add(StrUtil.fillAfter("TioConfig name", ' ', xxLen) + "| " + serverTioConfig.getName());

        infoList.add(StrUtil.fillAfter("Started at", ' ', xxLen) + "| " + DateUtils.formatDateTime(new Date()));

        infoList.add(StrUtil.fillAfter("Listen on", ' ', xxLen) + "| " + this.serverNode);

        infoList.add(StrUtil.fillAfter("Main Class", ' ', xxLen) + "| " + se.getClassName());

        try {

            RuntimeMXBean runtimeMxBean = ManagementFactory.getRuntimeMXBean();

            String runtimeName = runtimeMxBean.getName();

            String pid = runtimeName.split("@")[0];

            long startTime = runtimeMxBean.getStartTime();

            long startCost = System.currentTimeMillis() - startTime;

            infoList.add(StrUtil.fillAfter("Jvm start time", ' ', xxLen) + "| " + startCost + " ms");

            infoList.add(StrUtil.fillAfter("Tio start time", ' ', xxLen) + "| " + (System.currentTimeMillis() - start) + " ms");

            infoList.add(StrUtil.fillAfter("Pid", ' ', xxLen) + "| " + pid);

        } catch (Exception e) {

        }

        //100

        String printStr = "\r\n"+baseStr+"\r\n";

        //        printStr += "|--" + leftStr + " " + info + " " + rightStr + "--|\r\n";

        for (String string : infoList) {

            printStr += "| " + StrUtil.fillAfter(string, ' ', aaLen) + "|\r\n";

        }

        printStr += baseStr + "\r\n";

        if (log.isInfoEnabled()) {

            log.info(printStr);

        } else {

            System.out.println(printStr);

        }

    }

    /**

    *

    * @return

    * @author tanyaowu

    */

    public boolean stop() {

        isWaitingStop = true;

        boolean ret = true;

        try {

            channelGroup.shutdownNow();

        } catch (Exception e) {

            log.error("channelGroup.shutdownNow()时报错", e);

        }

        try {

            serverSocketChannel.close();

        } catch (Exception e1) {

            log.error("serverSocketChannel.close()时报错", e1);

        }

        try {

            serverTioConfig.groupExecutor.shutdown();

        } catch (Exception e1) {

            log.error(e1.toString(), e1);

        }

        try {

            serverTioConfig.tioExecutor.shutdown();

        } catch (Exception e1) {

            log.error(e1.toString(), e1);

        }

        serverTioConfig.setStopped(true);

        try {

            ret = ret && serverTioConfig.groupExecutor.awaitTermination(6000, TimeUnit.SECONDS);

            ret = ret && serverTioConfig.tioExecutor.awaitTermination(6000, TimeUnit.SECONDS);

        } catch (InterruptedException e) {

            log.error(e.getLocalizedMessage(), e);

        }

        log.info(this.serverNode + " stopped");

        return ret;

    }

}

客户端入口:TioClient 

只有当你在用t-io作为TCP客户端时,才用得到TioClient,此处简单贴一下它的源代码,它的用法,见后面的showcase示范工程

package org.tio.client;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.net.StandardSocketOptions;

import java.nio.channels.AsynchronousChannelGroup;

import java.nio.channels.AsynchronousSocketChannel;

import java.util.Set;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.ReentrantReadWriteLock;

import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;

import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.tio.client.intf.ClientAioHandler;

import org.tio.core.ChannelContext;

import org.tio.core.Node;

import org.tio.core.Tio;

import org.tio.core.intf.Packet;

import org.tio.core.ssl.SslFacadeContext;

import org.tio.core.stat.ChannelStat;

import org.tio.utils.SystemTimer;

import org.tio.utils.hutool.StrUtil;

import org.tio.utils.lock.SetWithLock;

/**

*

* @author tanyaowu

* 2017年4月1日 上午9:29:58

*/

public class TioClient {

    /**

    * 自动重连任务

    * @author tanyaowu

    *

    */

    private static class ReconnRunnable implements Runnable {

        ClientChannelContext channelContext = null;

        TioClient tioClient = null;

        //        private static Map<Node, Long> cacheMap = new HashMap<>();

        public ReconnRunnable(ClientChannelContext channelContext, TioClient tioClient) {

            this.channelContext = channelContext;

            this.tioClient = tioClient;

        }

        /**

        * @see java.lang.Runnable#run()

        *

        * @author tanyaowu

        * 2017年2月2日 下午8:24:40

        *

        */

        @Override

        public void run() {

            ReentrantReadWriteLock closeLock = channelContext.closeLock;

            WriteLock writeLock = closeLock.writeLock();

            writeLock.lock();

            try {

                if (!channelContext.isClosed) //已经连上了,不需要再重连了

                {

                    return;

                }

                long start = SystemTimer.currTime;

                tioClient.reconnect(channelContext, 2);

                long end = SystemTimer.currTime;

                long iv = end - start;

                if (iv >= 100) {

                    log.error("{},重连耗时:{} ms", channelContext, iv);

                } else {

                    log.info("{},重连耗时:{} ms", channelContext, iv);

                }

                if (channelContext.isClosed) {

                    channelContext.setReconnCount(channelContext.getReconnCount() + 1);

                    //                    cacheMap.put(channelContext.getServerNode(), SystemTimer.currTime);

                    return;

                }

            } catch (java.lang.Throwable e) {

                log.error(e.toString(), e);

            } finally {

                writeLock.unlock();

            }

        }

    }

    private static Logger log = LoggerFactory.getLogger(TioClient.class);

    private AsynchronousChannelGroup channelGroup;

    private ClientTioConfig clientTioConfig;

    /**

    * @param serverIp 可以为空

    * @param serverPort

    * @param aioDecoder

    * @param aioEncoder

    * @param aioHandler

    *

    * @author tanyaowu

    * @throws IOException

    *

    */

    public TioClient(final ClientTioConfig clientTioConfig) throws IOException {

        super();

        this.clientTioConfig = clientTioConfig;

        this.channelGroup = AsynchronousChannelGroup.withThreadPool(clientTioConfig.groupExecutor);

        startHeartbeatTask();

        startReconnTask();

    }

    /**

    *

    * @param serverNode

    * @throws Exception

    *

    * @author tanyaowu

    *

    */

    public void asynConnect(Node serverNode) throws Exception {

        asynConnect(serverNode, null);

    }

    /**

    *

    * @param serverNode

    * @param timeout

    * @throws Exception

    *

    * @author tanyaowu

    *

    */

    public void asynConnect(Node serverNode, Integer timeout) throws Exception {

        asynConnect(serverNode, null, null, timeout);

    }

    /**

    *

    * @param serverNode

    * @param bindIp

    * @param bindPort

    * @param timeout

    * @throws Exception

    *

    * @author tanyaowu

    *

    */

    public void asynConnect(Node serverNode, String bindIp, Integer bindPort, Integer timeout) throws Exception {

        connect(serverNode, bindIp, bindPort, null, timeout, false);

    }

    /**

    *

    * @param serverNode

    * @return

    * @throws Exception

    *

    * @author tanyaowu

    *

    */

    public ClientChannelContext connect(Node serverNode) throws Exception {

        return connect(serverNode, null);

    }

    /**

    *

    * @param serverNode

    * @param timeout

    * @return

    * @throws Exception

    * @author tanyaowu

    */

    public ClientChannelContext connect(Node serverNode, Integer timeout) throws Exception {

        return connect(serverNode, null, 0, timeout);

    }

    /**

    *

    * @param serverNode

    * @param bindIp

    * @param bindPort

    * @param initClientChannelContext

    * @param timeout 超时时间,单位秒

    * @return

    * @throws Exception

    * @author tanyaowu

    */

    public ClientChannelContext connect(Node serverNode, String bindIp, Integer bindPort, ClientChannelContext initClientChannelContext, Integer timeout) throws Exception {

        return connect(serverNode, bindIp, bindPort, initClientChannelContext, timeout, true);

    }

    /**

    *

    * @param serverNode

    * @param bindIp

    * @param bindPort

    * @param initClientChannelContext

    * @param timeout 超时时间,单位秒

    * @param isSyn true: 同步, false: 异步

    * @return

    * @throws Exception

    * @author tanyaowu

    */

    private ClientChannelContext connect(Node serverNode, String bindIp, Integer bindPort, ClientChannelContext initClientChannelContext, Integer timeout, boolean isSyn)

            throws Exception {

        AsynchronousSocketChannel asynchronousSocketChannel = null;

        ClientChannelContext channelContext = null;

        boolean isReconnect = initClientChannelContext != null;

        //        ClientAioListener clientAioListener = clientTioConfig.getClientAioListener();

        long start = SystemTimer.currTime;

        asynchronousSocketChannel = AsynchronousSocketChannel.open(channelGroup);

        long end = SystemTimer.currTime;

        long iv = end - start;

        if (iv >= 100) {

            log.error("{}, open 耗时:{} ms", channelContext, iv);

        }

        asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);

        asynchronousSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);

        asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);

        InetSocketAddress bind = null;

        if (bindPort != null && bindPort > 0) {

            if (false == StrUtil.isBlank(bindIp)) {

                bind = new InetSocketAddress(bindIp, bindPort);

            } else {

                bind = new InetSocketAddress(bindPort);

            }

        }

        if (bind != null) {

            asynchronousSocketChannel.bind(bind);

        }

        channelContext = initClientChannelContext;

        start = SystemTimer.currTime;

        InetSocketAddress inetSocketAddress = new InetSocketAddress(serverNode.getIp(), serverNode.getPort());

        ConnectionCompletionVo attachment = new ConnectionCompletionVo(channelContext, this, isReconnect, asynchronousSocketChannel, serverNode, bindIp, bindPort);

        if (isSyn) {

            Integer realTimeout = timeout;

            if (realTimeout == null) {

                realTimeout = 5;

            }

            CountDownLatch countDownLatch = new CountDownLatch(1);

            attachment.setCountDownLatch(countDownLatch);

            asynchronousSocketChannel.connect(inetSocketAddress, attachment, clientTioConfig.getConnectionCompletionHandler());

            boolean f = countDownLatch.await(realTimeout, TimeUnit.SECONDS);

            if (f) {

                return attachment.getChannelContext();

            } else {

                log.error("countDownLatch.await(realTimeout, TimeUnit.SECONDS) 返回false ");

                return attachment.getChannelContext();

            }

        } else {

            asynchronousSocketChannel.connect(inetSocketAddress, attachment, clientTioConfig.getConnectionCompletionHandler());

            return null;

        }

    }

    /**

    *

    * @param serverNode

    * @param bindIp

    * @param bindPort

    * @param timeout 超时时间,单位秒

    * @return

    * @throws Exception

    *

    * @author tanyaowu

    *

    */

    public ClientChannelContext connect(Node serverNode, String bindIp, Integer bindPort, Integer timeout) throws Exception {

        return connect(serverNode, bindIp, bindPort, null, timeout);

    }

    /**

    * @return the channelGroup

    */

    public AsynchronousChannelGroup getChannelGroup() {

        return channelGroup;

    }

    /**

    * @return the clientTioConfig

    */

    public ClientTioConfig getClientTioConfig() {

        return clientTioConfig;

    }

    /**

    *

    * @param channelContext

    * @param timeout

    * @return

    * @throws Exception

    *

    * @author tanyaowu

    *

    */

    public void reconnect(ClientChannelContext channelContext, Integer timeout) throws Exception {

        connect(channelContext.getServerNode(), channelContext.getBindIp(), channelContext.getBindPort(), channelContext, timeout);

    }

    /**

    * @param clientTioConfig the clientTioConfig to set

    */

    public void setClientTioConfig(ClientTioConfig clientTioConfig) {

        this.clientTioConfig = clientTioConfig;

    }

    /**

    * 定时任务:发心跳

    * @author tanyaowu

    *

    */

    private void startHeartbeatTask() {

        final ClientGroupStat clientGroupStat = (ClientGroupStat)clientTioConfig.groupStat;

        final ClientAioHandler aioHandler = clientTioConfig.getClientAioHandler();

        final String id = clientTioConfig.getId();

        new Thread(new Runnable() {

            @Override

            public void run() {

                while (!clientTioConfig.isStopped()) {

//                    final long heartbeatTimeout = clientTioConfig.heartbeatTimeout;

                    if (clientTioConfig.heartbeatTimeout <= 0) {

                        log.warn("用户取消了框架层面的心跳定时发送功能,请用户自己去完成心跳机制");

                        break;

                    }

                    SetWithLock<ChannelContext> setWithLock = clientTioConfig.connecteds;

                    ReadLock readLock = setWithLock.readLock();

                    readLock.lock();

                    try {

                        Set<ChannelContext> set = setWithLock.getObj();

                        long currtime = SystemTimer.currTime;

                        for (ChannelContext entry : set) {

                            ClientChannelContext channelContext = (ClientChannelContext) entry;

                            if (channelContext.isClosed || channelContext.isRemoved) {

                                continue;

                            }

                            ChannelStat stat = channelContext.stat;

                            long compareTime = Math.max(stat.latestTimeOfReceivedByte, stat.latestTimeOfSentPacket);

                            long interval = currtime - compareTime;

                            if (interval >= clientTioConfig.heartbeatTimeout / 2) {

                                Packet packet = aioHandler.heartbeatPacket(channelContext);

                                if (packet != null) {

                                    if (log.isInfoEnabled()) {

                                        log.info("{}发送心跳包", channelContext.toString());

                                    }

                                    Tio.send(channelContext, packet);

                                }

                            }

                        }

                        if (log.isInfoEnabled()) {

                            log.info("[{}]: curr:{}, closed:{}, received:({}p)({}b), handled:{}, sent:({}p)({}b)", id, set.size(), clientGroupStat.closed.get(),

                                    clientGroupStat.receivedPackets.get(), clientGroupStat.receivedBytes.get(), clientGroupStat.handledPackets.get(),

                                    clientGroupStat.sentPackets.get(), clientGroupStat.sentBytes.get());

                        }

                    } catch (Throwable e) {

                        log.error("", e);

                    } finally {

                        try {

                            readLock.unlock();

                            Thread.sleep(clientTioConfig.heartbeatTimeout / 4);

                        } catch (Throwable e) {

                            log.error(e.toString(), e);

                        } finally {

                        }

                    }

                }

            }

        }, "tio-timer-heartbeat" + id).start();

    }

    /**

    * 启动重连任务

    *

    *

    * @author tanyaowu

    *

    */

    private void startReconnTask() {

        final ReconnConf reconnConf = clientTioConfig.getReconnConf();

        if (reconnConf == null || reconnConf.getInterval() <= 0) {

            return;

        }

        final String id = clientTioConfig.getId();

        Thread thread = new Thread(new Runnable() {

            @Override

            public void run() {

                while (!clientTioConfig.isStopped()) {

                    //log.info("准备重连");

                    LinkedBlockingQueue<ChannelContext> queue = reconnConf.getQueue();

                    ClientChannelContext channelContext = null;

                    try {

                        channelContext = (ClientChannelContext) queue.take();

                    } catch (InterruptedException e1) {

                        log.error(e1.toString(), e1);

                    }

                    if (channelContext == null) {

                        continue;

                        //                        return;

                    }

                    if (channelContext.isRemoved) //已经删除的,不需要重新再连

                    {

                        continue;

                    }

                    SslFacadeContext sslFacadeContext = channelContext.sslFacadeContext;

                    if (sslFacadeContext != null) {

                        sslFacadeContext.setHandshakeCompleted(false);

                    }

                    long sleeptime = reconnConf.getInterval() - (SystemTimer.currTime - channelContext.stat.timeInReconnQueue);

                    //log.info("sleeptime:{}, closetime:{}", sleeptime, timeInReconnQueue);

                    if (sleeptime > 0) {

                        try {

                            Thread.sleep(sleeptime);

                        } catch (InterruptedException e) {

                            log.error(e.toString(), e);

                        }

                    }

                    if (channelContext.isRemoved || !channelContext.isClosed) //已经删除的和已经连上的,不需要重新再连

                    {

                        continue;

                    }

                    ReconnRunnable runnable = new ReconnRunnable(channelContext, TioClient.this);

                    reconnConf.getThreadPoolExecutor().execute(runnable);

                }

            }

        });

        thread.setName("tio-timer-reconnect-" + id);

        thread.setDaemon(true);

        thread.start();

    }

    /**

    *

    * @return

    * @author tanyaowu

    */

    public boolean stop() {

        boolean ret = true;

        try {

            clientTioConfig.groupExecutor.shutdown();

        } catch (Exception e1) {

            log.error(e1.toString(), e1);

        }

        try {

            clientTioConfig.tioExecutor.shutdown();

        } catch (Exception e1) {

            log.error(e1.toString(), e1);

        }

        clientTioConfig.setStopped(true);

        try {

            ret = ret && clientTioConfig.groupExecutor.awaitTermination(6000, TimeUnit.SECONDS);

            ret = ret && clientTioConfig.tioExecutor.awaitTermination(6000, TimeUnit.SECONDS);

        } catch (InterruptedException e) {

            log.error(e.getLocalizedMessage(), e);

        }

        log.info("client resource has released");

        return ret;

    }

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354

推荐阅读更多精彩内容