RocketMQ基础篇 Producer发送消息

RocketMQ基础篇 Producer发送消息 流程图.png

生产者发送消息的主要流程图如上图所示。具体的代码由于比较多,我就不在这边贴出来的。
主要讲一下我认为比较重要的点

消息队列负载均衡

Producer会每隔30s从Namesrv获取最新的Topic路由信息,并缓存到本地

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            // 每个30s从NameServer更新路由表信息
            MQClientInstance.this.updateTopicRouteInfoFromNameServer();
        } catch (Exception e) {
            log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
        }
    }
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

路由信息就是用来发送时选择具体的Broker和队列的

private SendResult sendSelectImpl(
    Message msg,
    MessageQueueSelector selector,
    Object arg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);
    
    // 获取Topic队列信息
    // 此处的流程:
    // 先从本地缓存中获取,获取到返回
    // 没有获取到.从NameSrv中获取,获取到返回
    // 没有获取到.如果能够自动创建Topic,会把消息放到TBW102.后续会有自动创建Topic的逻辑
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        try {
        List<MessageQueue> messageQueueList =
        mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
        Message userMessage = MessageAccessor.cloneMessage(msg);
        String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
        userMessage.setTopic(userTopic);
    
        mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
        } catch (Throwable e) {
            throw new MQClientException("select message queue threw exception.", e);
        }
        // 省略代码....
    }
    // 省略代码....
}

此处流程可以总结为:

  • 尝试获取队列列表
    • 从本地缓存中获取,获取到返回
    • 没有获取到。从NameSrv中获取,获取到返回
    • 没有获取到。获取TBW102的队列,获取到返回。后续会有自动创建Topic的逻辑
  • 选择其中一个队列 selector 策略

关于异步消息

异步消息需要我们单独加一个回调方法,添加在发送消息成功/失败的一些处理。
因为异步消息没有对Broker回来的结果进行额外的处理,那么自然我们就不能像同步消息一样,对Broker返回回来的结果单独针对SendResult进行单独的重试操作。所以需要我们在失败的回调方法上进行额外的处理(例如重试消息发送)
具体的原因,我们可以看下消息发送的主干逻辑

消息发送的主干逻辑

另外我在这里贴一下消息发送的主干代码(具体的代码和它们的注释可以看下我的github 我是一个超链接

// ... 省略部分代码
// 查找主题路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
    // ... 省略部分代码
    for (; times < timesTotal; times++) {
      // 上次发送的broker名称
      String lastBrokerName = null == mq ? null : mq.getBrokerName();
      // 选择一条messagequeue(轮询+失败规避)
      MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
      if (mqSelected != null) {
        // ... 省略部分代码
        try {
          // ... 省略部分代码
          sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
          endTimestamp = System.currentTimeMillis();
          // 失败规避
          this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
          switch (communicationMode) {
            case ASYNC:
              return null;
            case ONEWAY:
              return null;
            case SYNC:
              if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                // 发送失败重试
                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                  continue;
                }
              }

              return sendResult;
            default:
              break;
          }
        } catch (/** 一系列异常处理 **/) {

        }
      }
    }
}

流程:

  • 尝试获取Topic路由信息
  • 选择一个队列(默认是轮询+失败规避)
  • 发送消息,失败会重试

失败规避

上面提到了失败规避,到底什么是失败规避呢?
在我们一次消息发送过程中,消息有可能发送失败。在消息发送失败,重试时选择发送消息的队列时,就会规避上次MessageQueue所在的Broker。这样能够减少很多不必要的请求(因为Broker宕机后,很大情况下这个Broker短时间内依旧是无法使用的)
那么,为什么会有宕机的Broker在我们的内存中存在?
因为NameSrv是根据心跳检测来确定Broker是否可用的(有间隔 10s),且消息生产者更新路由信息也是有间隔的(30s)。且为了Namesrv设计的简单,Namesrv不会主动将Broker宕机的信息推给消息生产者,而是需要消息生产者定时更新的时候,才会感知到Broker宕机。
在这期间存在误差,所以我们是要一个机制(失败规避策略)来减少一些不必要的性能消耗
另外,失败规避默认是关闭的

private boolean sendLatencyFaultEnable = false;

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        // 默认以30s作为computeNotAvailableDuration的参数
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}

关于发送端消息的可靠性

发送端消息的可靠性主要是靠消息发送重试
RocketMQ的架构中,可能会存在多个Broker为某个topic提供服务,这个topic的消息被存放在多个Broker下。(有点类似于Redis的Cluster)
当生产者往某个Broker发送消息失败时,会进行失败规避,选择其他提供服务的Broker,进行发送消息。确保消息能够被稳定的送到Broker中

消息重新发送,消息的msgid会发生变化吗

msgId,对于客户端来说msgId是由客户端producer实例端生成的,具体来说,调用方法MessageClientIDSetter.createUniqIDBuffer()生成唯一的Id;

//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
    // 为消息分配唯一的全局id
    MessageClientIDSetter.setUniqID(msg);
}
public static void setUniqID(final Message msg) {
    // 因为有了这个判读,所以我们重试的时候,消息id是不会发生变化的
    if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
        msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
    }
}

public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";

但是,offsetMsgId是会发生变化的


RocketMQ基础篇 Producer发送消息 msgid.png
  • msgId,对于客户端来说msgId是由客户端producer实例端生成的,具体来说,调用方法MessageClientIDSetter.createUniqIDBuffer()生成唯一的Id;
  • offsetMsgId,offsetMsgId是由Broker服务端在写入消息时生成的(采用”IP地址+Port端口”与“CommitLog的物理偏移量地址”做了一个字符串拼接),其中offsetMsgId就是在RocketMQ控制台直接输入查询的那个messageId。

消息发送的时候,如何自己选择消息推送的队列

发送消息的时候可以自定义一个MessageQueueSelector,就可以自己选推到那个队列,实现顺序消息的需求了

void send(final Message msg, final MessageQueueSelector selector, final Object arg,
        final SendCallback sendCallback) throws MQClientException, RemotingException,
        InterruptedException;
private SendResult sendSelectImpl(
        Message msg,
        MessageQueueSelector selector,
        Object arg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);

        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            try {
                List<MessageQueue> messageQueueList =
                    mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
                Message userMessage = MessageAccessor.cloneMessage(msg);
                String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
                userMessage.setTopic(userTopic);

                mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
            } catch (Throwable e) {
                throw new MQClientException("select message queue threw exception.", e);
            }

            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeout < costTime) {
                throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
            }
            if (mq != null) {
                return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
            } else {
                throw new MQClientException("select message queue return null.", null);
            }
        }

        validateNameServerSetting();
        throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
    }

有一个注意点:这种基于selector方式的,发送消息失败是不会重试的
原因:我的理解是我们自己自定义的selector,在没有规则策略的前提下,大概率还是选择到这个失败的队列里面。但是如果有规避策略的话,又和我们定义selector的本意违背了(例如我要实现顺序消息,结果失败重试把它丢到了其他队列,违背了顺序消息的本意)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352