生产端-初始化

源码版本 4.6.0

先看一个简单消息发送的例子:

public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();

        for (int i = 0; i < 128; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }

在进行消息发送,即producer.send(msg)之前,需要启动Producer,可以猜想下在启动Prodducer中完成了消息发送端初始化操作,本文就是对初始化进行分析。

org.apache.rocketmq.client.producer.DefaultMQProducer#start

public void start() throws MQClientException {
        // 设置生产者组
        this.setProducerGroup(withNamespace(this.producerGroup));
        // 核心初始化方法
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {
                // 消息轨迹相关
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)

public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // 检查生产者组名是否符合规范,不为空且不为默认组名 DEFAULT_PRODUCER    
                this.checkConfig();

                // 更换生产者实例名称,这个待①说明
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }

                // 获取MQ客户端工厂,注意这个MQClientManager是单例模式的,②补充
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                // 注册该生产者,③处说明
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                
                // 如果注册成功,则加入自动创建主题的内置Topic
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                // 启动客户端 ④处补充
                if (startFactory) {
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        // 开始发送心跳 ⑤
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

        // 扫描过期请求 ⑥
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    RequestFutureTable.scanExpiredRequest();
                } catch (Throwable e) {
                    log.error("scan RequestFutureTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
①处说明:
public void changeInstanceNameToPID() {
      if (this.instanceName.equals("DEFAULT")) {
          this.instanceName = String.valueOf(UtilAll.getPid());
      }
  }

在未设置系统参数rocketmq.client.name的时候,默认instanceName为DEFAULT,如果未进行设置,则设置为进程ID,即启动JVM进程的ID

为啥这么干?我的思考点主要是以下两点:

  1. 保证同一个JVM中,获取的mQClientFactory只有一份,获取mQClientFactory的参数是以Instance拼接的字符串,如果Instance保持一致,就可保证在同一个JVM中,只会创建一个客户端工厂。这个有什么好处,首先mQClientFactory中包含了网络组件,定时任务组件,消息拉取组件等,如果都是依据创建一个实例就获取一个新的实例工厂,那么在JVM中可能存在多套相同的功能组件,这样即造成了资源浪费,也可能使得一些内部任务执行错乱。
  2. 不同JVM中的生产者实例能区别开
②处说明:

首先MQClientManager是单例的,也就是一个JVM中只会存在一个实例,接着看getOrCreateMQClientInstance方法,首先构建实例ID:

String clientId = clientConfig.buildMQClientId();
            |
            |
            v
public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        // 获取客户端IP
        sb.append(this.getClientIP());
        
        sb.append("@");
        // 拼装InstanceName,一般情况下就是进程ID
        sb.append(this.getInstanceName());
        // 设置unitName 一般为空,可在Producer上设置
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }

        return sb.toString();
    }            

总的来说clientId = IP + @ + instanceName + unitName,接着就拿这个clientId去缓存中寻找,如果没有,就进行创建。
主要实例化的组件包含这几个:

  • mQClientAPIImpl (Netty通讯组件)
  • pullMessageService (消息拉取组件)
  • rebalanceService (重平衡组件)
  • consumerStatsManager (消费信息统计组件)

顺带说一句,DefaultMQProducer和DefaultMQProducerImpl的关系,可以这么理解,两者之间互相包含,DefaultMQProducer继承了ClientConfig,更相当于一个实例自定义配置类的角色,DefaultMQProducerImpl实现MQProducerInner,消息发送主要逻辑是在这里面完成的。

③处说明:

注册该主题及对应的生产者实例,也就是在Map中放入该数据,即:

MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
if (prev != null) {
    log.warn("the producer group[{}] exist already.", group);
     return false;
}

注意这里用的是putIfAbsent,如果生产者重复启动,或者组名相同的生产者启动,都会注册失败,触发警告,并启动失败。

抛出的异常:

 new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
④处说明:

获取的客户端实例启动,这个是真正的启动工作线程

public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    // 如果Producer未设置nameServer地址,则进行远端拉取
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    // 通讯组件启动
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    // 定时任务启动
                    this.startScheduledTask();
                    // Start pull service
                    // 拉取线程启动
                    this.pullMessageService.start();
                    // Start rebalance service
                    // 重平衡启动
                    this.rebalanceService.start();
                    // Start push service
                    // 启动生产客户端
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

4.1 注意当Producer为配置NameServer地址的时候,则进行远端拉取,这个作用相当大,这个就让线上环境动态对NameServer扩容,迁移成为可能

public String fetchNameServerAddr() {
        try {
            String addrs = this.topAddressing.fetchNSAddr();
            if (addrs != null) {
                if (!addrs.equals(this.nameSrvAddr)) {
                    log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + addrs);
                    this.updateNameServerAddressList(addrs);
                    this.nameSrvAddr = addrs;
                    return nameSrvAddr;
                }
            }
        } catch (Exception e) {
            log.error("fetchNameServerAddr Exception", e);
        }
        return nameSrvAddr;
    }

根据设置的NameServer路由拉取地址进行拉取,地址拼接如下:

public static String getWSAddr() {
        String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
        String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
        String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;
        if (wsDomainName.indexOf(":") > 0) {
            wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;
        }
        return wsAddr;
    }

4.2 定时任务启动

private void startScheduledTask() {
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                       // 拉取NameSever地址 MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                // 更新Topic路由地址
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                   // 清除下线Broker,发送心跳
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                   // 定时持久化消费进度,对于广播模式很重要
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    // 调整线程池 空实现,没啥用
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

总结下:

  • 拉取NameServer地址,延时10S,频率2min
  • 更新主题路由信息,延时10ms,频率30S
  • 向Broker发送心跳,延时1S,频率30S
  • 消费进度持久化,延时1S,频率5S
  • 动态调整线程池,不起作用

值得注意的是,定时任务线程池是单线程无界队列类型的,且用的FixedRate模式,实际的执行频率可能不是准确的,有兴趣可以看下ScheduledExecutorService源码

4.3 其余的组件启动和消费相关,这里先不深入了

⑤处说明:

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

启动成功后开始发送心跳,心跳发送的过程是持有锁的,个人感觉主要是避免心跳混乱,特殊用途暂时没联想到。

心跳发送主代码:

 private void sendHeartbeatToAllBroker() {
        // 准备心跳发送包,主要是消费订阅配置和生产者配置等信息,这个后续再详细讨论
        final HeartbeatData heartbeatData = this.prepareHeartbeatData();
        final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
        final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
        if (producerEmpty && consumerEmpty) {
            log.warn("sending heartbeat, but no consumer and no producer");
            return;
        }
        // 获取所有的Broker地址
        if (!this.brokerAddrTable.isEmpty()) {
            long times = this.sendHeartbeatTimesTotal.getAndIncrement();
            Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, HashMap<Long, String>> entry = it.next();
                String brokerName = entry.getKey();
                HashMap<Long, String> oneTable = entry.getValue();
                if (oneTable != null) {
                    for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                        Long id = entry1.getKey();
                        String addr = entry1.getValue();
                        if (addr != null) {
                            if (consumerEmpty) {
                                if (id != MixAll.MASTER_ID)
                                    continue;
                            }

                            try {
                                // 发送心跳,超时3S
                                int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                                if (!this.brokerVersionTable.containsKey(brokerName)) {
                                    this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                                }
                               // 更新版本号 this.brokerVersionTable.get(brokerName).put(addr, version);
                                if (times % 20 == 0) {
                                    log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                    log.info(heartbeatData.toString());
                                }
                            } catch (Exception e) {
                                if (this.isBrokerInNameServer(addr)) {
                                    log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
                                } else {
                                    log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                        id, addr, e);
                                }
                            }
                        }
                    }
                }
            }
        }
    }

这块值得注意的是:

  • 如果当前JVM中只有生产者实例,那么只向主节点发送心跳。
  • 如果当前JVM即存在生产者又存在消费者,那么就向所有节点发送心跳。这个和消息发送逻辑,消息消费逻辑有关,后期再谈。

从这也可以学到,只要涉及到网络请求,请加上超时,为了你的服务稳定!

⑤处说明:

移除过期请求,这个requestFutureTable的填充涉及的API:

org.apache.rocketmq.client.producer.DefaultMQProducer#request(org.apache.rocketmq.common.message.Message, long)

从方法说明上看是发送消息,在等到该消息消费后再返回,提供异步和同步模式的API,改API在生产上没实际用过,关于这个就不过多讲解了。

最后提供一个总图:


启动流程-Producer.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容