Netty源码分析(五)EventLoop

如果说Netty的核心是它的reactor模式,那么EventLoop就是reactor的核心。通过EventLoop的轮询,netty能够高效的在任务中切换。前面几节都讲的Nio相关类,这里就以NioEventLoop为核心分析下,它是如何执行的。

NioEventLoop的继承关系

image.png

本质上,EventLoop还是一个Executor,既然是Executor,那么就从execute()函数看起

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    
    //判断eventLoop线程是否启动
    boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
    } else {
            startThread();//启动eventLoop线程
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
private void startThread() {
      //判断线程状态
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
        }
    }
}

private void doStartThread() {
    ...
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();//当前线程作为整个EventLoop的主线程
            ...
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                ...
            } finally {
                ...
                try {
                    // Run all remaining tasks and shutdown hooks.
                    // 执行剩余的所有任务
                    for (;;) {
                          if (confirmShutdown()) {
                                break;
                          }
                    }
                } finally {
                        try {
                            cleanup();
                        } finally {
                            ...
                        }
                    }
                }
            }
        });
    }

@Override
protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        ...
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
               //轮询完SelectKey后执行task
               //ioRatio用于设置执行task时间,其语义是io执行时间与任务执行时间之比,如果ioRatio是50(默认),则表示任务执行时间是io执行时间的一半
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            ...
        }
    }

整个run函数可以抽象为select-> processSelectedKeys->runTasks->select...

再看select()做了什么:

private void select(boolean oldWakenUp) throws IOException {
  Selector selector = this.selector;
  try {
    int selectCnt = 0;//计数器
    long currentTimeNanos = System.nanoTime();
    
    //最近一个任务的截止时间
    long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
    for (;;) {
      long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
      if (timeoutMillis <= 0) {//如果截止时间小于0.5ms
        if (selectCnt == 0) {//并且一次select都没有执行过,那么执行一次select并中断
          selector.selectNow();
          selectCnt = 1;
        }
        break;
      }
      //如果有新增任务,则执行select并中断
      if (hasTasks() && wakenUp.compareAndSet(false, true)) {
        selector.selectNow();
        selectCnt = 1;
        break;
      }
      

      //阻塞式select,超时时间为最近一个任务截止时间
      int selectedKeys = selector.select(timeoutMillis);
      selectCnt ++;
      //如果有事件上报或有任务/定时任务则退出
      if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
        break;
      }
       ...

      long time = System.nanoTime();
      if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
      // timeoutMillis elapsed without anything selected.
        selectCnt = 1;
      } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
          ...
          //用于解决nio空轮询bug
          //如果select次数超过阈值则重新生成selector
          rebuildSelector();
           selector = this.selector;
           // Select again to populate selectedKeys.
          selector.selectNow();
          selectCnt = 1;
          break;
        }

        currentTimeNanos = time;
      }
      ...
    } catch (CancelledKeyException e) {
      ...
    }
  }

protected long delayNanos(long currentTimeNanos) {
  ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
  if (scheduledTask == null) {
    return SCHEDULE_PURGE_INTERVAL;
  }

  return scheduledTask.delayNanos(currentTimeNanos);
}

processSelectedKey用于具体事件操作

int readyOps = k.readyOps();
//connect事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);
    unsafe.finishConnect();
}

//写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    ch.unsafe().forceFlush();
}

//读事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) {
      return;
    }
}

runAllTasks用于执行所有可执行任务,包括定时任务

//这里说明runAllTasks(long timeoutNanos)多了一个截止时间,如果运行超过该时间自动中断
protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        //找到所有定时任务中可以执行的
        fetchedAll = fetchFromScheduledTaskQueue();
        //执行所有任务
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true;
        }
    } while (!fetchedAll);
    if (ranAtLeastOne) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    afterRunningAllTasks();
    return ranAtLeastOne;
}

//把所有可执行的定时任务放到非定时任务的队列中
private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    while (scheduledTask != null) {
        if (!taskQueue.offer(scheduledTask)) {//如果放入taskQueue失败,则重新放回定时任务队列,返回false中断,等下次循环继续执行
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        scheduledTask  = pollScheduledTask(nanoTime);
    }
    return true;
}

总结下
1,EventLoop本身是个线程池,拥有线程池属性。EventLoop可以包含定时任务和非定制任务2种
2,EventLoop本身也是一个循环线程,该线程流程是
1)捕捉是否有io事件上报
2)处理这些事件
3)处理线程池中业务逻辑任务
4)返回第一步

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容