Netty之IdleStateHandler源码阅读

如何使用

1.我们构造netty服务端的时候,在childHandler里,先获取到pipeline,然后p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));

p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
public IdleStateHandler(long readerIdleTime, long writerIdleTime,
                        long allIdleTime, TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

2.我们还需要写一个handler,来实现超时后需要做的事.netty把超时和超时后任务的触发解耦了.(这是不是观察者模式的具体应用呢?)

HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
    //没有并发问题,因为每个连接都会单独new一个HeartBeatServerHandler对象.
    private int lossConnectCount = 0;
        
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("已经5秒未收到客户端的消息了!");
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                lossConnectCount++;
                if (lossConnectCount > 2) {
                    System.out.println("关闭这个不活跃通道!");
                    ctx.channel().close();
                }
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //重置计数器
        lossConnectCount = 0;
        System.out.println("client says: " + msg.toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

类注释

Triggers an IdleStateEvent when a Channel has not performed read, write, or both operation for a while.

(触发一个IdleStateEvent当一个Channel一段时间内没有进行读或者写,或者读写)

一些问题

假设netty里没有这个相关的功能,需要我们自己设计一个IdleStateHandler,该怎么做呢?

需求分析:当客户端和服务端建立连接的时候,如果客户端一段时间没有操作,读或者写,那么我们就可以自定义的进行一些操作.

问题1.初始化该执行什么操作?

问题2.如何判断超时?

问题3.读写如何分开判断?

问题4.判断超时该用哪个线程?netty的io线程还是自定义线程?

我们带着这些问题来看看netty的设计.

netty的设计

我们先看一下整体的流程.

1.当我们new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS)时,发生了什么?

这个就是把handler加入到pipeline里,后面有io事件触发时,就被handler拦截.然后这个构造器里会初始化一些值.

核心的就是三个时间:

readerIdleTimeNanos

writerIdleTimeNanos

allIdleTimeNanos

  public IdleStateHandler(boolean observeOutput,
                            long readerIdleTime, long writerIdleTime, long allIdleTime,
                            TimeUnit unit) {
        ObjectUtil.checkNotNull(unit, "unit");

        this.observeOutput = observeOutput;
                //会有个和最小时间比较的逻辑.
        if (readerIdleTime <= 0) {
            readerIdleTimeNanos = 0;
        } else {
            readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (writerIdleTime <= 0) {
            writerIdleTimeNanos = 0;
        } else {
            writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (allIdleTime <= 0) {
            allIdleTimeNanos = 0;
        } else {
            allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
        }
    }

2.客户端第一次创建连接的时候发生了什么?

//IdleStateHandler的channelActive()方法在socket通道建立时被触发
//ctx的传递要搞清楚
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // This method will be invoked only if this handler was added
    // before channelActive() event is fired.  If a user adds this handler
    // after the channelActive() event, initialize() will be called by beforeAdd().
    initialize(ctx);//schedule
    super.channelActive(ctx);//传播事件
}

这里相当于一个任务的生产者.任务的执行就委托给io线程了.具体看SingleThreadEventExecutor里面的逻辑

//重要的入口,会开启定时任务
private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    switch (state) {
        case 1:
        case 2:
            return;
    }

    state = 1;
    initOutputChanged(ctx);
        //
    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}

获取execut然后执行schedule.

ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
    //从cts获取执行器.然后调度.
    return ctx.executor().schedule(task, delay, unit);
}

AbstractScheduledEventExecutor里.

//调度.
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    //当前的线程是否eventLoop里的线程.
    if (inEventLoop()) {
        scheduleFromEventLoop(task);
    } else {//什么时候会出现执行的线程跟绑定的线程不一致呢?
        //获取deadline
        final long deadlineNanos = task.deadlineNanos();
        // task will add itself to scheduled task queue when run if not expired
        if (beforeScheduledTaskSubmitted(deadlineNanos)) {
            execute(task);
        } else {
            lazyExecute(task);
            // Second hook after scheduling to facilitate race-avoidance
            if (afterScheduledTaskSubmitted(deadlineNanos)) {
                execute(WAKEUP_TASK);
            }
        }
    }
    return task;
}

就到了io线程的

IdleStateHandler

这个类继承了ChannelDuplexHandler.(这个类细节比较多,后面单独拎出来讲一下)

ChannelHandler implementation which represents a combination out of a ChannelInboundHandler and the ChannelOutboundHandler. It is a good starting point if your ChannelHandler implementation needs to intercept operations and also state updates.

内部类

AbstractIdleTask
private abstract static class AbstractIdleTask implements Runnable {
        //需要ctx来获取executor.
    private final ChannelHandlerContext ctx;

    AbstractIdleTask(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }
        //模板的run方法.
    @Override
    public void run() {
        if (!ctx.channel().isOpen()) {
            return;
        }
        run(ctx);
    }

    protected abstract void run(ChannelHandlerContext ctx);
}

下面的三个类无非就是实现自己的run方法.把变化的点抽象出来,由子类来实现.其实也就是初始化的时间的不同,其他都是一样的.当chanelIdle的时候,就会fireUserEventTriggered,这时候就完成一次超时的处理了.

AllIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {
    long nextDelay = allIdleTimeNanos;
    if (!reading) {
        nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
    }
    if (nextDelay <= 0) {
        // Both reader and writer are idle - set a new timeout and
        // notify the callback.
        allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);

        boolean first = firstAllIdleEvent;
        firstAllIdleEvent = false;

        try {
            if (hasOutputChanged(ctx, first)) {
                return;
            }

            IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Either read or write occurred before the timeout - set a new
        // timeout with shorter delay.
        allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}
ReaderIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {
    //下一次超时的时间.
    long nextDelay = readerIdleTimeNanos;
    //刚开始读的时候是true,读完变成false.
    if (!reading) {
        //下一次超时的时间减去当前时间和上一次读的时间差. why?
        nextDelay -= (ticksInNanos() - lastReadTime);
    }
        //如果小于0,已经超时了
    if (nextDelay <= 0) {
        // Reader is idle - set a new timeout and notify the callback.
        readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
                //
        boolean first = firstReaderIdleEvent;
        //
        firstReaderIdleEvent = false;
                //
        try {
                        //创建一个event
            IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Read occurred before the timeout - set a new timeout with shorter delay.
        readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}

channel空闲了.fireUserEventTriggered,而我们的业务处理刚好实现了这个方法.

    /**
     * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
     * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
     */
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        //see.
        ctx.fireUserEventTriggered(evt);
    }
WriterIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {

    long lastWriteTime = IdleStateHandler.this.lastWriteTime;
    long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
    //已经超时.
    if (nextDelay <= 0) {
        // Writer is idle - set a new timeout and notify the callback.
        writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);

        boolean first = firstWriterIdleEvent;
        firstWriterIdleEvent = false;

        try {
            if (hasOutputChanged(ctx, first)) {
                return;
            }
                        //
            IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
            //已经idle了.
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Write occurred before the timeout - set a new timeout with shorter delay.
        writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}

ReadTimeoutHandler

ReadTimeoutHandler extends IdleStateHandler

读超时的handler,会报ReadTimeoutException的错,当一定时间内没有读到数据.

WriteTimeoutHandler

当写操作不能在一定的时间完成的化,报WriteTimeoutException错.

这个类并没有继承IdleStateHandler,就不在这里讲了,有兴趣的可以去看看,也很简单.

IdleStateEvent

可以理解为netty内部把这个空闲状态事件封装好了,传个最终的业务调用方法.

源码没什么特殊的逻辑就不贴了.

IdleState

简单的枚举值.

public enum IdleState {
    /**
     * No data was received for a while.
     */
    READER_IDLE,
    /**
     * No data was sent for a while.
     */
    WRITER_IDLE,
    /**
     * No data was either received or sent for a while.
     */
    ALL_IDLE
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,193评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,306评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,130评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,110评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,118评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,085评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,007评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,844评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,283评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,508评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,395评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,985评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,630评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,797评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,653评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,553评论 2 352

推荐阅读更多精彩内容