如何使用
1.我们构造netty服务端的时候,在childHandler里,先获取到pipeline,然后p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
public IdleStateHandler(long readerIdleTime, long writerIdleTime,
long allIdleTime, TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
2.我们还需要写一个handler,来实现超时后需要做的事.netty把超时和超时后任务的触发解耦了.(这是不是观察者模式的具体应用呢?)
HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
//没有并发问题,因为每个连接都会单独new一个HeartBeatServerHandler对象.
private int lossConnectCount = 0;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("已经5秒未收到客户端的消息了!");
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
lossConnectCount++;
if (lossConnectCount > 2) {
System.out.println("关闭这个不活跃通道!");
ctx.channel().close();
}
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//重置计数器
lossConnectCount = 0;
System.out.println("client says: " + msg.toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
类注释
Triggers an IdleStateEvent when a Channel has not performed read, write, or both operation for a while.
(触发一个IdleStateEvent当一个Channel一段时间内没有进行读或者写,或者读写)
一些问题
假设netty里没有这个相关的功能,需要我们自己设计一个IdleStateHandler,该怎么做呢?
需求分析:当客户端和服务端建立连接的时候,如果客户端一段时间没有操作,读或者写,那么我们就可以自定义的进行一些操作.
问题1.初始化该执行什么操作?
问题2.如何判断超时?
问题3.读写如何分开判断?
问题4.判断超时该用哪个线程?netty的io线程还是自定义线程?
我们带着这些问题来看看netty的设计.
netty的设计
我们先看一下整体的流程.
1.当我们new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS)时,发生了什么?
这个就是把handler加入到pipeline里,后面有io事件触发时,就被handler拦截.然后这个构造器里会初始化一些值.
核心的就是三个时间:
readerIdleTimeNanos
writerIdleTimeNanos
allIdleTimeNanos
public IdleStateHandler(boolean observeOutput,
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
ObjectUtil.checkNotNull(unit, "unit");
this.observeOutput = observeOutput;
//会有个和最小时间比较的逻辑.
if (readerIdleTime <= 0) {
readerIdleTimeNanos = 0;
} else {
readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
}
if (writerIdleTime <= 0) {
writerIdleTimeNanos = 0;
} else {
writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
}
if (allIdleTime <= 0) {
allIdleTimeNanos = 0;
} else {
allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
}
}
2.客户端第一次创建连接的时候发生了什么?
//IdleStateHandler的channelActive()方法在socket通道建立时被触发
//ctx的传递要搞清楚
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// This method will be invoked only if this handler was added
// before channelActive() event is fired. If a user adds this handler
// after the channelActive() event, initialize() will be called by beforeAdd().
initialize(ctx);//schedule
super.channelActive(ctx);//传播事件
}
这里相当于一个任务的生产者.任务的执行就委托给io线程了.具体看SingleThreadEventExecutor里面的逻辑
//重要的入口,会开启定时任务
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
//
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
获取execut然后执行schedule.
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
//从cts获取执行器.然后调度.
return ctx.executor().schedule(task, delay, unit);
}
AbstractScheduledEventExecutor里.
//调度.
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
//当前的线程是否eventLoop里的线程.
if (inEventLoop()) {
scheduleFromEventLoop(task);
} else {//什么时候会出现执行的线程跟绑定的线程不一致呢?
//获取deadline
final long deadlineNanos = task.deadlineNanos();
// task will add itself to scheduled task queue when run if not expired
if (beforeScheduledTaskSubmitted(deadlineNanos)) {
execute(task);
} else {
lazyExecute(task);
// Second hook after scheduling to facilitate race-avoidance
if (afterScheduledTaskSubmitted(deadlineNanos)) {
execute(WAKEUP_TASK);
}
}
}
return task;
}
就到了io线程的
IdleStateHandler
这个类继承了ChannelDuplexHandler.(这个类细节比较多,后面单独拎出来讲一下)
ChannelHandler implementation which represents a combination out of a ChannelInboundHandler and the ChannelOutboundHandler. It is a good starting point if your ChannelHandler implementation needs to intercept operations and also state updates.
内部类
AbstractIdleTask
private abstract static class AbstractIdleTask implements Runnable {
//需要ctx来获取executor.
private final ChannelHandlerContext ctx;
AbstractIdleTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
//模板的run方法.
@Override
public void run() {
if (!ctx.channel().isOpen()) {
return;
}
run(ctx);
}
protected abstract void run(ChannelHandlerContext ctx);
}
下面的三个类无非就是实现自己的run方法.把变化的点抽象出来,由子类来实现.其实也就是初始化的时间的不同,其他都是一样的.当chanelIdle的时候,就会fireUserEventTriggered,这时候就完成一次超时的处理了.
AllIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = allIdleTimeNanos;
if (!reading) {
nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
}
if (nextDelay <= 0) {
// Both reader and writer are idle - set a new timeout and
// notify the callback.
allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstAllIdleEvent;
firstAllIdleEvent = false;
try {
if (hasOutputChanged(ctx, first)) {
return;
}
IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Either read or write occurred before the timeout - set a new
// timeout with shorter delay.
allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
ReaderIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {
//下一次超时的时间.
long nextDelay = readerIdleTimeNanos;
//刚开始读的时候是true,读完变成false.
if (!reading) {
//下一次超时的时间减去当前时间和上一次读的时间差. why?
nextDelay -= (ticksInNanos() - lastReadTime);
}
//如果小于0,已经超时了
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
//
boolean first = firstReaderIdleEvent;
//
firstReaderIdleEvent = false;
//
try {
//创建一个event
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
channel空闲了.fireUserEventTriggered,而我们的业务处理刚好实现了这个方法.
/**
* Is called when an {@link IdleStateEvent} should be fired. This implementation calls
* {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
*/
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
//see.
ctx.fireUserEventTriggered(evt);
}
WriterIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {
long lastWriteTime = IdleStateHandler.this.lastWriteTime;
long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
//已经超时.
if (nextDelay <= 0) {
// Writer is idle - set a new timeout and notify the callback.
writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstWriterIdleEvent;
firstWriterIdleEvent = false;
try {
if (hasOutputChanged(ctx, first)) {
return;
}
//
IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
//已经idle了.
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Write occurred before the timeout - set a new timeout with shorter delay.
writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
ReadTimeoutHandler
ReadTimeoutHandler extends IdleStateHandler
读超时的handler,会报ReadTimeoutException的错,当一定时间内没有读到数据.
WriteTimeoutHandler
当写操作不能在一定的时间完成的化,报WriteTimeoutException错.
这个类并没有继承IdleStateHandler,就不在这里讲了,有兴趣的可以去看看,也很简单.
IdleStateEvent
可以理解为netty内部把这个空闲状态事件封装好了,传个最终的业务调用方法.
源码没什么特殊的逻辑就不贴了.
IdleState
简单的枚举值.
public enum IdleState {
/**
* No data was received for a while.
*/
READER_IDLE,
/**
* No data was sent for a while.
*/
WRITER_IDLE,
/**
* No data was either received or sent for a while.
*/
ALL_IDLE
}