Netty源码分析8 - 客户端 connect 原理

// 1. 配置 EventLoopGroup
final static EventLoopGroup group = new NioEventLoopGroup();
public static void main(String[] args) throws Exception {
    // 2. 创建并配置客户端启动辅助类 Bootstrap
    Bootstrap b = new Bootstrap();
    b.group(group)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline p = ch.pipeline();
                p.addLast(new EchoClientHandler());
            }
        });

    // 4. 阻塞连接服务端
    ChannelFuture f = b.connect("127.0.0.1", 8081).sync();
    f.channel().closeFuture().addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            group.shutdownGracefully();
        }
    });
}

一、代码执行流程梯形图

/*********************************************** 1. 创建 NioEventLoopGroup ***********************************************/
/*********************************************** 2. 创建并设置 Bootstrap ***********************************************/
new Bootstrap()
--> Map<ChannelOption<?>, Object> options = new LinkedHashMap<>()
--> Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<>()
--> BootstrapConfig config = new BootstrapConfig(this)
// 设置 group
AbstractBootstrap.group(group)
--> EventLoopGroup group = group
// 设置 channel
AbstractBootstrap.channel(NioSocketChannel.class)
--> ChannelFactory channelFactory = new ReflectiveChannelFactory(Class<? extends T> clazz) // 设置channel创建工厂,反射建立 channel
// 设置 option
AbstractBootstrap.option(ChannelOption.SO_BACKLOG, 100)
--> Map<ChannelOption<?>, Object> options.put
// 设置 handler
AbstractBootstrap.handler(new ChannelInitializer<SocketChannel>{})
--> ChannelInitializer channelInitializer = new ChannelInitializer<SocketChannel>(){}
--> ChannelHandler handler = channelInitializer

/*********************************************** 3. connect ***********************************************/
Bootstrap.connect(String inetHost, int inetPort)
--> doResolveAndConnect(SocketAddress remoteAddress, SocketAddress localAddress) // localAddress=null
  --> ChannelFuture regFuture = initAndRegister()
    /********** 3.1 创建 NioSocketChannel *********/
    --> Channel channel = channelFactory.newChannel() // channelFactory=ReflectiveChannelFactory
      --> new NioSocketChannel()
        --> newSocket(SelectorProvider provider)
          --> provider.openSocketChannel() // 创建 java.nio.channels.SocketChannel
        --> new NioSocketChannel(null, ch) // parent = null ch = SocketChannel
          --> AbstractChannel(Channel parent) // parent=NioServerSocketChannel
            --> CloseFuture closeFuture = new CloseFuture(this)
            --> ChannelId id = DefaultChannelId.newInstance()
            --> Unsafe unsafe = new NioSocketChannelUnsafe()    // 每一个 Channel 都有一个 Unsafe 对象
              --> ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this)
            --> DefaultChannelPipeline pipeline = new DefaultChannelPipeline(this) // 每一个 Channel 都有一个 ChannelPipeline 对象
              --> this.channel = = this
              --> AbstractChannelHandlerContext tail = new TailContext(this)
                --> boolean inbound = true
                --> boolean outbound = false
                --> this.pipeline = pipeline
              --> AbstractChannelHandlerContext head = new HeadContext(this)
                --> boolean inbound = true
                --> boolean outbound = true
                --> this.pipeline = pipeline
                --> Unsafe unsafe = pipeline.channel().unsafe() // NioSocketChannelUnsafe
              --> head<->tail 组建双链表  // 每一个 ChannelPipeline 对象都有一条 ChannelHandlerContext 组成的双向链表,每一个handler都由ChannelHandlerContext包裹
          --> SelectableChannel ch = channel // channel = java.nio.channels.SocketChannel
          --> int readInterestOp = 1 // SelectionKey.OP_READ
          --> ch.configureBlocking(false) // 配置 SocketChannel 为非阻塞
          --> NioSocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket()) // tcp 参数配置类
            --> RecvByteBufAllocator rcvBufAllocator = new AdaptiveRecvByteBufAllocator()
            --> ByteBufAllocator allocator = PooledByteBufAllocator(preferDirect = true)
            --> int connectTimeoutMillis = 30000
            --> WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT // low=32 * 1024 hign=64 * 1024
            --> Channel channel = this,即 NioServerSocketChannel
            --> setTcpNoDelay(true) // 只要不是安卓就设置 SocketChannel.tcpNoDelay 为 true
    /********** 3.2 初始化 NioSocketChannel 属性 *********/
    --> Bootstrap.init(Channel channel)
      --> channel.pipeline().addLast(config.handler()) // 添加 handler(ChannelInitializer)
      --> channel.config().setOption((ChannelOption<Object>) option, value) // SocketChannelConfig 设置 option 配置
      --> channel.attr(key).set(e.getValue()) // 设置 attr
    /********** 3.3 执行注册:此时启动Nio线程 + 注册 channel 到 selector *********/
    --> ChannelFuture MultithreadEventLoopGroup.register(Channel channel) // channel=NioSocketChannel
      --> EventExecutor eventLoop = PowerOfTwoEventExecutorChooser.next()
      --> SingleThreadEventLoop.register(Channel channel)
        --> new DefaultChannelPromise(channel, this)
          --> EventExecutor executor = eventLoop
          --> Channel channel = NioSocketChannel
        --> promise.channel().unsafe().register(this, promise) // this=eventLoop
          --> AbstractChannel$AbstractUnsafe.register(this, promise)
            --> EventLoop eventLoop = this  // 同一个 channel 只由一个 EventLoop 来处理
            --> 将 register0(promise) 封装为 task,丢入 eventLoop 任务队列
            --> SingleThreadEventExecutor.execute(Runnable task) // task register0
              --> addTask(Runnable task)
                --> offerTask(Runnable task)
                  --> taskQueue.offer(task) // 如果添加失败(例如队列容量满了),则使用回绝处理器,此处是抛出RejectedExecutionException
              --> startThread()
                --> 封装线程启动task
                  // 该task 是在下一步创建线程并启动之后执行的
                  --> NioEventLoop.Thread thread = 下一步创建出来的线程
                  --> NioEventLoop.run()
                    --> processSelectedKeys() // 处理 NIO 感兴趣的事件
                    --> runAllTasks(long timeoutNanos) // 处理队列中的任务
                      --> fetchFromScheduledTaskQueue() // 将到期的 scheduledTaskQueue 中的 task 加入 taskQueue
                      --> Runnable task = pollTask() // taskQueue.poll()
                      --> safeExecute(task) // task.run() 此时会执行 register0 任务
                      --> afterRunningAllTasks()
                        --> runAllTasksFrom(tailTasks) // 执行 tailTasks 中的所有 task
                --> ThreadPerTaskExecutor.execute(Runnable command) // threadFactory.newThread(command).start() 创建线程并启动
              --> wakeup(boolean inEventLoop) // inEventLoop=false
                --> selector.wakeup() // 唤醒阻塞
  --> Channel channel = regFuture.channel()
  --> ChannelPromise promise = channel.newPromise()
    --> pipeline.newPromise()
      --> new DefaultChannelPromise(channel)
  --> doResolveAndConnect0(channel, remoteAddress, localAddress, promise)
    --> doConnect(remoteAddress, localAddress, connectPromise)
      --> 将 channel.connect 封装为 task,丢入 eventLoop 任务队列
      --> channel.eventLoop().execute(connect 任务) // connect task


register0 task
--> doRegister()
  --> selectionKey = javaChannel().register(Selector sel, // eventLoop().unwrappedSelector()
                                            int ops,      // 0
                                            Object att)   // this
--> pipeline.invokeHandlerAddedIfNeeded() // 会执行一些 handlerAdded() 事件
  --> callHandlerAddedForAllHandlers() // 执行 pendingHandlerCallback 链表
    --> EchoClient$handler.initChannel(final Channel ch)  
      --> pipeline.addLast(new EchoClientHandler())  // 添加 EchoClientHandler
    --> 从 pipeline 删除 EchoClient$handler(因为此时的 EchoClient$handler 的职责已经结束)
--> pipeline.fireChannelRegistered() // 会执行一些 channelRegister() 事件


connect task
--> connect(SocketAddress remoteAddress, ChannelPromise promise)
  --> pipeline.connect(SocketAddress remoteAddress, ChannelPromise promise)
    --> AbstractNioUnsafe.connect(remoteAddress, localAddress, promise)
      --> doConnect(remoteAddress, localAddress)
        --> boolean connected = socketChannel.connect(remoteAddress) // nio, 如果没有立即连接成功,则 connected = false,则需要设置OP_CONNECT
          --> selectionKey().interestOps(SelectionKey.OP_CONNECT)
      --> ScheduledFuture<?> connectTimeoutFuture = eventLoop().schedule(new Runnable() {...}) // 设置连接超时任务,后续若在超时时间内连接成功,则直接connectTimeoutFuture.cancel()



NioEventLoop.run()
--> processSelectedKeysOptimized()
  // 遍历 selectedKeys
  --> selectionKey.attachment() // NioSocketChannel
  --> processSelectedKey(SelectionKey k, AbstractNioChannel ch) // ch = NioSocketChannel
    --> NioUnsafe unsafe = ch.unsafe() // unsafe 实例 = NioSocketchannelUnsafe
    // 判断 k 的键值,此处是 CONNECT
    --> AbstractNioUnsafe.finishConnect()
      --> NioSocketChannel.doFinishConnect() // 检测是否完成连接 javaChannel().finishConnect(),若没有,直接抛出 Error,
      --> AbstractNioUnsafe.fulfillConnectPromise(ChannelPromise promise, boolean wasActive)
        --> pipeline().fireChannelActive()
          --> HeadContext.channelActive(ChannelHandlerContext ctx)
            --> ctx.fireChannelActive() // 传播执行 channelActive() 事件
            --> readIfIsAutoRead()
              --> AbstractChannel.read()
                --> pipeline.read()
                  --> TailContext.read()
                    --> unsafe.beginRead() // unsafe = AbstractUnsafe
                      --> AbstractNioChannel.doBeginRead()
                        --> selectionKey.interestOps(interestOps | readInterestOp) // 绑定 OP_READ 事件
      --> connectTimeoutFuture.cancel // 取消连接超时任务    

总结:

Netty源码分析6 - 服务端启动流程 中,服务端启动成功,客户端就可以进行连接操作了。

总结:

  1. 创建 NioEventLoopGroup
  2. 创建并设置 Bootstrap
  • options、attrs、handler 都是针对 SocketChannel 起作用的;(客户端只有一种 Channel)
  • 设置 channel 时,创建了 ReflectiveChannelFactory,用于反射创建 NioSocketChannel
  1. 连接操作

第一步与 bind 操作一样,都是执行 initAndRegister()

  • 使用 ReflectiveChannelFactory 反射创建 NioSocketChannel
  • 创建 java.nio.channels.SocketChannel,NioSocketChannel 是其包装类
  • 为 NioSocketChannel 设置唯一ID
  • 为 NioSocketChannel 设置 Unsafe 实例(客户端是 NioSocketChannelUnsafe),Unsafe 是真正的进行底层 IO 操作的类 - 每一个 Channel 都有一个 Unsafe 对象
  • 为 NioSocketChannel 设置 ChannelPipeline 对象 - 每一个 Channel 都有一个 ChannelPipeline 对象,每一个 ChannelPipeline 都包含一条由 ChannelHandlerContext 组成的双向链表(这条双向链表至少有两个节点 HeadContext 和 TailContext),每个 ChannelHandlerContext 内部都包裹着一个 ChannelHandler。
  • 设置 java.nio.channels.SocketChannel 为非阻塞
  • 记录感兴趣事件为 OP_READ 事件
  • 只要不是安卓就设置 SocketChannel.tcpNoDelay 为 true
  • 初始化 NioSocketChannel 属性
  • 将 EchoClient$handler(是一个 ChannelInitialzer 类) 添加到 NioSocketChannel 的 pipeline 中
  • 设置 options、attrs 到 NioSocketChannel
  • 执行注册:此时启动Nio线程 + 注册 channel 到 selector
  • 使用 NioEventLoopGroup 的线程选择器从 group 中选出一个 NioEventLoop X
  • 最终调用 NioSocketChannelUnsafe 执行注册,由于当前执行线程不是当前 eventLoop 的那条 NioEventLoop 线程,所以创建注册任务,并加入 X 的 taskQueue 中,然后创建线程,赋值给 X 的 thread 属性,之后启动线程,执行 NIO 死循环
  • NIO 死循环会按照 ioRatio 计算出来的时间比分别执行 “处理 NIO 感兴趣的事件” 和 “处理队列中的任务”(会将到期的 scheduledTaskQueue 中的 task 加入 taskQueue,之后统一从 taskQueue pollTask),此时会执行注册任务
  • 注册任务:
  • 将 NioSocketChannel 中的 java.nio.channels.SocketChannel 注册到 X 的 Selector 上,选择键为0(此时不监听任何事件),attachment 为 NioSocketChannel 本身
  • pipeline.invokeHandlerAddedIfNeeded() 会执行到 EchoClient$handler,首先执行其 initChannel,此时将业务逻辑处理器 ClientHandler 添加到 ChannelPipeline,如下 HeadContext <-> EchoClient$childHandler <-> ClientHandler <-> TailContext,最后删除 EchoClient$childHandler,最终的 ChannelPipeline 链是 HeadContext <-> ClientHandler <-> TailContext
  • 注册完毕之后 执行 channelRegister() 事件
  • connect 连接
  • 与注册操作一样连接操作也是非当前 Channel 所属的 NioEventLoop 线程发起的,所以也要封装为任务,加入到 X 的 taskQueue 中(每加入队列一个任务,都会做一次 selector.wakeup 操作,起到及时执行任务的作用)
  • 连接任务的内容:pipeline.connect(SocketAddress remoteAddress, ChannelPromise promise),最终执行到 AbstractNioUnsafe.connect(remoteAddress, localAddress, promise)
  • 执行 java 底层的 NIO 连接操作:socketChannel.connect(remoteAddress),如果没有立即 connect 成功,则设置当前的 socketChannel 的兴趣键为 OP_CONNECT,并且创建连接超时任务并添加到 eventLoop 的 scheduledTaskQueue(此时会创建优先队列,赋值给 scheduledTaskQueue,后续如果在连接超时时间之内连接成功,连接超时任务会被取消);如果立即连接成功,则执行 AbstractNioUnsafe.fulfillConnectPromise(ChannelPromise promise, boolean wasActive)
  • 为 NioServerSocketChannel 设置感兴趣的监听键为创建 NioServerSocketChannel 时所存储的 ACCEPT 事件

注册连接事件之后,客户端被选取的 NioEventLoop 同样死循环 select() 和 runAllTask(),当到来的感兴趣事件是 OP_CONNECT 时,则首先从 selectedKey.attachment() 获取在注册的时候注册上去的 Channel,之后从 Channel 中获取 Unsafe,最终执行 AbstractNioUnsafe.finishConnect():

  • 检测是否完成连接 javaChannel().finishConnect(),若没有,直接抛出 Error
  • 执行 AbstractNioUnsafe.fulfillConnectPromise(ChannelPromise promise, boolean wasActive)
  • pipeline.fireChannelActive()
  • 执行 channelActive() 事件
  • 绑定 OP_READ 事件到 SocketChannel 上
  • 取消连接超时任务

总结:pipeline.fireChannelActive() 的执行时机:

  • 服务端 bind 完成之后,会触发 channelActive,此时配置的是 OP_ACCEPT 事件
  • 服务端在 SocketChannel 注册完成之后,会触发 channelActive ,此时配置的是 OP_READ 事件
  • 客户端在 SocketChannel 连接完成之后,会触发 channelActive ,此时配置的是 OP_READ 事件
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352

推荐阅读更多精彩内容