RocketMQ系列(三):producer

rocketMq消息体

消息体.png

properties扩展中存了什么呢

  • tag: 消息tag,用于消息过滤

  • keys:message索引键,多个空格隔开,rocketMq可以根据这些key快速检索到消息。具体怎么快速检索,在消息存储章节细聊。

  • waitStoreMsgOk: 消息发送时是否等消息存储完成后再返回

  • delayTimeLevel: 消息延迟级别,用于定时消息和消息重试

producer启动过程

启动过程就是新建一个MQClientInstance实例。整个JVM实例中只存在一个MQClientInstance实例。

clientId为 本机ip + JVM线程id。
MQClientInstance作用是:与nameServer交互。网络请求,心跳检测。

发送消息过程

1、获取路由信息

路由信息其实就是消息队列列表
topic路由信息会缓存在producer中,以一个list变量的形式存在在内存中。
如果本地没有topic路由信息,向nameServer发送请求,获取路由信息,更新本地路由表。

所以问题了,本地有了,这个路由表什么时候更新呢?
起一个定时任务,每隔30s从nameServer获取topic路由表,更新本地路由。
producer会跟topic涉及的所有broker建立长连接,没隔30秒发送一个心跳。
broker端也会每10秒扫描一次注册的producer,如果2分钟没有心跳,则断开连接。

2、按照负载均衡策略,选择路由

2.1 默认投递方式:轮训

  • 对queue list进行排序
  • 获取一个全局自增的计数变量。获取一次自增一次。
  • 用这个变量对队列size取模
  • 模到了几,就选择哪个队列。因为变量是自增的,所以模的值也是根据队列size自增的,也就是轮训的。
  • 当自增值增加到int最大值后,该值重置为0

2.2 默认投递方式增强:轮训算法和延迟最小策略

默认的投递方式比较简单,但是也暴露了一个问题,就是有些queue可能由于自身数量积压等原因,可能投递的过程比较长,就尽量不要选择这样的queue了。rocketMq在每次发送一个MQ消息后,都会统计一下消息投递的时间延迟,根据这个时间延迟,可以知道往哪些queue投递的速度快。在这种场景下,会优先使用消息投递最小延迟策略。

2.3 顺序消息的投递方式

如果不用默认方式,可以自己选择MessageQueueSelector。
recketMq也提供了集中选择器实现,当然也可以自己实现。

生产者在消息投递的过程中,使用了 MessageQueueSelector 作为队列选择的策略接口,其定义如下:

package org.apache.rocketmq.client.producer;

import java.util.List;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

public interface MessageQueueSelector {
        /**
         * 根据消息体和参数,从一批消息队列中挑选出一个合适的消息队列
         * @param mqs  待选择的MQ队列选择列表
         * @param msg  待发送的消息体
         * @param arg  附加参数
         * @return  选择后的队列
         */
        MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
投递策略 策略实现类 说明
随机分配策略 SelectMessageQueueByRandom 使用了简单的随机数选择算法
基于Hash分配策略 SelectMessageQueueByHash 根据附加参数的Hash值,按照消息队列列表的大小取余数,得到消息队列的index
基于机器机房位置分配策略 SelectMessageQueueByMachineRoom 开源的版本没有具体的实现,基本的目的应该是机器的就近原则分配

hash代码实现

public class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}

3、根据选择出的路由,发送消息到broker

发送消息的三种方式

  • 可靠同步发送
    发送者执行发送消息api时,同步等待,直到消息服务器返回发送结果。

  • 可靠异步发送
    发送者执行发送消息api时,指定消息发送成功后的回调函数,然后立即返回,线程不阻塞,消息发送成功或失败,在回调函数(一个新的线程)中执行。

  • 单向发送
    发送者执行发送消息api时,直接返回,也没有回调函数。简单的说,就是只管发,不在乎消息是否成功存储在消息服务器上。

怎么控制是使用哪种方式??
在producer调用send函数的时候,有不同的send函数。

如何保证消息一定发送成功

rocketMq有重试机制。调用api的时候,会返回成功。如果返回不成功,则进行下一次投递,往下一个queue投。直到server端返回了成功。
如果在设置的重置次数用完了,还没成功。那就是真失败了。这个时候用户端就会感知了,会抛异常给用户端。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352