Netty之IdleStateHandler源码阅读

如何使用

1.我们构造netty服务端的时候,在childHandler里,先获取到pipeline,然后p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));

p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
public IdleStateHandler(long readerIdleTime, long writerIdleTime,
                        long allIdleTime, TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

2.我们还需要写一个handler,来实现超时后需要做的事.netty把超时和超时后任务的触发解耦了.(这是不是观察者模式的具体应用呢?)

HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
    //没有并发问题,因为每个连接都会单独new一个HeartBeatServerHandler对象.
    private int lossConnectCount = 0;
        
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("已经5秒未收到客户端的消息了!");
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                lossConnectCount++;
                if (lossConnectCount > 2) {
                    System.out.println("关闭这个不活跃通道!");
                    ctx.channel().close();
                }
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //重置计数器
        lossConnectCount = 0;
        System.out.println("client says: " + msg.toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

类注释

Triggers an IdleStateEvent when a Channel has not performed read, write, or both operation for a while.

(触发一个IdleStateEvent当一个Channel一段时间内没有进行读或者写,或者读写)

一些问题

假设netty里没有这个相关的功能,需要我们自己设计一个IdleStateHandler,该怎么做呢?

需求分析:当客户端和服务端建立连接的时候,如果客户端一段时间没有操作,读或者写,那么我们就可以自定义的进行一些操作.

问题1.初始化该执行什么操作?

问题2.如何判断超时?

问题3.读写如何分开判断?

问题4.判断超时该用哪个线程?netty的io线程还是自定义线程?

我们带着这些问题来看看netty的设计.

netty的设计

我们先看一下整体的流程.

1.当我们new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS)时,发生了什么?

这个就是把handler加入到pipeline里,后面有io事件触发时,就被handler拦截.然后这个构造器里会初始化一些值.

核心的就是三个时间:

readerIdleTimeNanos

writerIdleTimeNanos

allIdleTimeNanos

  public IdleStateHandler(boolean observeOutput,
                            long readerIdleTime, long writerIdleTime, long allIdleTime,
                            TimeUnit unit) {
        ObjectUtil.checkNotNull(unit, "unit");

        this.observeOutput = observeOutput;
                //会有个和最小时间比较的逻辑.
        if (readerIdleTime <= 0) {
            readerIdleTimeNanos = 0;
        } else {
            readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (writerIdleTime <= 0) {
            writerIdleTimeNanos = 0;
        } else {
            writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (allIdleTime <= 0) {
            allIdleTimeNanos = 0;
        } else {
            allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
        }
    }

2.客户端第一次创建连接的时候发生了什么?

//IdleStateHandler的channelActive()方法在socket通道建立时被触发
//ctx的传递要搞清楚
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // This method will be invoked only if this handler was added
    // before channelActive() event is fired.  If a user adds this handler
    // after the channelActive() event, initialize() will be called by beforeAdd().
    initialize(ctx);//schedule
    super.channelActive(ctx);//传播事件
}

这里相当于一个任务的生产者.任务的执行就委托给io线程了.具体看SingleThreadEventExecutor里面的逻辑

//重要的入口,会开启定时任务
private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    switch (state) {
        case 1:
        case 2:
            return;
    }

    state = 1;
    initOutputChanged(ctx);
        //
    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}

获取execut然后执行schedule.

ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
    //从cts获取执行器.然后调度.
    return ctx.executor().schedule(task, delay, unit);
}

AbstractScheduledEventExecutor里.

//调度.
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    //当前的线程是否eventLoop里的线程.
    if (inEventLoop()) {
        scheduleFromEventLoop(task);
    } else {//什么时候会出现执行的线程跟绑定的线程不一致呢?
        //获取deadline
        final long deadlineNanos = task.deadlineNanos();
        // task will add itself to scheduled task queue when run if not expired
        if (beforeScheduledTaskSubmitted(deadlineNanos)) {
            execute(task);
        } else {
            lazyExecute(task);
            // Second hook after scheduling to facilitate race-avoidance
            if (afterScheduledTaskSubmitted(deadlineNanos)) {
                execute(WAKEUP_TASK);
            }
        }
    }
    return task;
}

就到了io线程的

IdleStateHandler

这个类继承了ChannelDuplexHandler.(这个类细节比较多,后面单独拎出来讲一下)

ChannelHandler implementation which represents a combination out of a ChannelInboundHandler and the ChannelOutboundHandler. It is a good starting point if your ChannelHandler implementation needs to intercept operations and also state updates.

内部类

AbstractIdleTask
private abstract static class AbstractIdleTask implements Runnable {
        //需要ctx来获取executor.
    private final ChannelHandlerContext ctx;

    AbstractIdleTask(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }
        //模板的run方法.
    @Override
    public void run() {
        if (!ctx.channel().isOpen()) {
            return;
        }
        run(ctx);
    }

    protected abstract void run(ChannelHandlerContext ctx);
}

下面的三个类无非就是实现自己的run方法.把变化的点抽象出来,由子类来实现.其实也就是初始化的时间的不同,其他都是一样的.当chanelIdle的时候,就会fireUserEventTriggered,这时候就完成一次超时的处理了.

AllIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {
    long nextDelay = allIdleTimeNanos;
    if (!reading) {
        nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
    }
    if (nextDelay <= 0) {
        // Both reader and writer are idle - set a new timeout and
        // notify the callback.
        allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);

        boolean first = firstAllIdleEvent;
        firstAllIdleEvent = false;

        try {
            if (hasOutputChanged(ctx, first)) {
                return;
            }

            IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Either read or write occurred before the timeout - set a new
        // timeout with shorter delay.
        allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}
ReaderIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {
    //下一次超时的时间.
    long nextDelay = readerIdleTimeNanos;
    //刚开始读的时候是true,读完变成false.
    if (!reading) {
        //下一次超时的时间减去当前时间和上一次读的时间差. why?
        nextDelay -= (ticksInNanos() - lastReadTime);
    }
        //如果小于0,已经超时了
    if (nextDelay <= 0) {
        // Reader is idle - set a new timeout and notify the callback.
        readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
                //
        boolean first = firstReaderIdleEvent;
        //
        firstReaderIdleEvent = false;
                //
        try {
                        //创建一个event
            IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Read occurred before the timeout - set a new timeout with shorter delay.
        readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}

channel空闲了.fireUserEventTriggered,而我们的业务处理刚好实现了这个方法.

    /**
     * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
     * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
     */
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        //see.
        ctx.fireUserEventTriggered(evt);
    }
WriterIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {

    long lastWriteTime = IdleStateHandler.this.lastWriteTime;
    long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
    //已经超时.
    if (nextDelay <= 0) {
        // Writer is idle - set a new timeout and notify the callback.
        writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);

        boolean first = firstWriterIdleEvent;
        firstWriterIdleEvent = false;

        try {
            if (hasOutputChanged(ctx, first)) {
                return;
            }
                        //
            IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
            //已经idle了.
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Write occurred before the timeout - set a new timeout with shorter delay.
        writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}

ReadTimeoutHandler

ReadTimeoutHandler extends IdleStateHandler

读超时的handler,会报ReadTimeoutException的错,当一定时间内没有读到数据.

WriteTimeoutHandler

当写操作不能在一定的时间完成的化,报WriteTimeoutException错.

这个类并没有继承IdleStateHandler,就不在这里讲了,有兴趣的可以去看看,也很简单.

IdleStateEvent

可以理解为netty内部把这个空闲状态事件封装好了,传个最终的业务调用方法.

源码没什么特殊的逻辑就不贴了.

IdleState

简单的枚举值.

public enum IdleState {
    /**
     * No data was received for a while.
     */
    READER_IDLE,
    /**
     * No data was sent for a while.
     */
    WRITER_IDLE,
    /**
     * No data was either received or sent for a while.
     */
    ALL_IDLE
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容