RocketMQ学习教程:10.RocketMQ多端口监听【云图智联】

本文主要介绍RocketMQ的多端口监听机制,通过本文,你可以了解到Broker端源码中remotingServer和fastRemotingServer的区别,以及客户端配置中,vipChannelEnabled的作用。

1 多端口监听

在RocketMQ中,可以通过broker.conf配置文件中指定listenPort配置项来指定Broker监听客户端请求的端口,如果不指定,默认监听10911端口。

listenPort=10911

不过,Broker启动时,实际上会监听3个端口:10909、10911、10912,如下所示:

$ lsof -iTCP -nP | grep LISTEN

java  1892656 tianshouzhi.robin   96u  IPv6 14889281  0t0  TCP *:10912 (LISTEN)

java  1892656 tianshouzhi.robin  101u  IPv6 14889285  0t0  TCP *:10911 (LISTEN)

java  1892656 tianshouzhi.robin  102u  IPv6 14889288  0t0  TCP *:10909 (LISTEN)

而其他两个端口是根据listenPort的值,动态计算出来的。这三个端口由Broker内部不同的组件使用,作用分别如下:

    remotingServer:监听listenPort配置项指定的监听端口,默认10911

    fastRemotingServer:监听端口值listenPort-2,即默认为10909

    HAService:监听端口为值为listenPort+1,即10912,该端口用于Broker的主从同步

本文主要聚焦于remotingServer和fastRemotingServer的区别:

Broker端:remotingServer可以处理客户端所有请求,如:生产者发送消息的请求,消费者拉取消息的请求。fastRemotingServer功能基本与remotingServer相同,唯一不同的是不可以处理消费者拉取消息的请求。Broker在向NameServer注册时,只会上报remotingServer监听的listenPort端口。

客户端:默认情况下,生产者发送消息是请求fastRemotingServer,我们也可以通过配置让其请求remotingServer;消费者拉取消息只能请求remotingServer。

下面通过源码进行验证Broker端构建remotingServer和fastRemotingServer时的区别,以及客户端如何配置。

2 Broker端

在BrokerController内部定义了remotingServer和fastRemotingServer两个字段

private RemotingServer remotingServer;

private RemotingServer fastRemotingServer;

在初始化时,在initiallize方法内部会对这两个字段的进行初始化:

BrokerController#initialize

public boolean initialize() throws CloneNotSupportedException {

    boolean result = this.topicConfigManager.load();

    result = result && this.consumerOffsetManager.load();

    result = result && this.subscriptionGroupManager.load();

    result = result && this.consumerFilterManager.load();

    if (result) {..}//加载message store,略

    result = result && this.messageStore.load();

    if (result) {

        //1 remotingServer监听listenPort端口,默认10911

        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, 

                                                      this.clientHousekeepingService);


        //2 fastRemotingServer监听listenPort-2端口,默认10990

        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();

        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);

        this.fastRemotingServer = new NettyRemotingServer(fastConfig, 

                                                    this.clientHousekeepingService);

        //...启动异步线程池,略

        //3 注册请求处理器

        this.registerProcessor();

可以看到,这两个字段实例化时:remotingServer使用了nettyServerConfig配置;而fastRemotingServer将配置克隆了一份,然后只是修改了监听的的端口号,其他不变。

创建完之后remotingServer和fastRemotingServer,会调用registerProcessor注册请求处理器。fastRemotingServer与remotingServer注册的请求处理器类型几乎完全相同,相关源码如下红色框所示:

org.apache.rocketmq.broker.BrokerController#registerProcessor

public void registerProcessor() {

    /**

     * SendMessageProcessor

     */

    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);

    sendProcessor.registerSendMessageHook(sendMessageHookList);

    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);

    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);

    this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor,this.sendMessageExecutor);

    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);

    this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);

    /**

     * PullMessageProcessor,注意这里只注册到了到了remotingServer中,没有注册到fastRemotingServer

     */

    this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);

    this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);


    //...

可以看到,唯一不同的是,对于PullMessageProcessor,只在remotingServer中注册了,并没有在fastRemotingServer注册。意味着为fastRemotingServer不可以处理消费者拉取消息的请求(还有很多其他的处理器类型,这里并没有完全列出)。

在明白了fastRemotingServer和remotingServer之后,下面从客户端分析,如何进行选择。

3 客户端

客户端的DefaultMQProducer和DefaultMQPushConsumer都继承了ClientConfig类,这个类中有一些公共的配置项,其中包含一个布尔字段vipChannelEnabled。从字面意思看,其用于控制是否开启VIP通道,如果为true,生产者发送的消息会请求fastRemotingServer,否则请求remotingServer。

在RocketMQ 4.5.0及之前,vipChannelEnabled字段值默认为true。在RocketMQ 4.5.1之后,修改为了false。可以通过JVM参数 -Dcom.rocketmq.sendMessageWithVIPChannel=false,来修改这个默认值。

org.apache.rocketmq.client.ClientConfig

public class ClientConfig {

//是否启用VIP通道

  private boolean vipChannelEnabled = Boolean.parseBoolean(

                System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, 

                "true"));

...

  public boolean isVipChannelEnabled() {

      return vipChannelEnabled;

  }

  public void setVipChannelEnabled(final boolean vipChannelEnabled) {

      this.vipChannelEnabled = vipChannelEnabled;

  }

...

}

生产者:

生产者在发送消息时,都会通过DefaultMQProducerImpl#sendKernelImpl方法,这个方法内部会判断是否开启VIP通道,如下图红色框:

private SendResult sendKernelImpl(final Message msg,

    final MessageQueue mq,

    final CommunicationMode communicationMode,

    final SendCallback sendCallback,

    final TopicPublishInfo topicPublishInfo,

    final long timeout) throws MQClientException {

    long beginStartTime = System.currentTimeMillis();

    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());

    if (null == brokerAddr) {

        tryToFindTopicPublishInfo(mq.getTopic());

        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());

    }

    SendMessageContext context = null;

    //判断是否开启了VIP通道

    if (brokerAddr != null) {

        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), 

                                             brokerAddr);

    //...

在开启VIP通道的情况下,会将请求的broker 端口地址-2,改为请求fastRemotingServer,如下所示:

org.apache.rocketmq.common.MixAll#brokerVIPChannel

public static String brokerVIPChannel(final boolean isChange, final String brokerAddr) {

    if (isChange) {

        String[] ipAndPort = brokerAddr.split(":");

        String brokerAddrNew = ipAndPort[0] + ":" 

                               + (Integer.parseInt(ipAndPort[1]) - 2);

        return brokerAddrNew;

    } else {

        return brokerAddr;

    }

}

消费者

消费者拉取消息总是会调用remotingServer,因为PullMessageProcessor只在remotingServer中进行了注册,fastRemotingServer无法处理这个请求,因此并不会修改端口,可参考PullAPIWrapper类。

关于其他请求:

Broker支持很多客户端请求类型,除了发送/拉取消息之外,还包括创建Topic、查询/更新offset,发送心跳信息,查询消费者组ID列表等。具体请求哪个端口,主要是看有没有调用MixAl#brokerVIPChannel方法修改端口。例如对于心跳请求,即使设置了brokerVIPChannel也不起作用,因为心跳请求之前不会修改端口号,因此总是请求remotingServer。

免费学习视频欢迎关注云图智联:https://e.yuntuzhilian.com/

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,548评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,069评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,985评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,305评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,324评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,030评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,639评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,552评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,081评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,194评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,327评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,004评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,688评论 3 332
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,188评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,307评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,667评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,337评论 2 358