一、 概述
本文我们来学习 Spring Cloud Alibaba 提供的 Spring Cloud Stream RocketMQ 组件,基于 Spring Cloud Stream 的编程模型,接入 RocketMQ 作为消息中间件,实现消息驱动的微服务。
在开始本文之前,要对 RocketMQ 进行简单的学习。可以阅读本系列前面的[RocketMQ]文章,在本机搭建一个 RocketMQ 服务。
二、Spring Cloud Stream 介绍
Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架,使用 Spring Integration 与 Broker 进行连接。
一般来说,消息队列中间件都有一个 Broker Server(代理服务器),消息中转角色,负责存储消息、转发消息。
例如说在 RocketMQ 中,Broker 负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。另外,Broker 也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
Spring Cloud Stream 提供了消息中间件的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。
Spring Cloud Stream 内部有两个概念:Binder 和 Binding。
① Binder,跟消息中间件集成的组件,用来创建对应的 Binding。各消息中间件都有自己的 Binder 具体实现。
public interface Binder<T,
C extends ConsumerProperties, // 消费者配置
P extends ProducerProperties> { // 生产者配置
// 创建消费者的 Binding
Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
// 创建生产者的 Binding
Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}
- Kafka 实现了 KafkaMessageChannelBinder
- RabbitMQ 实现了 RabbitMessageChannelBinder
- RocketMQ 实现了 RocketMQMessageChannelBinder
② Binding,包括 Input Binding 和 Output Binding。Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
最终整体交互如下图所示:三、 快速入门
本小节,我们一起来快速入门下,会创建 2 个项目,分别作为生产者和消费者。最终项目如下图所示:3.1 搭建生产者
3.1.1 引入依赖
创建 [pom.xml
]文件中,引入 Spring Cloud Alibaba RocketMQ 相关依赖。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.erbadagang.springcloud.stream</groupId>
<artifactId>sc-stream-rocketmq-producer</artifactId>
<version>0.0.1</version>
<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<spring.boot.version>2.2.4.RELEASE</spring.boot.version>
<spring.cloud.version>Hoxton.SR1</spring.cloud.version>
<spring.cloud.alibaba.version>2.2.0.RELEASE</spring.cloud.alibaba.version>
</properties>
<!--
引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,进行依赖版本的管理,防止不兼容。
在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 开发团队推荐了三者的依赖关系
-->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring.cloud.alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- 引入 SpringMVC 相关依赖,并实现对其的自动配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 引入 Spring Cloud Alibaba Stream RocketMQ 相关依赖,将 RocketMQ 作为消息队列,并实现对其的自动配置 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
</dependencies>
</project>
3.1.2 配置文件
[application.yaml
]
spring:
application:
name: stream-rocketmq-producer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
erbadagang-output:
destination: ERBADAGANG-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
trek-output:
destination: TREK-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
erbadagang-output:
# RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
producer:
group: test # 生产者分组
sync: true # 是否同步发送消息,默认为 false 异步。
server:
port: 18080
同时我们设置了2个binding,模拟2个topic情形。
① spring.cloud.stream
为 Spring Cloud Stream 配置项,对应 BindingServiceProperties 类。配置的层级有点深,我们一层一层来看看。
② spring.cloud.stream.bindings
为 Binding 配置项,对应 BindingProperties Map。其中,key 为 Binding 的名字。要注意,虽然说 Binding 分成 Input 和 Output 两种类型,但是在配置项中并不会体现出来,而是要在稍后搭配 @Input
还是 @Output
注解,才会有具体的区分。
这里,我们配置了一个名字为 erbadagang-output
和 trek-output
的 Binding。从命名上,我们的意图是想作为 Output Binding,用于生产者发送消息。
-
destination
:目的地。在 RocketMQ 中,使用 Topic 作为目的地。这里我们设置为ERBADAGANG-TOPIC-01
。主题(Topic):表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。
content-type
:内容格式。这里使用 JSON 格式,因为稍后我们将发送消息的类型为 POJO,使用 JSON 进行序列化。
③ spring.cloud.stream.rocketmq
为 Spring Cloud Stream RocketMQ 配置项。
④ spring.cloud.stream.rocketmq.binder
为 RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类。
-
name-server
:RocketMQ Namesrv 地址。名字服务(Name Server):名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。
⑤ spring.cloud.stream.rocketmq.bindings
为 RocketMQ 自定义 Binding 配置项,用于对通用的spring.cloud.stream.bindings
配置项的增强,实现 RocketMQ Binding 独特的配置。该配置项对应 RocketMQBindingProperties Map,其中 key 为 Binding 的名字,需要对应上噢。
这里,我们对名字为 erbadagang-output
的 Binding 进行增强,进行 Producer 的配置。其中,producer
为 RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类。
-
group
:生产者分组。生产者组(Producer Group):同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
-
sync
:是否同步发送消息,默认为false
异步。一般业务场景下,使用同步发送消息较多,所以这里我们设置为true
同步消息。使用 RocketMQ 发送三种类型的消息:同步消息(sync)、异步消息(async)和单向消息(oneway)。其中前两种消息是可靠的,因为会有发送是否成功的应答。
MySource.java
package com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface MySource {
@Output("erbadagang-output")
MessageChannel erbadagangOutput();
@Output("trek-output")
MessageChannel trekOutput();
}
这里,我们通过 @Output
注解,声明了一个名字为 erbadagang-output
的 Output Binding。注意,这个名字要和我们配置文件中的 spring.cloud.stream.bindings
配置项对应上。
同时,@Output
注解的方法的返回结果为 MessageChannel 类型,可以使用它发送消息。
3.1.4 Demo01Message
创建 [Demo01Message]类,示例 Message 消息。代码如下:
public class Demo01Message {
/**
* 编号
*/
private Integer id;
// ... 省略 setter/getter/toString 方法
}
3.1.5 Demo01Controller
创建 [Demo01Controller]类,提供发送消息的 HTTP 接口。代码如下:
package com.erbadagang.springcloudalibaba.stream.rocketmq.producer.controller;
import com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message.Demo01Message;
import com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message.MySource;
import org.apache.rocketmq.common.message.MessageConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Random;
@RestController
@RequestMapping("/demo01")
public class Demo01Controller {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private MySource mySource;//<1>
@GetMapping("/send")
public boolean send() {
// <2>创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// <3>创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.build();
// <4>发送消息
return mySource.erbadagangOutput().send(springMessage);
}
@GetMapping("/sendTrek")
public boolean sendTrek() {
// <2>创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// <3>创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.build();
// <4>发送消息
return mySource.trekOutput().send(springMessage);
}
@GetMapping("/send_delay")
public boolean sendDelay() {
// 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // 设置延迟级别为 3,10 秒后消费。
.build();
// 发送消息
boolean sendResult = mySource.erbadagangOutput().send(springMessage);
logger.info("[sendDelay][发送消息完成, 结果 = {}]", sendResult);
return sendResult;
}
@GetMapping("/send_tag")
public boolean sendTag() {
for (String tag : new String[]{"trek", "specialized", "look"}) {
// 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_TAGS, tag) // 设置 Tag
.build();
// 发送消息
mySource.erbadagangOutput().send(springMessage);
}
return true;
}
}
-
<1>
处,使用@Autowired
注解,注入 MySource Bean。 -
<2>
处,创建 Demo01Message 对象。 -
<3>
处,使用 MessageBuilder 创建 Spring Message 对象,并设置消息内容为 Demo01Message 对象。 -
<4>
处,通过 MySource 获得 MessageChannel 对象,然后发送消息。
3.1.6 ProducerApplication
创建 [ProducerApplication]类,启动应用。代码如下:
package com.erbadagang.springcloudalibaba.stream.rocketmq.producer;
import com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message.MySource;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
@SpringBootApplication
@EnableBinding(MySource.class)
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
使用 @EnableBinding
注解,声明指定接口开启 Binding 功能,扫描其 @Input
和 @Output
注解。这里,我们设置为 MySource 接口。
3.2 搭建消费者
创建项目,作为消费者。
3.2.1 引入依赖
创建 [pom.xml
],引入 Spring Cloud Alibaba RocketMQ 相关依赖。
友情提示:和「3.1.1 引入依赖」基本一样。
3.2.2 配置文件
创建 [application.yaml
]配置文件,添加 Spring Cloud Alibaba RocketMQ 相关配置。
spring:
application:
name: erbadagang-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
erbadagang-input:
destination: ERBADAGANG-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
group: erbadagang-consumer-group-ERBADAGANG-TOPIC-01 # 消费者分组,命名规则:组名+topic名
trek-input:
destination: TREK-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
group: trek-consumer-group-TREK-TOPIC-01 # 消费者分组,命名规则:组名+topic名
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
erbadagang-input:
# RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类
consumer:
enabled: true # 是否开启消费,默认为 true
broadcasting: false # 是否使用广播消费,默认为 false 使用集群消费,如果要使用广播消费值设成true。
server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者
总体来说,和「3.1.2 配置文件」是比较接近的,所以我们只说差异点噢。
① spring.cloud.stream.bindings
为 Binding 配置项。
这里,我们配置了一个名字为 erbadagang-input
和trek-input
的 Binding。从命名上,我们的意图是想作为 Input Binding,用于消费者消费消息。
-
group
:消费者分组。消费者组(Consumer Group):同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
② spring.cloud.stream.rocketmq.bindings
为 RocketMQ 自定义 Binding 配置项。
这里,我们对名字为 erbadagang-input
的 Binding 进行增强,进行 Consumer 的配置。其中,consumer
为 RocketMQ Producer 配置项,对应 RocketMQConsumerProperties 类。
enabled
:是否开启消费,默认为true
。在日常开发时,如果在本地环境不想消费,可以通过设置enabled
为false
进行关闭。-
broadcasting
: 是否使用广播消费,默认为false
使用的是集群消费。- 集群消费(Clustering):集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。
- 广播消费(Broadcasting):广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。
这里一点要注意!!!加了三个感叹号,一定要理解集群消费和广播消费的差异。我们来举个例子,以有两个消费者分组 A 和 B 的场景举例子:
- 假设每个消费者分组各启动一个实例,此时我们发送一条消息,该消息会被两个消费者分组
"consumer_group_01"
和"consumer_group_02"
都各自消费一次。 - 假设每个消费者分组各启动一个实例,此时我们发送一条消息,该消息会被分组 A 的某个实例消费一次,被分组 B 的某个实例也消费一次
通过集群消费的机制,我们可以实现针对相同 Topic ,不同消费者分组实现各自的业务逻辑。例如说:用户注册成功时,发送一条 Topic 为 "USER_REGISTER"
的消息。然后,不同模块使用不同的消费者分组,订阅该 Topic ,实现各自的拓展逻辑:
- 积分模块:判断如果是手机注册,给用户增加 20 积分。
- 优惠劵模块:因为是新用户,所以发放新用户专享优惠劵。
- 站内信模块:因为是新用户,所以发送新用户的欢迎语的站内信。
- … 等等
这样,我们就可以将注册成功后的业务拓展逻辑,实现业务上的解耦,未来也更加容易拓展。同时,也提高了注册接口的性能,避免用户需要等待业务拓展逻辑执行完成后,才响应注册成功。
同时,相同消费者分组的多个实例,可以实现高可用,保证在一个实例意外挂掉的情况下,其它实例能够顶上。并且,多个实例都进行消费,能够提升消费速度。
3.2.3 MySink
创建 [MySink]接口,声明名字为 Input Binding。代码如下:
package com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface MySink {
String ERBADAGANG_INPUT = "erbadagang-input";
String TREK_INPUT = "trek-input";
@Input(ERBADAGANG_INPUT)
SubscribableChannel demo01Input();
@Input(TREK_INPUT)
SubscribableChannel trekInput();
}
这里,我们通过 @Input
注解,声明了一个名字为 erbadagang-input
和trek-input
的 Input Binding。注意,这个名字要和我们配置文件中的 spring.cloud.stream.bindings
配置项对应上。
同时,@Input
注解的方法的返回结果为 SubscribableChannel 类型,可以使用它订阅消息来消费。MessageChannel 提供的订阅消息的方法如下:
public interface SubscribableChannel extends MessageChannel {
boolean subscribe(MessageHandler handler); // 订阅
boolean unsubscribe(MessageHandler handler); // 取消订阅
}
那么,我们是否要实现 MySink 接口呢?和MySource
一样,答案也是不需要,还是全部交给 Spring Cloud Stream 的 BindableProxyFactory 大兄弟来解决。BindableProxyFactory 会通过动态代理,自动实现 MySink 接口。 而 @Input
注解的方法的返回值,BindableProxyFactory 会扫描带有 @Input
注解的方法,自动进行创建。
例如说,#demo01Input()
方法被自动创建返回结果为 DirectWithAttributesChannel,它也是 SubscribableChannel 的子类。
3.2.4 Demo01Message
创建 [Demo01Message]类,示例 Message 消息。
友情提示:和[3.1.4 Demo01Message]基本一样。
3.2.5 Demo01Consumer
创建 [Demo01Consumer] 类,消费消息。代码如下:
package com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener;
import com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.message.Demo01Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
public class Demo01Consumer {
private Logger logger = LoggerFactory.getLogger(getClass());
@StreamListener(MySink.ERBADAGANG_INPUT)
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
@StreamListener(MySink.TREK_INPUT)
public void onTrekMessage(@Payload Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
在方法上,添加 @StreamListener
注解,声明对应的 Input Binding。这里,我们使用 MySink.ERBADAFANG_INPUT
。
又因为我们消费的消息是 POJO 类型,所以我们需要添加 @Payload
注解,声明需要进行反序列化成 POJO 对象。
3.2.6 ConsumerApplication
创建 [ConsumerApplication]类,启动应用。代码如下:
package com.erbadagang.springcloudalibaba.stream.rocketmq.consumer;
import com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener.MySink;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
@SpringBootApplication
@EnableBinding(MySink.class)
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
使用 @EnableBinding
注解,声明指定接口开启 Binding 功能,扫描其 @Input
和 @Output
注解。这里,我们设置为 MySink 接口。
3.3 测试单集群多实例的场景
本小节,我们会在一个消费者集群启动两个实例,测试在集群消费的情况下的表现。
① 执行 ConsumerApplication 两次,启动两个消费者的实例,从而实现在消费者分组 erbadagang-consumer-group-ERBADAGANG-TOPIC-01 下有两个消费者实例。
因为 IDEA 默认同一个程序只允许启动 1 次,所以我们需要配置 DemoProviderApplication 为 Allow parallel run。
② 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。此时在 IDEA 控制台看到消费者打印日志如下:
// ConsumerApplication 控制台 01
2020-08-05 09:39:29.073 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:78 消息内容:Demo01Message{id=-1682643477}]
2020-02-22 09:41:32.754 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:78 消息内容:Demo01Message{id=1890257867}]
// ConsumerApplication 控制台 02
2020-08-05 09:41:32.264 INFO 50534 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:80 消息内容:Demo01Message{id=1401668556}]
符合预期。从日志可以看出,每条消息仅被消费一次。
访问http://localhost:18080/demo01/sendTrek
也是同样效果,只不过是演示如何操作多个topic。
3.4 测试多消费集群多实例的场景
本小节,我们会在二个消费者集群各启动两个实例,测试在集群消费的情况下的表现。
① 执行 ConsumerApplication 两次,启动两个消费者的实例,从而实现在消费者分组 derbadagangemo01-consumer-group-ERBADAGANG-TOPIC-01
下有两个消费者实例。
② 修改 sca-stream-rocketmq-consumer
项目的配置文件,修改 spring.cloud.stream.bindings.erbadagang-input.group
配置项,将消费者分组改成 NEW-erbadagang-consumer-group-ERBADAGANG-TOPIC-01
。
然后,执行 ConsumerApplication 两次,再启动两个消费者的实例,从而实现在消费者分组 NEW-erbadagang-consumer-group-ERBADAGANG-TOPIC-01
下有两个消费者实例。
③ 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。此时在 IDEA 控制台看到消费者打印日志如下:
// 消费者分组 `demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 01
2020-08-06 10:17:07.886 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:78 消息内容:Demo01Message{id=-276398167}]
2020-08-06 10:17:08.237 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:78 消息内容:Demo01Message{id=-250975158}]
// 消费者分组 `demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 02
2020-02-22 10:17:08.710 INFO 50534 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:80 消息内容:Demo01Message{id=412281482}]
// 消费者分组 `X-demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 01
2020-08-06 10:17:07.887 INFO 51092 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:51 消息内容:Demo01Message{id=-276398167}]
2020-02-22 10:17:08.238 INFO 51092 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:51 消息内容:Demo01Message{id=-250975158}]
// 消费者分组 `X-demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 02
2020-08-06 10:17:08.787 INFO 51096 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:77 消息内容:Demo01Message{id=412281482}]
从日志可以看出,每条消息被每个消费者集群都进行了消费,且仅被消费一次。
3.5 小结
至此,我们已经完成了 Stream RocketMQ 的快速入门。回过头看看 Binder 和 Binding 的概念,是不是就清晰一些了。
底线
本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址
下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。