RocketMQ 与 Spring Cloud Stream整合(六、顺序消息)

RocketMQ 提供了两种顺序级别:

  • 普通顺序消息:Producer 将相关联的消息发送到相同的消息队列。
  • 完全严格顺序:在【普通顺序消息】的基础上,Consumer 严格顺序消费。

官方文档是这么描述的:

消息有序,指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ 可以严格的保证消息有序。

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个 Topic 下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序:对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
  • 分区顺序:对于指定的一个 Topic,所有消息根据 Sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。适用场景:性能要求高,以 Sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

注意,分区顺序就是普通顺序消息,全局顺序就是完全严格顺序。

下面,我们来搭建一个 Spring Cloud Stream 消费异常处理机制的示例。

6.1 搭建生产者

来演示发送顺序消息。

6.1.1 配置文件

修改前面项目的 [application.yml]配置文件,添加 partition-key-expression 配置项,设置 Producer 发送顺序消息的 Sharding key。完整配置如下:

spring:
  application:
    name: stream-rocketmq-producer-application
  cloud:
    # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
    stream:
      # Binding 配置项,对应 BindingProperties Map
      bindings:
        erbadagang-output:
          destination: ERBADAGANG-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
          content-type: application/json # 内容格式。这里使用 JSON

          # Producer 配置项,对应 ProducerProperties 类
          producer:
            partition-key-expression: payload['id'] # 分区 key 表达式。该表达式基于 Spring EL,从消息中获得分区 key。

      # Spring Cloud Stream RocketMQ 配置项
      rocketmq:
        # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
        binder:
          name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
        # RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
        bindings:
          erbadagang-output:
            # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
            producer:
              group: test # 生产者分组
              sync: true # 是否同步发送消息,默认为 false 异步。

server:
  port: 18080

partition-key-expression 配置项,该表达式基于 Spring EL,从消息中获得 Sharding key。

这里,我们设置该配置项为 payload['id'],表示从 Spring Message 的 payload 的 id。稍后我们发送的消息的 payload 为 Demo01Message,那么 id 就是 Demo01Message.id

如果我们想从消息的 headers 中获得 Sharding key,可以设置为 headers['partitionKey']

② Spring Cloud Stream 使用 PartitionHandler 进行 Sharding key 的获得与计算,最终 Sharding key 的结果为 key.hashCode() % partitionCount

在获取到 Sharding key 之后,Spring Cloud Alibaba Stream RocketMQ 提供的 PartitionMessageQueueSelector 选择消息发送的队列。

我们以发送一条 id 为 1 的 Demo01Message 消息为示例,最终会发送到对应 RocketMQ Topic 的队列为 1。计算过程如下:

// 第一步,PartitionHandler 使用 `partition-key-expression` 表达式,从 Message 中获得 Sharding key
key => 1

// 第二步,PartitionHandler 计算最终的 Sharding key
// 默认情况下,每个 RocketMQ Topic 的队列总数是 4。
key => key.hashCode() % partitionCount = 1.hashCode() % 4 = 1 % 4 = 1

// 第三步,PartitionMessageQueueSelector 获得对应 RocketMQ Topic 的队列
队列 => queues.get(key) = queues.get(1)

这样,我们就能保证相同 Sharding Key 的消息,发送到相同的对应 RocketMQ Topic 的队列中。当前,前提是该 Topic 的队列总数不能变噢,不然计算的 Sharding Key 会发生变化。

6.1.2 Demo01Controller

增加发送 3 条顺序消息的 HTTP 接口。代码如下:

package com.erbadagang.springcloudalibaba.stream.rocketmq.producer.controller;

import com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message.Demo01Message;
import com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message.MySource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;

@RestController
@RequestMapping("/demo01")
public class Demo01Controller {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private MySource mySource;//<1>

    @GetMapping("/send_orderly")
    public boolean sendOrderly() {
        // 发送 3 条相同 id 的消息
        int id = new Random().nextInt();
        for (int i = 0; i < 3; i++) {
            // 创建 Message
            Demo01Message message = new Demo01Message().setId(id);
            // 创建 Spring Message 对象
            Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
                    .build();
            // 发送消息
            mySource.erbadagangOutput().send(springMessage);
        }
        return true;
    }

}

每次发送的 3 条消息使用相同的 id,配合上我们使用它作为 Sharding key,就可以发送对应 Topic 的相同队列中。

另外,发送的虽然是顺序消息,但是和发送普通消息的代码是一模一样的。

6.2 搭建消费者

演示顺序消费消息。

8.2.1 配置文件

修改 [application.yml]配置文件,添加 orderly 配置项,设置 Consumer 顺序消费消息。完整配置如下:

spring:
  application:
    name: erbadagang-consumer-application
  cloud:
    # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
    stream:
      # Binding 配置项,对应 BindingProperties Map
      bindings:
        erbadagang-input:
          destination: ERBADAGANG-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
          content-type: application/json # 内容格式。这里使用 JSON
          group: erbadagang-consumer-group-ERBADAGANG-TOPIC-01 # 消费者分组,命名规则:组名+topic名

      # Spring Cloud Stream RocketMQ 配置项
      rocketmq:
        # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
        binder:
          name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
        # RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
        bindings:
          erbadagang-input:
            # RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类
            consumer:
              enabled: true # 是否开启消费,默认为 true
              broadcasting: false # 是否使用广播消费,默认为 false 使用集群消费
              orderly: true # 是否顺序消费,默认为 false 并发消费。

server:
  port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

6.2.2 Demo01Consumer

[Demo01Consumer]类,在消费消息时,打印出消息所在队列编号线程编号,这样我们通过队列编号可以判断消息是否顺序发送,通过线程编号可以判断消息是否顺序消费。代码如下:

package com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class Demo01Consumer {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @StreamListener(MySink.ERBADAGANG_INPUT)
    public void onMessage(Message<?> message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }

}

6.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send_orderly 接口,发送顺序消息。IDEA 控制台输出日志如下:

2020-08-06 17:31:17.892  INFO 16556 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer      : [onMessage][线程编号:70 消息内容:GenericMessage [payload={"id":-1387755989}, headers={rocketmq_QUEUE_ID=1, rocketmq_RECONSUME_TIMES=0, scst_partition=1, rocketmq_BORN_TIMESTAMP=1596706229166, rocketmq_TOPIC=ERBADAGANG-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=C0A82B7C341418B4AAC21D818BAE0001, rocketmq_SYS_FLAG=0, id=6386b54a-4e37-b884-3010-485308229a10, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=103.3.96.229, contentType=application/json, timestamp=1596706277892}]]

2020-08-06 17:31:17.892  INFO 16556 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer      : [onMessage][线程编号:70 消息内容:GenericMessage [payload={"id":-1387755989}, headers={rocketmq_QUEUE_ID=1, rocketmq_RECONSUME_TIMES=0, scst_partition=1, rocketmq_BORN_TIMESTAMP=1596706229244, rocketmq_TOPIC=ERBADAGANG-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=C0A82B7C341418B4AAC21D818BFC0002, rocketmq_SYS_FLAG=0, id=d9dd1b0d-2565-7f52-3825-786574b2fc1b, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=103.3.96.229, contentType=application/json, timestamp=1596706277892}]]

id 为 -1387755989 的消息被发送到 RocketMQ 消息队列编号为 rocketmq_QUEUE_ID=1,并且在线程编号为 70 的线程中消费。

底线


本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,402评论 6 499
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,377评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,483评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,165评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,176评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,146评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,032评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,896评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,311评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,536评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,696评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,413评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,008评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,815评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,698评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,592评论 2 353