rocketmq源码2-通信-客户端与服务端

一 概述

  • rocketmq的namesrv,broker作为服务端接收请求都是使用NettyRemotingServer处理
  • rocketmq的client,broker作为客户端都是使用NettyRemotingClient和服务端建立连接,然后收发请求报文


    类关系.png

二 NettyRemotingAbstract

2.1 构造函数

  • 初始化semaphoreOneway,限制Oneway请求的并发数
  • 初始化semaphoreAsync,限制Async请求的并发数
public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
    this.semaphoreOneway = new Semaphore(permitsOneway, true);
    this.semaphoreAsync = new Semaphore(permitsAsync, true);
}

2.2 NettyEventExecutor

  • 接受NettyEvent事件,调用事件回调函数
  • 初始化
    protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
  • 注册事件检测处理函数NettyConnectManageHandler,在不同的channel事件中发送NettyEventType事件
public enum NettyEventType {
    CONNECT,//连接
    CLOSE,//关闭
    IDLE,//空闲
    EXCEPTION//异常
}
  • NettyEventType事件发送,存储在阻塞队列LinkedBlockingQueue中
public void putNettyEvent(final NettyEvent event) {
    this.nettyEventExecutor.putNettyEvent(event);
}

public void putNettyEvent(final NettyEvent event) {
    if (this.eventQueue.size() <= maxSize) {
        this.eventQueue.add(event);
    } else {
        log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
    }
}
  • NettyEventType事件处理,不同事件调用对应的listener回调,初始化时注册。final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
switch (event.getType()) {
    case IDLE:
        listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
        break;
    case CLOSE:
        listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
        break;
    case CONNECT:
        listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
        break;
    case EXCEPTION:
        listener.onChannelException(event.getRemoteAddr(), event.getChannel());
        break;
    default:
        break;

}

2.3 收包处理

2.3.1 processMessageReceived

  • 收包入口,根据报文类型cmd.getType()是请求报文还是响应报文执行不同处理
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

2.3.2 processRequestCommand

  • 请求报文处理
  • processorTable存储请求码对应的处理函数NettyRequestProcessor,ExecutorService则是执行处理函数的线程池,异步处理。
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
  • Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor默认处理函数,处理不在processorTable中的请求码。
  • 根据请求码获取对应请求函数,不存在则响应RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED
  • 存在请求码处理函数,创建线程执行任务final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);,在线程池中执行
  • RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); 获取钩子函数,在请求码处理前置钩子doBeforeRequest和后置钩子doAfterResponse处理。
  • 钩子处理之间,调用请求处理函数pair.getObject1().processRequest(ctx, cmd);
  • 非oneway类型请求,则使用channel发送请求处理结果的响应
  • 使用从请求报文中获取的请求码设置响应报文response.setOpaque(opaque);,表示是哪个请求的响应

2.3.3 processRequestCommand

  • 响应报文处理
  • ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable 请求报文id为key存储请求对应的ResponseFuture
  • 从响应报文中获取请求id, 获取对应的ResponseFuture
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque);
  • 注册了响应回调函数,
    获取回调执行线程池ExecutorService executor = this.getCallbackExecutor();
    使用回调线程池执行回调函数或在响应请求处理线程中执行回调函数.
    使用executeCallbackOnlyOnce控制并发仅触发一次回调函数
public void executeInvokeCallback() {
    if (invokeCallback != null) {
        if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
            invokeCallback.operationComplete(this);
        }
    }
}
  • 未注册响应回调函数,则释放oneway或async的并发信号量

2.4 发包处理

2.4.1 invokeSyncImpl同步发包

  • 初始化R esponseFuture,存入responseTable表中
inal ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
  • 发送请求,并注册响应报文监听处理回调函数。
    响应处理函数中,调用responseFuture.putResponse(null);,使用countDownLatch通知同步等待的线程。
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture f) throws Exception {
        if (f.isSuccess()) {
            responseFuture.setSendRequestOK(true);
            return;
        } else {
            responseFuture.setSendRequestOK(false);
        }

        responseTable.remove(opaque);
        responseFuture.setCause(f.cause());
        responseFuture.putResponse(null);
        log.warn("send a request command to channel <" + addr + "> failed.");
    }
});

public void putResponse(final RemotingCommand responseCommand) {
    this.responseCommand = responseCommand;
    this.countDownLatch.countDown();
}
  • 使用countDownLatch等待响应结果,超时则抛异常
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);

public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
    this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    return this.responseCommand;
}

2.4.2 invokeAsyncImpl异步发包

  • 获取并发控制信号量
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
  • 处理流程与同步发包相似.
  • 区别1,发送报文后不等待响应,直接返回
  • 区别2,响应报文监听回调中,executeInvokeCallback(responseFuture);调用入参响应回调函数。函数中释放并发控制信号量

2.4.3 invokeOnewayImpl单向发包

  • 获取并发控制信号量boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
  • 响应监听回调函数中仅做一件事,释放并发控制信号量

2.4.4 scanResponseTable

  • 定时任务扫描ResponseTable,处理超时请求
    (rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()
  • 释放并发信号量,删除responseTable表内存储项
  • 执行响应回调函数executeInvokeCallback(rf);

三 NettyRemotingServer

3.1 实例化

  • 初始化属性
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) {
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
//channel事件监听回调
    this.channelEventListener = channelEventListener;

    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }
//默认请求码处理线程池
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });
//连接请求分发器
//服务端监听端口,获取到连接请求后,异步线程创建子连接
    this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
        }
    });
//子连接io请求分发器,异步线程收发报文
    if (useEpoll()) {
        this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    } else {
        this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    }
//支持TLS(是“Transport Layer Security”的缩写),中文叫做“传输层安全协议”。
    TlsMode tlsMode = TlsSystemConfig.tlsMode;
    log.info("Server is running in TLS {} mode", tlsMode.getName());

    if (tlsMode != TlsMode.DISABLED) {
        try {
            sslContext = TlsHelper.buildSslContext(false);
            log.info("SSLContext created for server");
        } catch (CertificateException e) {
            log.error("Failed to create SSLContext for server", e);
        } catch (IOException e) {
            log.error("Failed to create SSLContext for server", e);
        }
    }
}

3.1 启动

  • 报文收发完成后,异步线程用于报文解析,如编解码等
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
    nettyServerConfig.getServerWorkerThreads(),
    new ThreadFactory() {

        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
        }
    });
  • netty服务端初始化
ServerBootstrap childHandler =
//配置连接请求分发器,io请求分发器
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
//配置主channel属性,accept queue长度1024,地址重用,关闭keepalive
        .option(ChannelOption.SO_BACKLOG, 1024)
        .option(ChannelOption.SO_REUSEADDR, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
//配置子channel属性, 收发包缓冲区65535, 配置TCP_NODELAY立即响应
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
//监听端口
        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
//报文处理函数
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
//tls安全处理
                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                        new HandshakeHandler(TlsSystemConfig.tlsMode))
                    .addLast(defaultEventExecutorGroup,
                        new NettyEncoder(),//编码
                        new NettyDecoder(),//解码
                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
//连接状态监听,发送NettyEventType事件
                        new NettyConnectManageHandler(),
//报文处理函数
                        new NettyServerHandler()
                    );
            }
        });

if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
    childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
  • 启动netty server
try {
    ChannelFuture sync = this.serverBootstrap.bind().sync();
    InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
    this.port = addr.getPort();
} catch (InterruptedException e1) {
    throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
  • 启动NettyEventType事件处理函数,不同事件调用channelEventListener对应的回调函数
if (this.channelEventListener != null) {
    this.nettyEventExecutor.start();
}
  • 定时任务扫描responseTable,处理超时无响应的请求。
this.timer.scheduleAtFixedRate(new TimerTask() {

    @Override
    public void run() {
        try {
            NettyRemotingServer.this.scanResponseTable();
        } catch (Throwable e) {
            log.error("scanResponseTable exception", e);
        }
    }
}, 1000 * 3, 1000);
  • 注册请求码requestCode对应的处理函数NettyRequestProcessor,及调用处理函数的线程池ExecutorService。若ExecutorService为null,则使用默认线程池publicExecutor
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
    ExecutorService executorThis = executor;
    if (null == executor) {
        executorThis = this.publicExecutor;
    }

    Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
    this.processorTable.put(requestCode, pair);
}
  • 注册默认请求码处理函数
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
    this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}
  • 注册请求处理函数处理过程中的钩子函数,请求处理前后执行对应钩子函数。
public interface RPCHook {
    void doBeforeRequest(final String remoteAddr, final RemotingCommand request);

    void doAfterResponse(final String remoteAddr, final RemotingCommand request,
        final RemotingCommand response);
}

public void registerRPCHook(RPCHook rpcHook) {
    this.rpcHook = rpcHook;
}

四 NettyRemotingClient

4.1 实例化

public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
    final ChannelEventListener channelEventListener) {
    super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
    this.nettyClientConfig = nettyClientConfig;
//连接事件回调
    this.channelEventListener = channelEventListener;

    int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }
//默认请求码处理线程池
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });
//io事件分发器
    this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
        }
    });
//安全协议处理
    if (nettyClientConfig.isUseTLS()) {
        try {
            sslContext = TlsHelper.buildSslContext(true);
            log.info("SSL enabled for client");
        } catch (IOException e) {
            log.error("Failed to create SSLContext", e);
        } catch (CertificateException e) {
            log.error("Failed to create SSLContext", e);
            throw new RuntimeException("Failed to create SSLContext", e);
        }
    }
}

4.2 启动

public void start() {
//报文解析处理线程池
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyClientConfig.getClientWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
            }
        });
//netty client初始化
    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
//channel配置,收发包缓冲区65535, 关闭keepalive,连接超时60秒,配置TCP_NODELAY立即响应
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
//收发包处理函数注册
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (nettyClientConfig.isUseTLS()) {
                    if (null != sslContext) {
                        pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                        log.info("Prepend SSL handler");
                    } else {
                        log.warn("Connections are insecure as SSLContext is null!");
                    }
                }
                pipeline.addLast(
                    defaultEventExecutorGroup,
                    new NettyEncoder(),
                    new NettyDecoder(),
                    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    new NettyConnectManageHandler(),
                    new NettyClientHandler());
            }
        });
//定时任务,处理超时无响应的请求
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingClient.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
//启动连接事件处理线程
    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
}

4.3 namesrv管理

  • AtomicReference<List<String>> namesrvAddrList存储namesrv地址列表
  • AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();存储当前使用的namesrv地址
  • namesrv地址更新
public void updateNameServerAddressList(List<String> addrs) {
    List<String> old = this.namesrvAddrList.get();
    boolean update = false;

    if (!addrs.isEmpty()) {
        if (null == old) {
            update = true;
        } else if (addrs.size() != old.size()) {
            update = true;
        } else {
            for (int i = 0; i < addrs.size() && !update; i++) {
                if (!old.contains(addrs.get(i))) {
                    update = true;
                }
            }
        }

        if (update) {
            Collections.shuffle(addrs);
            log.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
            this.namesrvAddrList.set(addrs);
        }
    }
}
  • 获取与namesrv通信的channel,用于发送请求
private Channel getAndCreateNameserverChannel() throws InterruptedException {
//优先使用上次使用的namesrv
        String addr = this.namesrvAddrChoosed.get();
        if (addr != null) {
//从缓存的客户端channel表中获取
            ChannelWrapper cw = this.channelTables.get(addr);
            if (cw != null && cw.isOK()) {
                return cw.getChannel();
            }
        }

        final List<String> addrList = this.namesrvAddrList.get();
        if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
//加锁后二次判断,避免并发
                addr = this.namesrvAddrChoosed.get();
                if (addr != null) {
                    ChannelWrapper cw = this.channelTables.get(addr);
                    if (cw != null && cw.isOK()) {
                        return cw.getChannel();
                    }
                }

                if (addrList != null && !addrList.isEmpty()) {
                    for (int i = 0; i < addrList.size(); i++) {
//使用namesrvIndex,轮询方式获取下一个可用的namesrv地址
                        int index = this.namesrvIndex.incrementAndGet();
                        index = Math.abs(index);
                        index = index % addrList.size();
                        String newAddr = addrList.get(index);

                        this.namesrvAddrChoosed.set(newAddr);
                        log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
//和服务端建立netty连接,保存客户端channel。
                        Channel channelNew = this.createChannel(newAddr);
                        if (channelNew != null)
                            return channelNew;
                    }
                }
            } catch (Exception e) {
                log.error("getAndCreateNameserverChannel: create name server channel exception", e);
            } finally {
                this.lockNamesrvChannel.unlock();
            }
        } else {
            log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
        }

        return null;
    }

4.4 发送请求

  • 获取目标地址连接的客户端channel
    final Channel channel = this.getAndCreateChannel(addr);
  • sync,oneway,async三种发送请求类型
  • 发送前先调用钩子函数this.rpcHook.doBeforeRequest(addr, request);
  • 实际发送使用NettyRemotingAbstract中的发送函数

4.5 建立channel连接

  • ConcurrentMap<String /* addr */, ChannelWrapper> channelTables存储和目标服务端地址addr建立netty tcp连接的客户端
  • 建立连接
private Channel createChannel(final String addr) throws InterruptedException {
// channelTables检查是否已存在,存在则先关闭连接
    ChannelWrapper cw = this.channelTables.get(addr);
    if (cw != null && cw.isOK()) {
        cw.getChannel().close();
        channelTables.remove(addr);
    }

    if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        try {
            boolean createNewConnection;
//加锁后再次检查,若存在则先关闭连接
            cw = this.channelTables.get(addr);
            if (cw != null) {

                if (cw.isOK()) {
                    cw.getChannel().close();
                    this.channelTables.remove(addr);
                    createNewConnection = true;
                } else if (!cw.getChannelFuture().isDone()) {
                    createNewConnection = false;
                } else {
                    this.channelTables.remove(addr);
                    createNewConnection = true;
                }
            } else {
                createNewConnection = true;
            }

            if (createNewConnection) {
//connect发起连接,并保存channel信息
                ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
                log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
                cw = new ChannelWrapper(channelFuture);
                this.channelTables.put(addr, cw);
            }
        } catch (Exception e) {
            log.error("createChannel: create channel exception", e);
        } finally {
            this.lockChannelTables.unlock();
        }
    } else {
        log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
    }

    if (cw != null) {
//等待连接建立成功,超时无响应则认为建立连接失败
        ChannelFuture channelFuture = cw.getChannelFuture();
        if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
            if (cw.isOK()) {
                log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
                return cw.getChannel();
            } else {
                log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
            }
        } else {
            log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
                channelFuture.toString());
        }
    }

    return null;
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352