RocketMQ基础篇(下)

情绪要么顺服你,要么支配你,这要看谁说了算。

                                                        ——吉米.罗恩

大纲

普通消息
其他消息

玩转各种消息

1.普通消息

    整体流程如下

    >导入MQ客户端依赖

        <dependency>

            <groupId>org.apache.rocketmq</groupId>

            <artifactId>rocketmq-client</artifactId>

            <version>4.8.0</version>

        </dependency>

    >消息发送者步骤

        1.创建消息生产者producer,并指定生产者组名

        2.指定NameServer地址

        3.启动producer

        4.创建消息对象,指定Topic、Tag和消息体

        5.发送消息

        6.关闭生产者producer

    >消息消费者步骤

        1.创建消费者Consumer,指定消费者组名

        2.指定NameServer地址

        3.订阅主题Topic和Tag

        4.设置回调函数,处理消息

        5.启动消费者consumer

(1)消息发送

发送同步消息

图示

    这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

    代码演示

代码示例
结果示例

    同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。

    >Message ID

    消息的全局唯一标识(内部机制的ID生成是使用机器IP和消息偏移量的组成,所以有可能重复,如果是幂等性还是最好考虑Key),由消息队列MQ系统自动生成,唯一标识某条信息。

    >SendStatus

    发送的标识。成功,失败等。

    >Queue

    相当于是Topic的分区:用于并行发送和接收消息。

发送异步消息

图示

    异常消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

    代码演示

代码示例

消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。

单向发送

图示

    这种方式主要用在不特别关心发送结果的场景,例如:日志发送。

    代码演示

代码示例

    单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

消息发送的权衡

图示

(2)消息消费

集群消费

图示

    消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。

    实际上,每个Consumer是平均分摊Message Queue的去做拉取消息。例如:某个Topic有3条Q,其中一个Consumer Group有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的1条Q。

    而由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上,可以认为Q上的消息是平均的。那么实例也就平均地消费消息了。

    这种模式下,消费进度(Consumer Offset)的存储会持久化到Broker

    代码演示

代码示例

广播消费

图示

    消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一次。

    实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。

    这种模式下,消费进度(Consumer Offset)会存储持久化到实例本地

    代码演示

代码示例

消息消费时的权衡

    集群模式:适用场景&注意事项

    消费端集群化部署,每条消息 只需要被处理一次。

    由于消费进度在服务端维护,可靠性更高。

    集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。

    集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。

    广播模式:适用场景&注意事项

    广播消费模式下不支持顺序消息。

    广播消费模式下不支持重置消费位点。

    每条消息都需要被相同逻辑的多台机器处理。

    消费进度在客户端维护,出现重复的概率稍大于集群模式。

    广播模式下,消息队列RocketMQ保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。

    广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。

    广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。

    目前仅Java客户端支持广播模式。

    广播模式下服务端不维护消费进度,所以消息队列RocketMQ控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

2.顺序消息

    消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

    顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

    >全局顺序消息

图示

    >部分顺序消息

图示

(1)顺序消息生产

    一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,下面是订单进行分区有序的示例代码。

代码示例1


代码示例2
结果

使用顺序消息:首先要保证消息是有序进入MQ的,消息放入MQ之前,对id等关键字进行取模,放入指定messageQueue,consume消费消息失败时,不能返回reconsume--later,这样会导致乱序。

应该返回suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。

(2)顺序消息消费

    消费时,同一个OrderId获取到的肯定是同一个队列。从而确保一个订单中处理的顺序。

代码示例

3.消息发送时的重要方法/属性

(1)属性

org.apache.rocketmq.example.details. ProducerDetails 类中

代码示例

    producerGroup:生产者所属组

    defaultTopicQueueNums:默认主题在每一个Broker队列数量

    sendMsgTimeout:发送消息默认默认超时时间,默认3s

    compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认4k

    retryTimesWhenSendFailed:同步方式发送消息重复次数,默认为2,总共执行3次

    retryTimesWhenSendAsyncFailed:异步方法发送消息重试次数,默认为2

    retryAnotherBrokerWhenNotStoreOK:消息重试时选择另外一个Broker时,是否不等待存储结果就返回,默认为false

    maxMessageSize:允许发送的最大消息长度,默认为4M

(2)方法

    org.apache.rocketmq.example.details. ProducerDetails 类中

源码图示
代码示例
结果

单向发送

代码示例
源码图示

同步发送

代码示例
源码图示

异步发送

代码示例
代码示例
源码图示

4.消息消费时的重要方法/属性

    org.apache.rocketmq.example.details. ComuserDetails 类中

(1)属性

代码示例
源码图示

(2)方法

    void subscribe(final String topic, final MessageSelector selector) :订阅消息,并指定队列选择器

    void unsubscribe(final String topic):取消消息订阅

    Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) :获取消费者对主题分配了那些消息队列

    void registerMessageListener(final MessageListenerConcurrently messageListener):注册并发事件监听器

示例代码

void registerMessageListener(final MessageListenerOrderly messageListener):注册顺序消息事件监听器

示例代码

(3)消费确认(ACK)

    业务实现消费回调的时候,当且仅当此回调函数返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ 才会认为这批消息(默认是 1 条) 是消费完成的中途断电,抛出异常等都不会认为成功——即都会重新投递。

    返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ 就会认为这批消息消费失败了。

    如果业务的回调没有处理好而抛出异常,会认为是消费失败 ConsumeConcurrentlyStatus.RECONSUME_LATER 处理。

    为了保证消息是肯定被至少消费成功一次,RocketMQ 会把这批消息重发回 Broker(topic 不是原 topic 而是这个消费组的 RETRY topic),在延迟的某个时间点(默认是 10 秒,业务可设置)后,再次投递到这个 ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认 16 次),就会投递到 DLQ 死信队列。应用可以监控死信队列来做人工干预。

    另外如果使用顺序消费的回调 MessageListenerOrderly 时,由于顺序消费是要前者消费成功才能继续消费,所以没有 RECONSUME_LATER 的这个状态, 只有 SUSPEND_CURRENT_QUEUE_A_MOMENT 来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。

5.延时消息

(1)概念介绍

    延时消息:Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费, 该消息即延时消息。

(2)适用场景

    消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。

(3)使用方式

    Apache RocketMQ 目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化, 那么消息排序要不可避免的产生巨大性能开销。(阿里云 RocketMQ 提供了任意时刻的定时消息功能,Apache 的 RocketMQ 并没有,阿里并没有开源)。

    发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。

    延迟消息是根据延迟队列的 level 来的,延迟队列默认是

    msg.setDelayTimeLevel(3)代表延迟 10 秒

    "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"

源码示例

是这 18 个等级(秒(s)、分(m)、小时(h)),level 为 1,表示延迟 1 秒后消费,level 为 5 表示延迟 1 分钟后消费,level 为 18 表示延迟 2 个小时消费。生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的 level 即可。消费消息跟普通的消费消息一致。

(4)代码演示

    org.apache.rocketmq.example. scheduled 包中

生产者

代码示例

消费者

代码示例

6.批量消息

    批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过 4MB。

(1)代码演示

    org.apache.rocketmq.example. batch 包中

生产者

代码示例

消费者

代码示例

(2)批量切分

    如果消息的总长度可能大于 4MB 时,这时候最好把消息进行分割。

代码演示

    我们需要发送 10 万元素的数组,这个量很大,怎么快速发送完。同时每一次批量发送的消息大小不能超过 4M 。

    具体见代码

代码示例

7.过滤消息

    org.apache.rocketmq.example. filter 包中

(1)Tag过滤

    在大多数情况下,TAG 是一个简单而有用的设计,其可以来选择您想要的消息。

代码示例
代码示例

    消费者将接收包含 TAGA 或 TAGB 或 TAGC 的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用 SQL 表达式筛选消息。SQL 特性可以通过发送消息时的属性来进行计算。

(2)Sql过滤

SQL基本语法

RocketMQ 定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

只有使用 push 模式的消费者才能用使用 SQL92 标准的 sql 语句,常用的语句如下:

    数值比较:比如:>,>=,<,<=,BETWEEN,=;

    字符比较:比如:=,<>,IN;

    IS NULL 或者 IS NOT NULL;

    逻辑符号:AND,OR,NOT;

    常量支持类型为:

    数值,比如:123,3.1415;

    字符,比如:'abc',必须用单引号包裹起来;

    NULL,特殊的常量;

    布尔值,TRUE 或 FALSE;

消息生产者(加入消息属性)

    发送消息时,你能通过 putUserProperty 来设置消息的属性。

代码示例

消息消费者(使用SQL筛选)

    用 MessageSelector.bySql 来使用 sql 筛选消息。

代码示例

如果这个地方抛出错误:说明 Sql92 功能没有开启。

出错结果

需要修改 Broker.conf 配置文件。

加入 enablePropertyFilter=true 然后重启 Broker 服务。

8.事务消息

图示

    其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

(1)正常事务流程

    (1)发送消息(half 消息):图中步骤 1。

    (2)服务端响应消息写入结果:图中步骤 2。

    (3)根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行):图中步骤 3。

    (4)根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见):图中步骤 4

(2)事务补偿流程

    (1)对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”:图中步骤 5。

    (2)Producer 收到回查消息,检查回查消息对应的本地事务的状态:图中步骤 6。

    (3)根据本地事务状态,重新 Commit 或者 Rollback::图中步骤 7。

    其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。

(3)事务消息状态

    事务消息共有三种状态,提交状态、回滚状态、中间状态:

   > TransactionStatus.CommitTransaction: 提交状态,它允许消费者消费此消息(完成图中了 1,2,3,4 步,第 4 步是Commit)。 

    >TransactionStatus.RollbackTransaction: 回滚状态,它代表该消息将被删除,不允许被消费(完成图中了 1,2,3,4 步, 第 4 步是 Rollback)。

    >TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态(完成图中了 1,2,3 步, 但是没有 4 或者没有 7,无法 Commit 或 Rollback)。

(4)代码演示

    org.apache.rocketmq.example. transaction 包中

创建事务性生产者

    使用 TransactionMQProducer 类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。

代码示例

实现事务的监听接口

    当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务(步骤 3)。它返回前一节中提到的三个事务状态之一。 checkLocalTranscation 方法用于检查本地事务状态(步骤 5),并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。

示例代码

(5)使用场景

    用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、余额进行回退。如何保证数据的完整性?

    可以使用 RocketMQ 的分布式事务保证在下单失败后系统数据的完整性。

(6)使用限制

    1. 事务消息不支持延时消息和批量消息。

    2.事务回查的间隔时间:BrokerConfig. transactionCheckInterval 通过 Broker 的配置文件设置好。

    3. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。

    4.事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于transactionMsgTimeout 参数。

    5.事务性消息可能不止一次被检查或消费。

    6.事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。

    7.提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。

    8. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ 服务器能通过它们的生产者ID 查询到消费者。


我是娆疆_蚩梦,让坚持成为一种习惯,感谢各位大佬的:点赞收藏评论,我们下期见!


上一篇:RocketMQ基础篇(上)

下一篇:RocketMQ底层原理之存储设计

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,951评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,606评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,601评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,478评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,565评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,587评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,590评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,337评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,785评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,096评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,273评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,935评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,578评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,199评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,440评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,163评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,133评论 2 352

推荐阅读更多精彩内容