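Using IdleStateHandler
Netty's IdleStateHandler fires an IdleStateEvent when a channel has performed no read, no write, or neither for a configured period of time. This makes it a convenient building block for a heartbeat mechanism.
Here is the example from the official documentation: an idle channel sends a ping message every 30 seconds, and the channel is closed if nothing is read from it for 60 seconds.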
package learn.netty.idle;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * @author stone
 * @date 2019/7/31 14:37
 */
public class IdleTest {

    public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // Reader idle after 60 s, writer idle after 30 s, all-idle check disabled (0).
            ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 30, 0));
            ch.pipeline().addLast("myHandler", new MyHandler());
        }
    }

    public class MyHandler extends ChannelDuplexHandler {
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            super.userEventTriggered(ctx, evt);
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent e = (IdleStateEvent) evt;
                if (e.state() == IdleState.READER_IDLE) {
                    // Nothing read for 60 s: close the channel.
                    ctx.close();
                } else if (e.state() == IdleState.WRITER_IDLE) {
                    // Nothing written for 30 s: send a ping.
                    ctx.writeAndFlush(new PingMessage());
                }
            }
        }
    }

    public class PingMessage {
        private String msg = "ping";

        public String getMsg() {
            return msg;
        }

        public void setMsg(String msg) {
            this.msg = msg;
        }
    }
}
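It is best to put IdleStateHandler at the head of the inbound pipeline. The handler that overrides userEventTriggered must be added after it; otherwise the idle event never reaches that handler.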
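To make the two thresholds in the example concrete, here is a minimal JDK-only sketch of the decision that the 60 s and 30 s settings imply. IdleModel, Idle and classify are hypothetical names used for illustration only; this is not Netty's implementation, which tracks reads and writes with separate timers and can fire both events independently.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical, JDK-only model of the idle checks; not Netty's actual implementation. */
final class IdleModel {
    enum Idle { NONE, READER_IDLE, WRITER_IDLE }

    /**
     * Classifies a channel from the nanosecond timestamps of its last read and write.
     * A timeout of 0 disables that check, as in the IdleStateHandler constructor.
     * Simplification: only one state is returned, with reader idle taking priority.
     */
    static Idle classify(long nowNanos, long lastReadNanos, long lastWriteNanos,
                         long readerIdleSeconds, long writerIdleSeconds) {
        long readerLimit = TimeUnit.SECONDS.toNanos(readerIdleSeconds);
        long writerLimit = TimeUnit.SECONDS.toNanos(writerIdleSeconds);
        if (readerIdleSeconds > 0 && nowNanos - lastReadNanos >= readerLimit) {
            return Idle.READER_IDLE;
        }
        if (writerIdleSeconds > 0 && nowNanos - lastWriteNanos >= writerLimit) {
            return Idle.WRITER_IDLE;
        }
        return Idle.NONE;
    }

    public static void main(String[] args) {
        long s = TimeUnit.SECONDS.toNanos(1);
        // Last read 10 s ago, last write 30 s ago: time to send a ping.
        System.out.println(classify(30 * s, 20 * s, 0, 60, 30)); // prints WRITER_IDLE
        // Last read 60 s ago: the peer is presumed gone.
        System.out.println(classify(60 * s, 0, 55 * s, 60, 30)); // prints READER_IDLE
    }
}
```

With the values from the example, a quiet but healthy connection produces a ping every 30 s, and the 60 s reader limit is only reached if nothing comes back at all.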
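Heartbeat and reconnection flow
First, let's walk through how a Netty application implements heartbeats and reconnection:
- The client connects to the server.
- An IdleStateHandler is added to the client's ChannelPipeline with a writer idle time, for example 5 s (the sample code below uses 4 s).
- When the client channel has performed no write for that period, IdleStateHandler fires an IdleStateEvent and the userEventTriggered method is invoked.
- In userEventTriggered, the client sends a heartbeat packet to the server to check whether the server is still alive. This avoids the situation where the server has already gone down but the client does not know it.
- The server handles the heartbeat. The best reply is no reply at all, which reduces the load on the server: with 100,000 connections, heartbeat replies alone would consume considerable bandwidth. How does the server decide whether a client is alive? A heartbeat arrives from the client every 5 s, so if the server has received nothing from a client for 10 s, it can assume the client is down and close that connection. (The sample code below uses a 5 s reader idle time on the server.)
- If the server fails and closes all connections, the client has to reconnect.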
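The server-side liveness rule in the steps above can be sketched as a small timeline calculation. This is a JDK-only illustration under stated assumptions: HeartbeatTimeline and firstTimeout are hypothetical names, times are whole seconds since the connection was established, and arrival times are given in ascending order. The numbers in main are the ones the sample code in this article uses (a heartbeat every 4 s, a 5 s reader idle time on the server).

```java
/** Hypothetical helper that replays heartbeat arrival times against a reader idle timeout. */
final class HeartbeatTimeline {

    /**
     * Returns the second at which a server with the given reader idle timeout would
     * give up on the client, or -1 if no gap up to endSeconds reaches the timeout.
     */
    static long firstTimeout(long[] arrivalSeconds, long readerIdleSeconds, long endSeconds) {
        long lastRead = 0; // connection established at t = 0
        for (long t : arrivalSeconds) {
            if (t - lastRead >= readerIdleSeconds) {
                return lastRead + readerIdleSeconds;
            }
            lastRead = t;
        }
        return endSeconds - lastRead >= readerIdleSeconds ? lastRead + readerIdleSeconds : -1;
    }

    public static void main(String[] args) {
        // Heartbeats at 4, 8 and 12 s, then the client dies: the server gives up at 17 s.
        System.out.println(firstTimeout(new long[] {4, 8, 12}, 5, 30)); // prints 17
        // Heartbeats keep arriving every 4 s: no timeout within the observed window.
        System.out.println(firstTimeout(new long[] {4, 8, 12, 16, 20}, 5, 22)); // prints -1
    }
}
```

The sketch also shows how thin the margin is when the server timeout is only slightly larger than the heartbeat interval: a single heartbeat delayed by more than 1 s would already count as a dead client.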
First, define an interface, ChannelHandlerHolder, that holds the handlers of a ChannelPipeline:
package learn.netty.idle;

import io.netty.channel.ChannelHandler;

/**
 * Holder for the client's ChannelHandlers, to be implemented by subclasses.
 * Any implementation gives easy access to the handlers that belong in the
 * ChannelPipeline, both when the pipeline is first initialized and when it
 * has to be rebuilt on reconnect.
 *
 * @author stone
 * @date 2019/7/31 14:17
 */
public interface ChannelHandlerHolder {
    ChannelHandler[] handlers();
}
Next is the server code, HeartBeatServer:
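package learn.netty.idle;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

public class HeartBeatServer {
    private final IdleTriggerHandler idleTriggerHandler = new IdleTriggerHandler();
    private int port;

    public HeartBeatServer(int port) {
        this.port = port;
    }

    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Reader idle after 5 s without inbound data.
                            ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
                            ch.pipeline().addLast(idleTriggerHandler);
                            ch.pipeline().addLast("decoder", new StringDecoder());
                            ch.pipeline().addLast("encoder", new StringEncoder());
                            ch.pipeline().addLast(new HeartBeatServerHandler());
                        }
                    }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
            // Bind the port and start accepting client connections.
            ChannelFuture future = b.bind(port).sync();
            System.out.println("Server start listen at " + port);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the event loop threads whether the server stopped normally or failed.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new HeartBeatServer(port).start();
    }
}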
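The server gets a separate IdleTriggerHandler that overrides userEventTriggered. Since the client's heartbeats are writes, the server sees them as reads, so the state to check is IdleState.READER_IDLE. The source is as follows: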
package learn.netty.idle;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * @author stone
 * @date 2019/7/31 14:51
 */
@ChannelHandler.Sharable
public class IdleTriggerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                // Surfaces as exceptionCaught further down the pipeline,
                // where HeartBeatServerHandler closes the channel.
                throw new Exception("idle exception");
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
HeartBeatServerHandler is the server's own business handler:
package learn.netty.idle;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * @author stone
 * @date 2019/7/31 15:02
 */
public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            System.out.println(ctx.channel().remoteAddress() + "->Server : " + msg.toString());
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
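Next, write a client handler that watches whether the connection is healthy and reconnects if it drops. When a channel is disconnected, channelInactive is triggered, so all reconnection logic belongs in that method: when channelInactive fires, we create a reconnect task and schedule it on a Netty timer, and the reconnect runs once the delay has elapsed: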
package learn.netty.idle;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;

/**
 * @author stone
 * @date 2019/7/31 15:08
 */
@ChannelHandler.Sharable
public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask, ChannelHandlerHolder {
    private final Bootstrap bootstrap;
    private final Timer timer;
    private final int port;
    private final String host;
    private volatile boolean reconnect = true;
    private int attempts;

    public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, int port, String host, boolean reconnect) {
        this.bootstrap = bootstrap;
        this.timer = timer;
        this.port = port;
        this.host = host;
        this.reconnect = reconnect;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel is active; reconnect attempt counter reset to 0");
        attempts = 0;
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel closed");
        if (reconnect) {
            System.out.println("Channel closed, reconnecting...");
            if (attempts < 12) {
                attempts++;
                // Back off: the delay doubles with every attempt (2 << attempts ms).
                int timeout = 2 << attempts;
                System.out.println(attempts);
                timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
            }
        }
        ctx.fireChannelInactive();
    }

    @Override
    public void run(Timeout timeout) throws Exception {
        ChannelFuture future;
        // The bootstrap is already configured; only the handlers need to be set again.
        synchronized (bootstrap) {
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(handlers());
                }
            });
            future = bootstrap.connect(host, port);
        }
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                boolean succeed = f.isSuccess();
                if (!succeed) {
                    // On failure, fire channelInactive again to schedule another attempt (12 at most).
                    System.out.println("Reconnect failed");
                    f.channel().pipeline().fireChannelInactive();
                } else {
                    System.out.println("Reconnect succeeded");
                }
            }
        });
        System.out.println("timeout task : " + attempts);
    }
}
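The retry delays used by ConnectionWatchdog are easier to reason about when written out. The following JDK-only sketch reproduces the `2 << attempts` schedule from channelInactive; ReconnectBackoff, delayMillis and totalMillis are hypothetical names introduced for illustration. It counts only the timer delays and ignores the time each failed connect itself takes.

```java
/** Hypothetical helper that tabulates the watchdog's reconnect delays. */
final class ReconnectBackoff {
    static final int MAX_ATTEMPTS = 12;

    /** Delay in milliseconds scheduled before the given 1-based attempt: 2 << attempt. */
    static int delayMillis(int attempt) {
        if (attempt < 1 || attempt > MAX_ATTEMPTS) {
            throw new IllegalArgumentException("attempt must be in 1.." + MAX_ATTEMPTS);
        }
        return 2 << attempt;
    }

    /** Sum of all scheduled delays if every one of the 12 attempts fails. */
    static long totalMillis() {
        long total = 0;
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            total += delayMillis(attempt);
        }
        return total;
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            System.out.println("attempt " + attempt + " -> " + delayMillis(attempt) + " ms");
        }
        System.out.println("total -> " + totalMillis() + " ms"); // prints total -> 16380 ms
    }
}
```

The first delay is 4 ms and the last is 8192 ms, so the scheduled delays add up to roughly 16 s. A server that stays down longer than that is not reconnected to automatically, because the counter is only reset in channelActive.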
The HeartBeatClient code:
package learn.netty.idle;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.HashedWheelTimer;

import java.util.concurrent.TimeUnit;

/**
 * @author stone
 * @date 2019/7/31 15:34
 */
public class HeartBeatClient {
    protected final HashedWheelTimer timer = new HashedWheelTimer();
    private Bootstrap boot;
    private final ConnectorIdleStateTrigger idleTriggerHandler = new ConnectorIdleStateTrigger();

    public void connect(int port, String host) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        boot = new Bootstrap();
        boot.group(group).channel(NioSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO));
        final ConnectionWatchdog watchdog = new ConnectionWatchdog(boot, timer, port, host, true) {
            @Override
            public ChannelHandler[] handlers() {
                return new ChannelHandler[] {
                        this, // the ConnectionWatchdog itself is also added to the pipeline
                        new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS),
                        idleTriggerHandler,
                        new StringDecoder(),
                        new StringEncoder(),
                        new HeartBeatClientHandler()
                };
            }
        };
        ChannelFuture f;
        // Connect to the server.
        try {
            synchronized (boot) {
                boot.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(watchdog.handlers());
                    }
                });
                f = boot.connect(host, port);
            }
            f.sync();
        } catch (Throwable t) {
            throw new Exception("failed to connect to " + host + ":" + port, t);
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Fall back to the default port.
            }
        }
        new HeartBeatClient().connect(port, "127.0.0.1");
    }
}
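What HeartBeatClient does:
1) It creates the ConnectionWatchdog handler.
2) It configures the Bootstrap and adds the ConnectionWatchdog handler to the pipeline as well.
3) It adds IdleStateHandler(0, 4, 0, TimeUnit.SECONDS).
If no write occurs within 4 s, a writer idle event fires and a ping message is sent to the server as a heartbeat.
The ping message is sent in the ConnectorIdleStateTrigger handler: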
package learn.netty.idle;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;

/**
 * @author stone
 * @date 2019/7/31 16:04
 */
@ChannelHandler.Sharable
public class ConnectorIdleStateTrigger extends ChannelInboundHandlerAdapter {
    // Shared heartbeat payload; wrapped as unreleasable so it survives every write.
    private static final ByteBuf HEARTBEAT_SEQUENCE =
            Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat", CharsetUtil.UTF_8));

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                // Write a duplicate so the shared buffer's indices stay untouched.
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
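The reason for writing HEARTBEAT_SEQUENCE.duplicate() rather than the shared buffer itself can be illustrated with the JDK's java.nio.ByteBuffer, which has an analogous duplicate() method. This is only an analogy under that assumption: Netty's ByteBuf is a different class, although its duplicate() likewise shares the content while keeping separate reader and writer indices. DuplicateDemo and sendOnce are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** JDK-only analogy for sending a shared, constant heartbeat payload many times. */
final class DuplicateDemo {
    static final ByteBuffer HEARTBEAT =
            ByteBuffer.wrap("Heartbeat".getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();

    /** Drains a fresh duplicate, the way a socket write would, and returns the text. */
    static String sendOnce() {
        ByteBuffer view = HEARTBEAT.duplicate(); // shares the bytes, has its own position and limit
        byte[] out = new byte[view.remaining()];
        view.get(out);                           // advances only the duplicate's position
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(sendOnce());            // prints Heartbeat
        System.out.println(sendOnce());            // prints Heartbeat
        System.out.println(HEARTBEAT.remaining()); // prints 9
    }
}
```

If the shared buffer were drained directly, its position would sit at the end after the first send and every later heartbeat would be empty. Reading through a duplicate leaves the original untouched.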
The client's business handler:
package learn.netty.idle;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.util.Date;

/**
 * @author stone
 * @date 2019/7/31 16:10
 */
@ChannelHandler.Sharable
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Activated at: " + new Date());
        System.out.println("HeartBeatClientHandler channelActive");
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Deactivated at: " + new Date());
        System.out.println("HeartBeatClientHandler channelInactive");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            // StringDecoder earlier in the pipeline has already turned the bytes into a String.
            String message = (String) msg;
            System.out.println(message);
            if (message.equals("Heartbeat")) {
                ctx.writeAndFlush("has read message from server");
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
}
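Testing:
- Start the server, then start the client. The server console shows the ping messages it receives (heartbeat mechanism).
- Stop the server and restart it about 1 s later, then check the logs to observe the client reconnecting (reconnection mechanism).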