RocketMQ源码解析(一)——通信

RocketMQ使用Netty进行底层通信,包括NameServerBroker(Master/Slave)ProducerConsumer4个角色,接下来让我们分析一下它是如何实现的。

基本概念

rocketmq-remoting模块承担了远程通信的任务,相关代码都存在于org.apache.rocketmq.remoting包下面。

虽然Netty本身已经对许多网络问题进行了处理,并且大幅简化了网络编程(相对于使用原生Java NIO API),但是我们在开发时大多情况下仍然是在应用层进行的。因此,我们在采用TCP作为传输层协议时,需要考虑TCP流协议的特性,自定义一个应用层协议以便解决所谓的“粘包”,“半包”问题(当然这两个术语都是不严谨的,此处因其知名度借用)。虽然可以直接使用最常接触的HTTP协议等应用层协议,但是其头部信息对于常规应用来说都是无用的,考虑到通信性能等因素,自定义应用层协议是最好的选择。而Netty本身提供了通用的编码器以及解码器,用于解决TCP传输问题,例如LengthFieldBasedFrameDecoder,但是我们在其上抽象出一层专用的通信API是更好的选择。同时,当我们不使用Netty作为底层通信框架时,其他模块也是无感的。

分析完不直接使用Netty进行编码的原因后,让我们看一下RocketMQ提供的通信API。

remoting通信类结构如下:

RemotingServerRemotingClient接口都提供了同步调用,异步调用以及单向调用三种通信方式,因此可以满足几乎所有的通信需求。在此基础上,使用Netty作为具体实现,完成高性能的通信过程。

协议设计与编解码

ClientServer之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。

下面是RocketMQ所使用协议的头部字段:

Header字段 类型 Request说明 Response说明
code int 请求操作码,应答方根据不同的请求码进行不同的业务处理 应答响应码。0表示成功,非0则表示各种错误
language LanguageCode 请求方实现的语言 应答方实现的语言
version int 请求方程序的版本 应答方程序的版本
opaque int 相当于reqeustId,在同一个连接上的不同请求标识码,与响应消息中的相对应 应答不做修改直接返回
flag int 区分是普通RPC还是onewayRPC得标志 区分是普通RPC还是onewayRPC得标志
remark String 传输自定义文本信息 传输自定义文本信息
extFields HashMap<String, String> 请求自定义扩展信息 响应自定义扩展信息

对其进行编码以后,一个完整的消息结构如下所示:

rocketmq_design_4.png

可见传输内容主要可以分为以下4部分:

  1. 消息长度:总长度,四个字节存储,占用一个int类型;

  2. 序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;

  3. 消息头数据:经过序列化后的消息头数据;

  4. 消息主体数据:消息主体的二进制字节数据内容;

Refactor多线程设计

RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。

NettyChannelPipeline链中包含编码器,解码器,空闲状态处理器,连接管理器以及核心的业务处理器。其中编码器和解码器用于RemotingCommand的编解码;空闲状态处理器用于检测空闲连接,如果某个客户端在给定时间内未与服务端通信(无论读写),那么将会关闭这个空闲连接;连接管理器用于记录客户端连接日志,同时也作为事件源,发出连接事件,空闲时间,连接关闭事件,异常事件等,用户可以自定义监听器处理这些事件。

上面的框图中可以大致了解RocketMQNettyRemotingServerReactor多线程模型。一个Reactor主线程(eventLoopGroupBoss,即为上面的Netty_Reactor_Master)负责监听TCP网络连接请求,建立好连接,创建SocketChannel,并注册到Selector上。RocketMQ的源码中会自动根据OS的类型选择NIOEpoll,也可以通过参数配,然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的Netty_Reactor_Pool,源码中默认设置线程数为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的Netty_Worker_Pool,源码中默认设置线程数为8)去做。而处理业务操作放在业务线程池中执行,根据RomotingCommand的业务请求码code去processorTable这个本地缓存变量中找到对应的processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(以处理请求消息为例,即为上面的Remoting_Worker_Pool)。从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。

线程数 线程名 线程具体说明
1 NettyBoss_%d Reactor 主线程
N NettyServerEPOLLSelector_%d_%d Reactor 线程池
M1 NettyServerCodecThread_%d Worker线程池
M2 RemotingExecutorThread_%d 业务processor处理线程池

从我个人的观点来看,boss以及selector线程池是必需的,这也是Netty经典的线程处理模型(由于Netty使用了Reactor模型),但是M1线程池却不是必需的,甚至会在一定程度上降低并发性能。参照Netty的线程模型,它希望一个Channel的所有数据发送接收都有同一条线程处理,以此最大程度上减少上下文切换带来的性能开销,而在RocketMQ的线程模型中,这一点反而被违背了,Reactor线程池在此处几乎没有任何作用,ChannelPipeline中的自定义处理器全部交由Worker线程池处理,Reactor线程池只负责消息的接受发送以及最后的异常兜底(此步也由NettyConnectManageHandle代为处理了),之后消息的后续处理全部需要进行一次线程切换,由Worker线程池中的线程进行处理,而这也是完全不必要的。因为NIO的高性能,I/O在此处并不是扯性能后腿的地方。

了解了基本信息后,让我们正式开始源码解析。

RemotingCommand

RemotingCommand类定义了RocketMQ所用协议的内容,我们首先需要理解这个类的内容,以便之后源码的分析。

    private int code;
    private LanguageCode language = LanguageCode.JAVA;
    private int version = 0;
    private int opaque = requestId.getAndIncrement();
    private int flag = 0;
    private String remark;
    private HashMap<String, String> extFields;
    private transient CommandCustomHeader customHeader;

    private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;

    private transient byte[] body;

RemotingCommand类的私有字段如上所示,大部分字段都在之前的协议Header表中进行了介绍,此处多出的第一个字段customHeader用来简化RemotingCommand的构造,它的类型为CommandCustomHeader接口,你可以实现这个接口用来自定义额外的协议头信息,这些信息最终将会编码到extFields字段中;第二个字段serializeTypeCurrentRPC表示RemotingCommand类的序列化方式,当前实现存在json序列化以及紧凑型序列化两种方式,第二种方法将协议头字段紧密排列在一起,以减少数据流的大小,默认使用json序列化方式。

RemotingCommand类的构造方法的访问控制符为protected,因此我们无法通过构造方法构造它的对象,只能使用其提供的几个静态工厂方法,当然这几个方法也提供了极大的便利。

  1. createRequestCommand(int code, CommandCustomHeader customHeader)
  2. createResponseCommand(Class<? extends CommandCustomHeader> classHeader)
  3. createResponseCommand(int code, String remark,
    Class<? extends CommandCustomHeader> classHeader)
  4. createResponseCommand(int code, String remark)

其中第一个用来构造请求数据,剩余三个用于构造回复数据。当你的回复数据需要自定义请求头信息时,你需要先调用第二个方法或者第三个方法,然后调用RemotingCommand#readCustomHeader方法获取自定义请求头的实例,最后将你自定义的字段信息写入这个实例中。当然,这个过程很有画蛇添足的感觉,直接传入一个已经设置好数据的CommandCustomHeader接口实例可以省略上面的这些步骤,而非其类型。在此处引入这些麻烦过程的意义在我看来是强制用户自定义的CommandCustomHeader接口实例需要一个公有的无参构造方法,以便之后解码时无法构造实例,不过这个要求可以通过约定以及相应的检查,而非加大构造实例的负担。

编码

为了可以在网络中传输数据,需要将RemotingCommand类实例编码为字节流,RemotingCommand类提供了相应的编码方法encodeHeader(),将协议头编码为字节流。

    public ByteBuffer encodeHeader() {
        return encodeHeader(this.body != null ? this.body.length : 0);
    }

    public ByteBuffer encodeHeader(final int bodyLength) {
        // 1> header length size
        int length = 4;

        // 2> header data length
        byte[] headerData;
        headerData = this.headerEncode();

        length += headerData.length;

        // 3> body data length
        length += bodyLength;

        ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

        // length
        result.putInt(length);

        // header length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

        // header data
        result.put(headerData);

        result.flip();

        return result;
    }

    public static byte[] markProtocolType(int source, SerializeType type) {
        byte[] result = new byte[4];

        result[0] = type.getCode();
        result[1] = (byte) ((source >> 16) & 0xFF);
        result[2] = (byte) ((source >> 8) & 0xFF);
        result[3] = (byte) (source & 0xFF);
        return result;
    }

之前在协议设计一节中已经介绍了编码后的数据结构。首先为4字节的数据流大小,包括协议头以及协议体的总体长度,在上面的方法中为临时变量length。接下来的四字节为协议头长度以及序列化所用的方法的合并,第一个字节表示序列化方法,后三个字节表示协议头长度,因为协议头包括String以及HashMap<String, String>两种不定长的字段。最后是协议头数据,协议体本身已经是byte[],无需编码。

让我们看一下核心方法headerEncode()的实现。

    private byte[] headerEncode() {
        this.makeCustomHeaderToNet();
        if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
            return RocketMQSerializable.rocketMQProtocolEncode(this);
        } else {
            return RemotingSerializable.encode(this);
        }
    }

    public void makeCustomHeaderToNet() {
        if (this.customHeader != null) {
            Field[] fields = getClazzFields(customHeader.getClass());
            if (null == this.extFields) {
                this.extFields = new HashMap<String, String>();
            }

            for (Field field : fields) {
                if (!Modifier.isStatic(field.getModifiers())) {
                    String name = field.getName();
                    if (!name.startsWith("this")) {
                        Object value = null;
                        try {
                            field.setAccessible(true);
                            value = field.get(this.customHeader);
                        } catch (Exception e) {
                            log.error("Failed to access field [{}]", name, e);
                        }

                        if (value != null) {
                            this.extFields.put(name, value.toString());
                        }
                    }
                }
            }
        }
    }

makeCustomHeaderToNet()方法用于将自定义的CommandCustomHeader实例数据放入extFields字段中。然后根据不同的序列化方法对协议头字段进行序列化,包括json序列化以及RocketMQ自定义的序列化两种方式。

关于json序列化方式没有什么好介绍的,RocketMQ使用fastjson作为底层json框架,这个框架目前拥有许多问题,作者也没有足够精力进行解决,如果需要借用rocketmq-remoting模块,最好换用其他稳定性较高的json框架。RocketMQ自定义的序列化方式相比json更加高效,如下所示:

字段 长度(字节)
code 2
language 1
version 2
opaque 4
flag 4
remark 不定长
extFields 不定长

关于最后两个不定长字段,如果存在数据,那么先写入长度,再写入数据。即remark len | remark data | extFields len | extFields data

这种序列化方式相对于json来说省去了字段名以及json的格式,只传输真正的数据,因此每一次的数据传输都更加精简,无论性能还是稳定性都获得了提升。

完整的编码过程在NettyEncoder类中,如下所示:

    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            ByteBuffer header = remotingCommand.encodeHeader();
            out.writeBytes(header);
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }

首先对协议头编码写入,然后直接写入协议体内容。

解码

了解了编码的过程后,解码的过程则很容易理解。解码方法如下所示:

    public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        int length = byteBuffer.limit();
        int oriHeaderLen = byteBuffer.getInt();
        int headerLength = getHeaderLength(oriHeaderLen);

        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);

        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;

        return cmd;
    }

    public static int getHeaderLength(int length) {
        return length & 0xFFFFFF;
    }

在解码过程中,我们发现数据的前4个字节为协议头长度,而编码时前4个字节为消息的总长度。这并不是代码写错了,而是RocketMQ使用Netty提供了解码工具类LengthFieldBasedFrameDecoder。如下所示:

public NettyDecoder() {
    super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}

在这个解码器的构造过程中,指明了消息长度为前4字节,并且最后会丢弃前4字节。具体细节请自行查阅LengthFieldBasedFrameDecoder类的文档。

    private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
        switch (type) {
            case JSON:
                RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
                resultJson.setSerializeTypeCurrentRPC(type);
                return resultJson;
            case ROCKETMQ:
                RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
                resultRMQ.setSerializeTypeCurrentRPC(type);
                return resultRMQ;
            default:
                break;
        }

        return null;
    }

    public static SerializeType getProtocolType(int source) {
        return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
    }

协议头的反序列化过程如上所示,只是根据编码所用的序列化方法反序列化。如果你需要获取自定义的CommandCustomHeader实例,需要调用decodeCommandCustomHeader()方法。具体过程只是使用java反射,此处不多赘述了。

完成的解码过程在NettyDecoder方法中,如下所示:

    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            frame = (ByteBuf) super.decode(ctx, in);
            if (null == frame) {
                return null;
            }

            ByteBuffer byteBuffer = frame.nioBuffer();

            return RemotingCommand.decode(byteBuffer);
        } catch (Exception e) {
            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            if (null != frame) {
                frame.release();
            }
        }

        return null;
    }

服务端

服务端NettyRemotingServer的构造方法有两个,其中一个可以指定之前提到的事件监听器,默认为空,具体如下:

public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
    this(nettyServerConfig, null);
}

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) {
    // 256 64
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    this.channelEventListener = channelEventListener;

    // 0
    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }

    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    // linux环境使用epoll
    if (useEpoll()) {
        // 1  boss eventLoopGroup,负责监听accept请求
        this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        // 3  selector eventLoopGroup,收发网络数据
        this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    } else {
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    }

    loadSslContext();
}

核心过程就是ServerBootstrap所需组件的构造,默认使用NioEventLoopGroup,使用一个boss线程处理TCP网络请求,三个worker线程处理消息收发。你也可以使用epoll代替默认的nio,如果你所使用的平台为linux并且支持epoll,同时你还需要主动配置以启用它。

private boolean useEpoll() {
    return RemotingUtil.isLinuxPlatform()
        && nettyServerConfig.isUseEpollNativeSelector()
        && Epoll.isAvailable();
}

/**
 * make make install
 *
 *
 * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
 * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
 */
private boolean useEpollNativeSelector = false;

Netty本身是建立在传输层之上的通信框架,为了消息的安全性,还需要设置SSL,以保证通信安全。在此处,RocketMQ使用Netty内建的SSL工具类。

public void loadSslContext() {
    // 默认为permissive
    TlsMode tlsMode = TlsSystemConfig.tlsMode;
    log.info("Server is running in TLS {} mode", tlsMode.getName());

    if (tlsMode != TlsMode.DISABLED) {
        try {
            sslContext = TlsHelper.buildSslContext(false);
            log.info("SSLContext created for server");
        } catch (CertificateException e) {
            log.error("Failed to create SSLContext for server", e);
        } catch (IOException e) {
            log.error("Failed to create SSLContext for server", e);
        }
    }
}

public static SslContext buildSslContext(boolean forClient) throws IOException, CertificateException {
    //     /etc/rocketmq/tls.properties
    //  默认此文件不存在
    File configFile = new File(TlsSystemConfig.tlsConfigFile);
    extractTlsConfigFromFile(configFile);
    // 日志记录最终的配置信息
    logTheFinalUsedTlsConfig();

    SslProvider provider;
    if (OpenSsl.isAvailable()) {
        provider = SslProvider.OPENSSL;
        LOGGER.info("Using OpenSSL provider");
    } else {
        provider = SslProvider.JDK;
        LOGGER.info("Using JDK SSL provider");
    }

    // false
    if (forClient) {
        if (tlsTestModeEnable) {
            return SslContextBuilder
                .forClient()
                .sslProvider(SslProvider.JDK)
                .trustManager(InsecureTrustManagerFactory.INSTANCE)
                .build();
        } else {
            SslContextBuilder sslContextBuilder = SslContextBuilder.forClient().sslProvider(SslProvider.JDK);


            if (!tlsClientAuthServer) {
                sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
            } else {
                if (!isNullOrEmpty(tlsClientTrustCertPath)) {
                    sslContextBuilder.trustManager(new File(tlsClientTrustCertPath));
                }
            }

            return sslContextBuilder.keyManager(
                !isNullOrEmpty(tlsClientCertPath) ? new FileInputStream(tlsClientCertPath) : null,
                !isNullOrEmpty(tlsClientKeyPath) ? decryptionStrategy.decryptPrivateKey(tlsClientKeyPath, true) : null,
                !isNullOrEmpty(tlsClientKeyPassword) ? tlsClientKeyPassword : null)
                .build();
        }
    } else {
        // tls.test.mode.enable 系统变量,默认为true
        if (tlsTestModeEnable) {
            // 使用netty内建的ssl context
            SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
            return SslContextBuilder
                .forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
                .sslProvider(SslProvider.JDK)
                .clientAuth(ClientAuth.OPTIONAL)
                .build();
        } else {
            SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(
                !isNullOrEmpty(tlsServerCertPath) ? new FileInputStream(tlsServerCertPath) : null,
                !isNullOrEmpty(tlsServerKeyPath) ? decryptionStrategy.decryptPrivateKey(tlsServerKeyPath, false) : null,
                !isNullOrEmpty(tlsServerKeyPassword) ? tlsServerKeyPassword : null)
                .sslProvider(provider);

            if (!tlsServerAuthClient) {
                sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
            } else {
                if (!isNullOrEmpty(tlsServerTrustCertPath)) {
                    sslContextBuilder.trustManager(new File(tlsServerTrustCertPath));
                }
            }

            sslContextBuilder.clientAuth(parseClientAuthMode(tlsServerNeedClientAuth));
            return sslContextBuilder.build();
        }
    }
}

如果你需要自定义SSL相关信息,请构建/etc/rocketmq/tls.properties文件,并将tls.test.mode.enable属性设置为false。配置文件支持的全部配置项如下所示:

    public static final String TLS_SERVER_MODE = "tls.server.mode";
    public static final String TLS_ENABLE = "tls.enable";
    public static final String TLS_CONFIG_FILE = "tls.config.file";
    public static final String TLS_TEST_MODE_ENABLE = "tls.test.mode.enable";

    public static final String TLS_SERVER_NEED_CLIENT_AUTH = "tls.server.need.client.auth";
    public static final String TLS_SERVER_KEYPATH = "tls.server.keyPath";
    public static final String TLS_SERVER_KEYPASSWORD = "tls.server.keyPassword";
    public static final String TLS_SERVER_CERTPATH = "tls.server.certPath";
    public static final String TLS_SERVER_AUTHCLIENT = "tls.server.authClient";
    public static final String TLS_SERVER_TRUSTCERTPATH = "tls.server.trustCertPath";

    public static final String TLS_CLIENT_KEYPATH = "tls.client.keyPath";
    public static final String TLS_CLIENT_KEYPASSWORD = "tls.client.keyPassword";
    public static final String TLS_CLIENT_CERTPATH = "tls.client.certPath";
    public static final String TLS_CLIENT_AUTHSERVER = "tls.client.authServer";
    public static final String TLS_CLIENT_TRUSTCERTPATH = "tls.client.trustCertPath";

每一个配置项的具体信息请查阅源码文档,在此处不具体介绍。

启动服务端

在所有准备工作完成后,我们正式开始ServerBootstrap的构造。这一步只是常规的ServerBootstrap构造过程,我们直接看代码即可。

public void start() {
    // 8  worker线程池,负责执行ChannelHandler链
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });

    // 准备共享的ChannelHandler
    prepareSharableHandlers();

    // 配置ServerBootstrap
    ServerBootstrap childHandler =
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                        .addLast(defaultEventExecutorGroup,
                            encoder,
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            connectionManageHandler,
                            serverHandler
                        );
                }
            });

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        // 8888
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }

    // 定期扫描回复表,执行回调方法
    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}

以上指定了ServerBootstrap的各种配置项,包括TCP的配置以及本地绑定的端口号,默认为8888

ChannelHandlers

作为服务端,最主要的功能就是接受客户端的请求并进行处理。由于底层通信框架是Netty,因此我们直接看ChannelPipeline中注册的ChannelHandler即可。

在之前的start()方法中,有一个方法为prepareSharableHandlers(),它是一个私有方法,用来准备将要使用的ChannelHandler

    private void prepareSharableHandlers() {
        handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
        encoder = new NettyEncoder();
        connectionManageHandler = new NettyConnectManageHandler();
        serverHandler = new NettyServerHandler();
    }

在之后ServerBootstrap的构造过程中,共注册了handshakeHandler, encoder, decoder, IdleStateHandler, connectionManageHandler以及serverHandler六种ChannelHandler,接下来我们逐个介绍它们的用处以及实现。

handshakeHandler

HandshakeHandler的声明如下,它是一个入站处理器,用于处理客户端发送来的消息:

class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf>

让我们看一下它的核心方法channelRead0,这个方法用于处理客户端发送的消息:

        private static final byte HANDSHAKE_MAGIC_CODE = 0x16;

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

            // mark the current position so that we can peek the first byte to determine if the content is starting with
            // TLS handshake
            msg.markReaderIndex();

            byte b = msg.getByte(0);

            if (b == HANDSHAKE_MAGIC_CODE) {
                switch (tlsMode) {
                    case DISABLED:
                        ctx.close();
                        log.warn("Clients intend to establish a SSL connection while this server is running in SSL disabled mode");
                        break;
                    case PERMISSIVE:
                    case ENFORCING:
                        if (null != sslContext) {
                            ctx.pipeline()
                                .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
                                .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
                            log.info("Handlers prepended to channel pipeline to establish SSL connection");
                        } else {
                            ctx.close();
                            log.error("Trying to establish a SSL connection but sslContext is null");
                        }
                        break;

                    default:
                        log.warn("Unknown TLS mode");
                        break;
                }
            } else if (tlsMode == TlsMode.ENFORCING) {
                ctx.close();
                log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
            }

            // reset the reader index so that handshake negotiation may proceed as normal.
            msg.resetReaderIndex();

            try {
                // Remove this handler
                ctx.pipeline().remove(this);
            } catch (NoSuchElementException e) {
                log.error("Error while removing HandshakeHandler", e);
            }

            // Hand over this message to the next .
            ctx.fireChannelRead(msg.retain());
        }

它主要用来进行TLS启动的握手步骤,服务端可以配置是否使用SSL加密,分为三个级别:

  1. DISABLED。禁用SSL,任何SSL握手请求都将被拒绝并且关闭连接。
  2. PERMISSIVE。可选,客户端无论是否使用SSL都会被接受。
  3. ENFORCING。必需,任何不使用SSL的连接都将被拒绝。

如果允许SSL加密,并且客户端发送了SSL握手请求,那么服务端的ChannelPipeline中将会新增两个ChannelHandler,分别为SslHandler以及FileRegionEncoder,第一个用来SSL加密通信,第二个用于处理FileRegion信息,由于SslHandler要求数据形式必须为ByteBuf,因此当使用FileRegion时需要先将其转换为ByteBuf类型的信息。这两个ChannelHandler都将被增加到handshakeHandler后面,增加完成后handshakeHandler将会被移除,因为它的任务已经完成了。

encoder & decoder

这两个处理器已经在之前介绍过,此处不多说。

IdleStateHandler

IdleStateHandlerNetty内建的空闲处理器,RocketMQ对其的构造使用如下:

new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds())

当在给定时间(getServerChannelMaxIdleTimeSeconds())内客户端与服务端之间的连接没有发生读写事件,那么将会发出空闲事件IdleStateEvent,这个事件将会在connectionManageHandler中处理,关闭这条连接。关于IdleStateHandler的具体实现原理,请查阅其源码文档。

connectionManageHandler

连接管理处理器主要用于记录日志,如果在构造NettyRemotingServer时提供了事件监听器,那么当有客户端连接或者失联时,将会发出事件,由监听器进行处理。

serverHandler

核心的业务处理器,核心方法如下:

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }

    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

请求处理主要适用于服务端,服务端持有一个请求处理器表,请求码与处理器一对一映射。当请求到来时,会根据其请求码到表中寻找对应的处理器进行处理,如果没有那么将使用默认处理器。如果不存在默认处理器,那么将会直接发送一个不支持此请求的回复给客户端。

处理器可以通过registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor)registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor)两个方法进行注册,第一个根据请求码注册,第二个则是注册默认处理器。

除此以外,用户还可以注册hook,它们会在处理请求前以及处理回复后执行,你可以自定义RPCHook实现类并通过registerRPCHook(RPCHook rpcHook)方法注册。

下面是processRequestCommand方法的实现:

    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        // 根据请求码获取对应的处理器,默认使用defaultRequestProcessor
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();

        // 如果存在可用的处理器
        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        // 执行hook
                        doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        // 处理请求,并生成回复
                        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                        // 执行hook
                        doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

                        // 如果不是单向请求,发送回复
                        if (!cmd.isOnewayRPC()) {
                            if (response != null) {
                                response.setOpaque(opaque);
                                response.markResponseType();
                                try {
                                    ctx.writeAndFlush(response);
                                } catch (Throwable e) {
                                    log.error("process request over, but response failed", e);
                                    log.error(cmd.toString());
                                    log.error(response.toString());
                                }
                            } else {

                            }
                        }
                    } catch (Throwable e) {
                        log.error("process request exception", e);
                        log.error(cmd.toString());

                        if (!cmd.isOnewayRPC()) {
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };

            // 默认为false
            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
                // 构造任务并提交到线程池
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                if ((System.currentTimeMillis() % 10000) == 0) {
                    log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                }

                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        } else {
            String error = " request type " + cmd.getCode() + " not supported";
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }

NettyRequestProcessor接口提供了boolean rejectRequest()方法,用户可以实现它进行限流或者熔断操作,而它的另一个方法RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception则用来处理到来的请求。

如果可以根据请求码找到可用的NettyRequestProcessor,那么这个请求将会被封装并交由业务线程池处理。

回复处理则适用于客户端,关于这个功能我们将在后续介绍,接下来先看一下如何发出一个请求。

客户端

客户端的构造以及启动过程与服务端大体一样,它的构造方法也是做一些准备工作,如下所示:

    public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
        this(nettyClientConfig, null);
    }

    public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
        final ChannelEventListener channelEventListener) {
        super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
        this.nettyClientConfig = nettyClientConfig;
        this.channelEventListener = channelEventListener;

        // Runtime.getRuntime().availableProcessors()
        int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }

        // 作为默认业务线程池
        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });

        // 收发消息线程池
        this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
            }
        });

        // 是否使用SSL
        if (nettyClientConfig.isUseTLS()) {
            try {
                sslContext = TlsHelper.buildSslContext(true);
                log.info("SSL enabled for client");
            } catch (IOException e) {
                log.error("Failed to create SSLContext", e);
            } catch (CertificateException e) {
                log.error("Failed to create SSLContext", e);
                throw new RuntimeException("Failed to create SSLContext", e);
            }
        }
    }

准备工作完成后,让我们看一下它的启动过程。

    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });

        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                }
            });

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingClient.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
    }

客户端的线程模型为一个线程负责建立连接以及收发消息,另一个线程池负责ChannelPipeline链中处理器的执行。ChannelPipeline链的结构如下所示:

Head -> SslHandler(optional) -> NettyEncoder -> NettyDecoder -> IdleStateHandler -> NettyConnectManageHandler -> NettyClientHandler -> Tail

其中大部分处理器的功能我们都已经介绍过,而NettyClientHandlerNettyServerHandler的功能也一样。

请求

RocketMQ中定义了三种请求方式:同步请求,异步请求,单向请求。接下来逐个进行介绍。

同步调用

同步调用在发出请求后会进行等待状态,直到收到回复或者超时。因此,RemotingClient#invokeSync方法需要提供一个超时时间的参数,不过大多情况这个超时时间是固定的,所以如果另外提供一个使用默认超时时间的invokeSync方法会更加方便。

    @Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        long beginStartTime = System.currentTimeMillis();
        // 通过netty构造channel
        // Bootstrap#conntect(addr)
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                doBeforeRpcHooks(addr, request);
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call timeout");
                }
                // 发起同步调用
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

使用Netty进行通信时需要先与服务端建立连接,在invokeSync中我们需要提供服务端的地址,然后通过getAndCreateChannel方法构造或者获取一个已有的Channel用于通信。我们也可以在NettyRemotingClient实例构造完成后通过void updateNameServerAddressList(final List<String> addrs)方法提前注册一组服务端地址,当我们传入的服务端地址为null时,客户端将从之前注册的服务端地址中选择一个进行通信。当然,这种方式看起来非常不优雅,可以再多加一个invokeSync(RemotingCommand)方法,虽然会出现一些方法数量的膨胀,但在这个场景下并不是大问题。

    private Channel getAndCreateChannel(final String addr) throws InterruptedException {
        if (null == addr) {
            return getAndCreateNameserverChannel();
        }

        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }

        return this.createChannel(addr);
    }

    private Channel getAndCreateNameserverChannel() throws InterruptedException {
        // 首次调用时返回null
        String addr = this.namesrvAddrChoosed.get();
        if (addr != null) {
            ChannelWrapper cw = this.channelTables.get(addr);
            // 当前channel可用
            if (cw != null && cw.isOK()) {
                return cw.getChannel();
            }
        }

        final List<String> addrList = this.namesrvAddrList.get();
        // 双重检查锁定
        if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                addr = this.namesrvAddrChoosed.get();
                if (addr != null) {
                    ChannelWrapper cw = this.channelTables.get(addr);
                    if (cw != null && cw.isOK()) {
                        return cw.getChannel();
                    }
                }

                // 轮询选择
                if (addrList != null && !addrList.isEmpty()) {
                    for (int i = 0; i < addrList.size(); i++) {
                        int index = this.namesrvIndex.incrementAndGet();
                        index = Math.abs(index);
                        index = index % addrList.size();
                        String newAddr = addrList.get(index);

                        this.namesrvAddrChoosed.set(newAddr);
                        log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                        Channel channelNew = this.createChannel(newAddr);
                        if (channelNew != null) {
                            return channelNew;
                        }
                    }
                }
            } catch (Exception e) {
                log.error("getAndCreateNameserverChannel: create name server channel exception", e);
            } finally {
                this.lockNamesrvChannel.unlock();
            }
        } else {
            log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
        }

        return null;
    }

    private Channel createChannel(final String addr) throws InterruptedException {
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            cw.getChannel().close();
            channelTables.remove(addr);
        }

        if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                boolean createNewConnection;
                cw = this.channelTables.get(addr);
                if (cw != null) {

                    if (cw.isOK()) {
                        cw.getChannel().close();
                        this.channelTables.remove(addr);
                        createNewConnection = true;
                    } else if (!cw.getChannelFuture().isDone()) {
                        createNewConnection = false;
                    } else {
                        this.channelTables.remove(addr);
                        createNewConnection = true;
                    }
                } else {
                    createNewConnection = true;
                }

                if (createNewConnection) {
                    ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
                    log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
                    cw = new ChannelWrapper(channelFuture);
                    this.channelTables.put(addr, cw);
                }
            } catch (Exception e) {
                log.error("createChannel: create channel exception", e);
            } finally {
                this.lockChannelTables.unlock();
            }
        } else {
            log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
        }

        if (cw != null) {
            ChannelFuture channelFuture = cw.getChannelFuture();
            if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
                if (cw.isOK()) {
                    log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
                    return cw.getChannel();
                } else {
                    log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
                }
            } else {
                log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
                    channelFuture.toString());
            }
        }

        return null;
    }

当传入的服务端地址为null或者上一个选择的服务端连接不可用时,将会在之前注册的服务端地址列表中轮询选取一个连接,连接使用Bootstrap#connect(..)方法。同时,已经构建的Channel将会被封装并缓存到channelTables中,下次与同一个服务端通信时可以直接复用此连接。

与服务端成功建立连接后,才是真正的同步调用过程,这个过程由NettyRemotingAbstract#invokeSyncImpl方法实现。

    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        // 获取请求码
        final int opaque = request.getOpaque();

        try {
            // 构造回复回调
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            // 记录到responseTable
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            // 发送请求并增加回调方法
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    // 发送失败,记录失败原因
                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });

            // 等待回复
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }

如果发送请求成功,那么将会等待给定的超时时间,在这段时间内如果收到回复,那么发送请求的线程将会被唤醒,否则抛出超时异常。

注意NettyClientHandler处理器,当收到服务端的回复时,将调用processMessageReceived方法处理回复。在发送请求时,请求序列号已经被记录到表中,它映射的值是一个ResponseFuture实例。因此,当收到请求对应的回复时,可以通过请求序列号找到对应的ResponseFuture实例,并设置回复值,然后唤醒发送请求的线程。

    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);

            responseTable.remove(opaque);

            // 处理异步请求
            if (responseFuture.getInvokeCallback() != null) {
                executeInvokeCallback(responseFuture);
            } else {
                // 设置response
                responseFuture.putResponse(cmd);
                // 唤醒responseFuture
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }

至此,一个完整的同步请求的发起,服务端的处理、回复以及客户端最后对回复的处理过程已经分析完成。如果你对多线程以及异步有足够的了解,这个过程是比较容易理解的。

异步调用

同步调用是较为常用的请求手段,但是对于某些场景来说,异步请求可以极大提升程序的性能。事实上,同步调用与异步调用的代码实现也极为相似,只在关键部分略有不同。

    @Override
    public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException {
        long beginStartTime = System.currentTimeMillis();
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                doBeforeRpcHooks(addr, request);
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTooMuchRequestException("invokeAsync call timeout");
                }
                this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
            } catch (RemotingSendRequestException e) {
                log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

上面是NettyRemotingClient#invokeAsync方法的实现,它与invokeSync方法唯一的区别只是调用了父类的invokeAsyncImpl方法。同时,如果你选择异步请求,你需要自己提供一个相应的回调InvokeCallback用于处理回复内容。RocketMQ对于异步请求的数量也由限制,默认可以同时发起65535个异步请求,这个数量通过Semaphore类进行限制,你可以配置com.rocketmq.remoting.clientAsyncSemaphoreValue系统属性来自定义这个最大限制值。

    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        long beginStartTime = System.currentTimeMillis();
        final int opaque = request.getOpaque();
        // 尝试获取请求资格
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
            long costTime = System.currentTimeMillis() - beginStartTime;
            // 如果执行到此步时已经超时,那么直接抛出超时异常,fail fast
            if (timeoutMillis < costTime) {
                once.release();
                throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
            }

            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
            this.responseTable.put(opaque, responseFuture);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            responseFuture.setSendRequestOK(true);
                            return;
                        }
                        requestFail(opaque);
                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                responseFuture.release();
                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            // 在给定时间内没有获取请求资格,抛出相应异常
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                        timeoutMillis,
                        this.semaphoreAsync.getQueueLength(),
                        this.semaphoreAsync.availablePermits()
                    );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

异步请求的实现与同步请求也是大体相似,只是增加了获取请求资格的步骤,以及去除了等待回复的过程。

收到回复的处理方法依然是processResponseCommand,因为在请求时注册了对应的回调InvokeCallback,所以直接调用executeInvokeCallback方法。

    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);

            responseTable.remove(opaque);

            // 处理异步请求
            if (responseFuture.getInvokeCallback() != null) {
                executeInvokeCallback(responseFuture);
            } else {
                // 设置response
                responseFuture.putResponse(cmd);
                // 唤醒responseFuture
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }

    private void executeInvokeCallback(final ResponseFuture responseFuture) {
        boolean runInThisThread = false;
        // 获取执行回调方法的线程池
        ExecutorService executor = this.getCallbackExecutor();
        // 如果存在,那么交由线程池处理,否则直接在当前线程处理
        if (executor != null) {
            try {
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // 调用注册的回调方法
                            responseFuture.executeInvokeCallback();
                        } catch (Throwable e) {
                            log.warn("execute callback in executor exception, and callback throw", e);
                        } finally {
                            responseFuture.release();
                        }
                    }
                });
            } catch (Exception e) {
                runInThisThread = true;
                log.warn("execute callback in executor exception, maybe executor busy", e);
            }
        } else {
            runInThisThread = true;
        }

        if (runInThisThread) {
            try {
                responseFuture.executeInvokeCallback();
            } catch (Throwable e) {
                log.warn("executeInvokeCallback Exception", e);
            } finally {
                responseFuture.release();
            }
        }
    }

当处理完异步请求的回复后,将会调用Semaphore#release()方法释放一个异步请求资格。

单向调用

单向调用与异步调用类似,也需要获取请求资格,默认最大限制也是65535,可以设置系统属性com.rocketmq.remoting.clientOnewaySemaphoreValue来改变默认值。另外,单向调用无需服务端发送回复。

具体实现如下,不多讲述。

    @Override
    public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
        RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                doBeforeRpcHooks(addr, request);
                this.invokeOnewayImpl(channel, request, timeoutMillis);
            } catch (RemotingSendRequestException e) {
                log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
    public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        request.markOnewayRPC();
        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        once.release();
                        if (!f.isSuccess()) {
                            log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                        }
                    }
                });
            } catch (Exception e) {
                once.release();
                log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
            } else {
                String info = String.format(
                    "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreOneway.getQueueLength(),
                    this.semaphoreOneway.availablePermits()
                );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

结语

我们已经介绍完RocketMQ通信模块的核心功能,如果你能理解它的大体实现,那么阅读其他模块的源码也会轻松许多。当然,这个模块的具体实现也有着许多问题,无论代码质量还是接口设计上都存在许多优化的空间,不过,它的整体通信架构依然是值得我们学习以及借鉴的。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,875评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,569评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,475评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,459评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,537评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,563评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,580评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,326评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,773评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,086评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,252评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,921评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,566评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,190评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,435评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,129评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,125评论 2 352