情绪要么顺服你,要么支配你,这要看谁说了算。
——吉米.罗恩
大纲
玩转各种消息
1.普通消息
整体流程如下
>导入MQ客户端依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
>消息发送者步骤
1.创建消息生产者producer,并指定生产者组名
2.指定NameServer地址
3.启动producer
4.创建消息对象,指定Topic、Tag和消息体
5.发送消息
6.关闭生产者producer
>消息消费者步骤
1.创建消费者Consumer,指定消费者组名
2.指定NameServer地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
(1)消息发送
发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
代码演示
同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。
>Message ID
消息的全局唯一标识(内部机制的ID生成是使用机器IP和消息偏移量的组成,所以有可能重复,如果是幂等性还是最好考虑Key),由消息队列MQ系统自动生成,唯一标识某条信息。
>SendStatus
发送的标识。成功,失败等。
>Queue
相当于是Topic的分区:用于并行发送和接收消息。
发送异步消息
异常消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
代码演示
消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。
单向发送
这种方式主要用在不特别关心发送结果的场景,例如:日志发送。
代码演示
单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
消息发送的权衡
(2)消息消费
集群消费
消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。
实际上,每个Consumer是平均分摊Message Queue的去做拉取消息。例如:某个Topic有3条Q,其中一个Consumer Group有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的1条Q。
而由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上,可以认为Q上的消息是平均的。那么实例也就平均地消费消息了。
这种模式下,消费进度(Consumer Offset)的存储会持久化到Broker。
代码演示
广播消费
消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一次。
实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。
这种模式下,消费进度(Consumer Offset)会存储持久化到实例本地。
代码演示
消息消费时的权衡
集群模式:适用场景&注意事项
消费端集群化部署,每条消息 只需要被处理一次。
由于消费进度在服务端维护,可靠性更高。
集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
广播模式:适用场景&注意事项
广播消费模式下不支持顺序消息。
广播消费模式下不支持重置消费位点。
每条消息都需要被相同逻辑的多台机器处理。
消费进度在客户端维护,出现重复的概率稍大于集群模式。
广播模式下,消息队列RocketMQ保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
目前仅Java客户端支持广播模式。
广播模式下服务端不维护消费进度,所以消息队列RocketMQ控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
2.顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
>全局顺序消息
>部分顺序消息
(1)顺序消息生产
一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,下面是订单进行分区有序的示例代码。
使用顺序消息:首先要保证消息是有序进入MQ的,消息放入MQ之前,对id等关键字进行取模,放入指定messageQueue,consume消费消息失败时,不能返回reconsume--later,这样会导致乱序。
应该返回suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。
(2)顺序消息消费
消费时,同一个OrderId获取到的肯定是同一个队列。从而确保一个订单中处理的顺序。
3.消息发送时的重要方法/属性
(1)属性
org.apache.rocketmq.example.details. ProducerDetails 类中
producerGroup:生产者所属组
defaultTopicQueueNums:默认主题在每一个Broker队列数量
sendMsgTimeout:发送消息默认默认超时时间,默认3s
compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认4k
retryTimesWhenSendFailed:同步方式发送消息重复次数,默认为2,总共执行3次
retryTimesWhenSendAsyncFailed:异步方法发送消息重试次数,默认为2
retryAnotherBrokerWhenNotStoreOK:消息重试时选择另外一个Broker时,是否不等待存储结果就返回,默认为false
maxMessageSize:允许发送的最大消息长度,默认为4M
(2)方法
org.apache.rocketmq.example.details. ProducerDetails 类中
单向发送
同步发送
异步发送
4.消息消费时的重要方法/属性
org.apache.rocketmq.example.details. ComuserDetails 类中
(1)属性
(2)方法
void subscribe(final String topic, final MessageSelector selector) :订阅消息,并指定队列选择器
void unsubscribe(final String topic):取消消息订阅
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) :获取消费者对主题分配了那些消息队列
void registerMessageListener(final MessageListenerConcurrently messageListener):注册并发事件监听器
void registerMessageListener(final MessageListenerOrderly messageListener):注册顺序消息事件监听器
(3)消费确认(ACK)
业务实现消费回调的时候,当且仅当此回调函数返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ 才会认为这批消息(默认是 1 条) 是消费完成的中途断电,抛出异常等都不会认为成功——即都会重新投递。
返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ 就会认为这批消息消费失败了。
如果业务的回调没有处理好而抛出异常,会认为是消费失败 ConsumeConcurrentlyStatus.RECONSUME_LATER 处理。
为了保证消息是肯定被至少消费成功一次,RocketMQ 会把这批消息重发回 Broker(topic 不是原 topic 而是这个消费组的 RETRY topic),在延迟的某个时间点(默认是 10 秒,业务可设置)后,再次投递到这个 ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认 16 次),就会投递到 DLQ 死信队列。应用可以监控死信队列来做人工干预。
另外如果使用顺序消费的回调 MessageListenerOrderly 时,由于顺序消费是要前者消费成功才能继续消费,所以没有 RECONSUME_LATER 的这个状态, 只有 SUSPEND_CURRENT_QUEUE_A_MOMENT 来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。
5.延时消息
(1)概念介绍
延时消息:Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费, 该消息即延时消息。
(2)适用场景
消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。
(3)使用方式
Apache RocketMQ 目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化, 那么消息排序要不可避免的产生巨大性能开销。(阿里云 RocketMQ 提供了任意时刻的定时消息功能,Apache 的 RocketMQ 并没有,阿里并没有开源)。
发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。
延迟消息是根据延迟队列的 level 来的,延迟队列默认是
msg.setDelayTimeLevel(3)代表延迟 10 秒
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
是这 18 个等级(秒(s)、分(m)、小时(h)),level 为 1,表示延迟 1 秒后消费,level 为 5 表示延迟 1 分钟后消费,level 为 18 表示延迟 2 个小时消费。生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的 level 即可。消费消息跟普通的消费消息一致。
(4)代码演示
org.apache.rocketmq.example. scheduled 包中
生产者
消费者
6.批量消息
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过 4MB。
(1)代码演示
org.apache.rocketmq.example. batch 包中
生产者
消费者
(2)批量切分
如果消息的总长度可能大于 4MB 时,这时候最好把消息进行分割。
代码演示
我们需要发送 10 万元素的数组,这个量很大,怎么快速发送完。同时每一次批量发送的消息大小不能超过 4M 。
具体见代码
7.过滤消息
org.apache.rocketmq.example. filter 包中
(1)Tag过滤
在大多数情况下,TAG 是一个简单而有用的设计,其可以来选择您想要的消息。
消费者将接收包含 TAGA 或 TAGB 或 TAGC 的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用 SQL 表达式筛选消息。SQL 特性可以通过发送消息时的属性来进行计算。
(2)Sql过滤
SQL基本语法
RocketMQ 定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
只有使用 push 模式的消费者才能用使用 SQL92 标准的 sql 语句,常用的语句如下:
数值比较:比如:>,>=,<,<=,BETWEEN,=;
字符比较:比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号:AND,OR,NOT;
常量支持类型为:
数值,比如:123,3.1415;
字符,比如:'abc',必须用单引号包裹起来;
NULL,特殊的常量;
布尔值,TRUE 或 FALSE;
消息生产者(加入消息属性)
发送消息时,你能通过 putUserProperty 来设置消息的属性。
消息消费者(使用SQL筛选)
用 MessageSelector.bySql 来使用 sql 筛选消息。
如果这个地方抛出错误:说明 Sql92 功能没有开启。
需要修改 Broker.conf 配置文件。
加入 enablePropertyFilter=true 然后重启 Broker 服务。
8.事务消息
其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
(1)正常事务流程
(1)发送消息(half 消息):图中步骤 1。
(2)服务端响应消息写入结果:图中步骤 2。
(3)根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行):图中步骤 3。
(4)根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见):图中步骤 4
(2)事务补偿流程
(1)对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”:图中步骤 5。
(2)Producer 收到回查消息,检查回查消息对应的本地事务的状态:图中步骤 6。
(3)根据本地事务状态,重新 Commit 或者 Rollback::图中步骤 7。
其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。
(3)事务消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
> TransactionStatus.CommitTransaction: 提交状态,它允许消费者消费此消息(完成图中了 1,2,3,4 步,第 4 步是Commit)。
>TransactionStatus.RollbackTransaction: 回滚状态,它代表该消息将被删除,不允许被消费(完成图中了 1,2,3,4 步, 第 4 步是 Rollback)。
>TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态(完成图中了 1,2,3 步, 但是没有 4 或者没有 7,无法 Commit 或 Rollback)。
(4)代码演示
org.apache.rocketmq.example. transaction 包中
创建事务性生产者
使用 TransactionMQProducer 类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。
实现事务的监听接口
当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务(步骤 3)。它返回前一节中提到的三个事务状态之一。 checkLocalTranscation 方法用于检查本地事务状态(步骤 5),并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。
(5)使用场景
用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、余额进行回退。如何保证数据的完整性?
可以使用 RocketMQ 的分布式事务保证在下单失败后系统数据的完整性。
(6)使用限制
1. 事务消息不支持延时消息和批量消息。
2.事务回查的间隔时间:BrokerConfig. transactionCheckInterval 通过 Broker 的配置文件设置好。
3. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
4.事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于transactionMsgTimeout 参数。
5.事务性消息可能不止一次被检查或消费。
6.事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。
7.提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
8. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ 服务器能通过它们的生产者ID 查询到消费者。
我是娆疆_蚩梦,让坚持成为一种习惯,感谢各位大佬的:点赞、收藏和评论,我们下期见!
上一篇:RocketMQ基础篇(上)