Netty理论三:Netty线程模型

1、Reactor模式:NIO网络框架的典型模式

Reactor是网络编程中的一种设计模式,reactor会解耦并发请求的服务并分发给对应的事件处理器来处理。目前,许多流行的开源框架都用到了reactor模式,如:netty、node.js、Cindy等,包括java的nio。

何为Reactor线程模型?

Reactor模式是事件驱动的,有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers;这个Service Handler会同步的将输入的请求(Event)多路复用的分发给相应的Request Handler

image.png

Reactor模式的三种形式
1、Reactor 单线程模式:

image.png

这种实现方式,和第一章java NIO中单线程NIO实现是一样的,一个Reactor处理所有的事情。

image.png

2、Reactor 多线程模式:
编解码及业务处理使用线程池,这样的话,可以避免IO阻塞(IO阻塞的代价是非常大的)。

image.png
image.png

3、Reactors 主从模式:
把Reactor分为两个,一个负责接收,一个负责读写,业务处理可以用线程池,在服务端启动时配置(也可以选择不用线程池,这个看具体业务需求)

image.png
image.png

2、Netty中如何使用Reactor模式

  • 单线程Reactor 模式
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,bossGroup )
  • 多线程 Reactor 模式
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,workerGroup )
//Handler使用线程池进行处理
  • 主从Reactors 模式(官方推荐)
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,workerGroup )

EventLoopGroup初始化是创建创建两个NioEventLoopGroup类型的Reactor线程池bossGroup和workGroup分别用来处理客户端的连接请求(bossGroup)和通道IO事件(workerGroup);
注:new NioEventLoopGroup()默认创建cpu核数*2的线程数

主从模式的好处有:
1、业务解耦:一个reactor用来处理客户端连接,一个reactor用来处理业务
2、安全性:业务解耦以后,就可以在bossGroup中做一些SSL校验、ip黑名单、登录之类的安全性校验
3、性能提升:只有通过安全性校验的客户端才能继续进行业务处理,这样也能提升处理性能,否则大量的无效客户端接入和正常的业务处理混杂在一起,影响业务处理性能。
使用demo:

serverBootstrap.group(boss,worker).handler(new RuleBasedIpFilter()).childHandler(new NettyServerInitializer(this.factoryCode));

3、Netty EventLoop源码解析

1、NioEventLoopGroup整体结构

image.png

EventExecutorGroup视图

image.png

new NioEventLoopGroup源码

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        //EventExecutorGroup里面有一个EventExecutor数组,保存了多个EventExecutor;
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                //初始化EventExecutor数组,数组是NioEventLoop,见下面
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        //EventExecutorChooser.next()定义选择EventExecutor的策略;
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

    @Override
    public EventExecutor next() {
        return chooser.next();
    }

NioEventLoopGroup.class

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

NioEventLoop.class

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        //创建selector
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
  • EventExecutorGroup里面有一个EventExecutor数组,保存了多个EventExecutor(NIOEventLoop);
  • EventExecutorGroup是不干什么事情的,当收到一个请后,他就调用next()获得一个它里面的EventExecutor,再调用这个executor的方法;
  • EventExecutorChooserFactory.EventExecutorChooser.next()定义选择EventExecutor的策略(有两种,都是轮询);

2、NioEventLoopGroup创建分析

bossGroup

image.png

注:从图中可以看出,一个NioEventLoopGroup中包含多个NioEventLoop,一个NioEventLoop中包含一个Selector,Selector监听NioServerSocketChannel,当NioServerSocketChannel上有客户端channel连接后,触发Acceptor事件,在ServerBootstrapAcceptor handler中转发给workGroup

workerGroup

image.png

当客户端channel初次连接时,将其注册到workGroup中的NioEventLoop上(通过EventExecuorChooser.next()获取workGroup中的一个NioEventLoop),然后NioEventLoop中的Selector不断轮询其所管理的NioSocketChannel,如果其中有读写事件准备好,则由DefaultChannelPipeline处理。

3、ServerBootstrap启动流程分析

image.png

4、ServerBootstrap执行流程分析

image.png
        // 配置服务端的NIO线程组
        // 主线程组, 用于接受客户端的连接,但是不做任何具体业务处理,像老板一样,
        //负责接待客户,不具体服务客户
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 工作线程组, 老板线程组会把任务丢给他,让手下线程组去做任务,服务客户
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             //欲加到NioServerSocketChannel Pipeline的handler
             .handler(new LoggingHandler(LogLevel.INFO))
              //欲加到NioSocketChannel(accept()返回的)Pipeline的handler
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
                     ch.pipeline().addLast("decoder", new StringDecoder());
                     ch.pipeline().addLast("encoder", new StringEncoder());
                     ch.pipeline().addLast(new EchoServerHandler());
 
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // 绑定端口,开始接收进来的连接
            ChannelFuture f = b.bind(port).sync(); // (7)
image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容