RocketMQ 与 Spring Cloud Stream整合(一、快速入门)

一、 概述

本文我们来学习 Spring Cloud Alibaba 提供的 Spring Cloud Stream RocketMQ 组件,基于 Spring Cloud Stream 的编程模型,接入 RocketMQ 作为消息中间件,实现消息驱动的微服务。
在开始本文之前,要对 RocketMQ 进行简单的学习。可以阅读本系列前面的[RocketMQ]文章,在本机搭建一个 RocketMQ 服务。

二、Spring Cloud Stream 介绍

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架,使用 Spring Integration 与 Broker 进行连接。

一般来说,消息队列中间件都有一个 Broker Server(代理服务器),消息中转角色,负责存储消息、转发消息。

例如说在 RocketMQ 中,Broker 负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。另外,Broker 也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

Spring Cloud Stream 提供了消息中间件的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。

Spring Cloud Stream 内部有两个概念:BinderBinding

Binder,跟消息中间件集成的组件,用来创建对应的 Binding。各消息中间件都有自己的 Binder 具体实现。

public interface Binder<T, 
 C extends ConsumerProperties, // 消费者配置
 P extends ProducerProperties> { // 生产者配置

 // 创建消费者的 Binding
 Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);

 // 创建生产者的 Binding
 Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);

}

Binding,包括 Input Binding 和 Output Binding。Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

最终整体交互如下图所示:
Spring Cloud Stream Application binder&bindings

三、 快速入门

本小节,我们一起来快速入门下,会创建 2 个项目,分别作为生产者和消费者。最终项目如下图所示:
producer和consumer项目

3.1 搭建生产者

3.1.1 引入依赖

创建 [pom.xml]文件中,引入 Spring Cloud Alibaba RocketMQ 相关依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.erbadagang.springcloud.stream</groupId>
    <artifactId>sc-stream-rocketmq-producer</artifactId>
    <version>0.0.1</version>

    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
        <spring.boot.version>2.2.4.RELEASE</spring.boot.version>
        <spring.cloud.version>Hoxton.SR1</spring.cloud.version>
        <spring.cloud.alibaba.version>2.2.0.RELEASE</spring.cloud.alibaba.version>
    </properties>

    <!--
        引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,进行依赖版本的管理,防止不兼容。
        在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 开发团队推荐了三者的依赖关系
     -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring.cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring.cloud.alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- 引入 SpringMVC 相关依赖,并实现对其的自动配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- 引入 Spring Cloud Alibaba Stream RocketMQ 相关依赖,将 RocketMQ 作为消息队列,并实现对其的自动配置 -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>
    </dependencies>

</project>

3.1.2 配置文件

[application.yaml]

spring:
  application:
    name: stream-rocketmq-producer-application
  cloud:
    # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
    stream:
      # Binding 配置项,对应 BindingProperties Map
      bindings:
        erbadagang-output:
          destination: ERBADAGANG-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
          content-type: application/json # 内容格式。这里使用 JSON

        trek-output:
          destination: TREK-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
          content-type: application/json # 内容格式。这里使用 JSON
      # Spring Cloud Stream RocketMQ 配置项
      rocketmq:
        # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
        binder:
          name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
        # RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
        bindings:
          erbadagang-output:
            # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
            producer:
              group: test # 生产者分组
              sync: true # 是否同步发送消息,默认为 false 异步。

server:
  port: 18080

同时我们设置了2个binding,模拟2个topic情形。

spring.cloud.stream 为 Spring Cloud Stream 配置项,对应 BindingServiceProperties 类。配置的层级有点深,我们一层一层来看看。

spring.cloud.stream.bindings 为 Binding 配置项,对应 BindingProperties Map。其中,key 为 Binding 的名字。要注意,虽然说 Binding 分成 Input 和 Output 两种类型,但是在配置项中并不会体现出来,而是要在稍后搭配 @Input 还是 @Output 注解,才会有具体的区分。

这里,我们配置了一个名字为 erbadagang-outputtrek-output 的 Binding。从命名上,我们的意图是想作为 Output Binding,用于生产者发送消息。

  • destination:目的地。在 RocketMQ 中,使用 Topic 作为目的地。这里我们设置为 ERBADAGANG-TOPIC-01

    主题(Topic):表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。

  • content-type:内容格式。这里使用 JSON 格式,因为稍后我们将发送消息的类型为 POJO,使用 JSON 进行序列化。

spring.cloud.stream.rocketmq 为 Spring Cloud Stream RocketMQ 配置项。

spring.cloud.stream.rocketmq.binder 为 RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类。

  • name-server:RocketMQ Namesrv 地址。

    名字服务(Name Server):名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。

spring.cloud.stream.rocketmq.bindings 为 RocketMQ 自定义 Binding 配置项,用于对通用的spring.cloud.stream.bindings 配置项的增强,实现 RocketMQ Binding 独特的配置。该配置项对应 RocketMQBindingProperties Map,其中 key 为 Binding 的名字,需要对应上噢。

这里,我们对名字为 erbadagang-output 的 Binding 进行增强,进行 Producer 的配置。其中,producer 为 RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类。

  • group:生产者分组。

    生产者组(Producer Group):同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

  • sync:是否同步发送消息,默认为 false 异步。一般业务场景下,使用同步发送消息较多,所以这里我们设置为 true 同步消息。

    使用 RocketMQ 发送三种类型的消息:同步消息(sync)、异步消息(async)和单向消息(oneway)。其中前两种消息是可靠的,因为会有发送是否成功的应答。

MySource.java

package com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MySource {

    @Output("erbadagang-output")
    MessageChannel erbadagangOutput();

    @Output("trek-output")
    MessageChannel trekOutput();

}

这里,我们通过 @Output 注解,声明了一个名字为 erbadagang-output 的 Output Binding。注意,这个名字要和我们配置文件中的 spring.cloud.stream.bindings 配置项对应上。

同时,@Output 注解的方法的返回结果为 MessageChannel 类型,可以使用它发送消息。

3.1.4 Demo01Message

创建 [Demo01Message]类,示例 Message 消息。代码如下:

public class Demo01Message {

 /**
 * 编号
 */
 private Integer id;

 // ... 省略 setter/getter/toString 方法

}

3.1.5 Demo01Controller

创建 [Demo01Controller]类,提供发送消息的 HTTP 接口。代码如下:

package com.erbadagang.springcloudalibaba.stream.rocketmq.producer.controller;

import com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message.Demo01Message;
import com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message.MySource;
import org.apache.rocketmq.common.message.MessageConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;

@RestController
@RequestMapping("/demo01")
public class Demo01Controller {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private MySource mySource;//<1>

    @GetMapping("/send")
    public boolean send() {
        // <2>创建 Message
        Demo01Message message = new Demo01Message()
                .setId(new Random().nextInt());
        // <3>创建 Spring Message 对象
        Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
                .build();
        // <4>发送消息
        return mySource.erbadagangOutput().send(springMessage);
    }

    @GetMapping("/sendTrek")
    public boolean sendTrek() {
        // <2>创建 Message
        Demo01Message message = new Demo01Message()
                .setId(new Random().nextInt());
        // <3>创建 Spring Message 对象
        Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
                .build();
        // <4>发送消息
        return mySource.trekOutput().send(springMessage);
    }

    @GetMapping("/send_delay")
    public boolean sendDelay() {
        // 创建 Message
        Demo01Message message = new Demo01Message()
                .setId(new Random().nextInt());
        // 创建 Spring Message 对象
        Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
                .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // 设置延迟级别为 3,10 秒后消费。
                .build();
        // 发送消息
        boolean sendResult = mySource.erbadagangOutput().send(springMessage);
        logger.info("[sendDelay][发送消息完成, 结果 = {}]", sendResult);
        return sendResult;
    }

    @GetMapping("/send_tag")
    public boolean sendTag() {
        for (String tag : new String[]{"trek", "specialized", "look"}) {
            // 创建 Message
            Demo01Message message = new Demo01Message()
                    .setId(new Random().nextInt());
            // 创建 Spring Message 对象
            Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
                    .setHeader(MessageConst.PROPERTY_TAGS, tag) // 设置 Tag
                    .build();
            // 发送消息
            mySource.erbadagangOutput().send(springMessage);
        }
        return true;
    }

}
  • <1> 处,使用 @Autowired 注解,注入 MySource Bean。
  • <2> 处,创建 Demo01Message 对象。
  • <3> 处,使用 MessageBuilder 创建 Spring Message 对象,并设置消息内容为 Demo01Message 对象。
  • <4> 处,通过 MySource 获得 MessageChannel 对象,然后发送消息。

3.1.6 ProducerApplication

创建 [ProducerApplication]类,启动应用。代码如下:

package com.erbadagang.springcloudalibaba.stream.rocketmq.producer;

import com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message.MySource;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding(MySource.class)
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

}

使用 @EnableBinding 注解,声明指定接口开启 Binding 功能,扫描其 @Input@Output 注解。这里,我们设置为 MySource 接口。

3.2 搭建消费者

创建项目,作为消费者。

3.2.1 引入依赖

创建 [pom.xml],引入 Spring Cloud Alibaba RocketMQ 相关依赖。

友情提示:和「3.1.1 引入依赖」基本一样。

3.2.2 配置文件

创建 [application.yaml]配置文件,添加 Spring Cloud Alibaba RocketMQ 相关配置。

spring:
  application:
    name: erbadagang-consumer-application
  cloud:
    # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
    stream:
      # Binding 配置项,对应 BindingProperties Map
      bindings:
        erbadagang-input:
          destination: ERBADAGANG-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
          content-type: application/json # 内容格式。这里使用 JSON
          group: erbadagang-consumer-group-ERBADAGANG-TOPIC-01 # 消费者分组,命名规则:组名+topic名

        trek-input:
          destination: TREK-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
          content-type: application/json # 内容格式。这里使用 JSON
          group: trek-consumer-group-TREK-TOPIC-01 # 消费者分组,命名规则:组名+topic名
      # Spring Cloud Stream RocketMQ 配置项
      rocketmq:
        # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
        binder:
          name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
        # RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
        bindings:
          erbadagang-input:
            # RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类
            consumer:
              enabled: true # 是否开启消费,默认为 true
              broadcasting: false # 是否使用广播消费,默认为 false 使用集群消费,如果要使用广播消费值设成true。

server:
  port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

总体来说,和「3.1.2 配置文件」是比较接近的,所以我们只说差异点噢。

spring.cloud.stream.bindings 为 Binding 配置项。

这里,我们配置了一个名字为 erbadagang-inputtrek-input的 Binding。从命名上,我们的意图是想作为 Input Binding,用于消费者消费消息。

  • group:消费者分组。

    消费者组(Consumer Group):同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

spring.cloud.stream.rocketmq.bindings 为 RocketMQ 自定义 Binding 配置项。

这里,我们对名字为 erbadagang-input 的 Binding 进行增强,进行 Consumer 的配置。其中,consumer 为 RocketMQ Producer 配置项,对应 RocketMQConsumerProperties 类。

  • enabled:是否开启消费,默认为 true。在日常开发时,如果在本地环境不想消费,可以通过设置 enabledfalse 进行关闭。

  • broadcasting: 是否使用广播消费,默认为 false 使用的是集群消费。

    • 集群消费(Clustering):集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。
    • 广播消费(Broadcasting):广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

这里一点要注意!!!加了三个感叹号,一定要理解集群消费和广播消费的差异。我们来举个例子,以有两个消费者分组 A 和 B 的场景举例子:

  • 假设每个消费者分组各启动一个实例,此时我们发送一条消息,该消息会被两个消费者分组 "consumer_group_01""consumer_group_02" 都各自消费一次。
  • 假设每个消费者分组各启动一个实例,此时我们发送一条消息,该消息会被分组 A 的某个实例消费一次,被分组 B 的某个实例也消费一次

通过集群消费的机制,我们可以实现针对相同 Topic ,不同消费者分组实现各自的业务逻辑。例如说:用户注册成功时,发送一条 Topic 为 "USER_REGISTER" 的消息。然后,不同模块使用不同的消费者分组,订阅该 Topic ,实现各自的拓展逻辑:

  • 积分模块:判断如果是手机注册,给用户增加 20 积分。
  • 优惠劵模块:因为是新用户,所以发放新用户专享优惠劵。
  • 站内信模块:因为是新用户,所以发送新用户的欢迎语的站内信。
  • … 等等

这样,我们就可以将注册成功后的业务拓展逻辑,实现业务上的解耦,未来也更加容易拓展。同时,也提高了注册接口的性能,避免用户需要等待业务拓展逻辑执行完成后,才响应注册成功。

同时,相同消费者分组的多个实例,可以实现高可用,保证在一个实例意外挂掉的情况下,其它实例能够顶上。并且,多个实例都进行消费,能够提升消费速度

3.2.3 MySink

创建 [MySink]接口,声明名字为 Input Binding。代码如下:

package com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySink {

    String ERBADAGANG_INPUT = "erbadagang-input";
    String TREK_INPUT = "trek-input";

    @Input(ERBADAGANG_INPUT)
    SubscribableChannel demo01Input();

    @Input(TREK_INPUT)
    SubscribableChannel trekInput();

}

这里,我们通过 @Input 注解,声明了一个名字为 erbadagang-inputtrek-input的 Input Binding。注意,这个名字要和我们配置文件中的 spring.cloud.stream.bindings 配置项对应上。

同时,@Input 注解的方法的返回结果为 SubscribableChannel 类型,可以使用它订阅消息来消费。MessageChannel 提供的订阅消息的方法如下:

public interface SubscribableChannel extends MessageChannel {

 boolean subscribe(MessageHandler handler); // 订阅

 boolean unsubscribe(MessageHandler handler); // 取消订阅

}

那么,我们是否要实现 MySink 接口呢?和MySource一样,答案也是不需要,还是全部交给 Spring Cloud Stream 的 BindableProxyFactory 大兄弟来解决。BindableProxyFactory 会通过动态代理,自动实现 MySink 接口。 而 @Input注解的方法的返回值,BindableProxyFactory 会扫描带有 @Input 注解的方法,自动进行创建。

例如说,#demo01Input() 方法被自动创建返回结果为 DirectWithAttributesChannel,它也是 SubscribableChannel 的子类。

3.2.4 Demo01Message

创建 [Demo01Message]类,示例 Message 消息。

友情提示:和[3.1.4 Demo01Message]基本一样。

3.2.5 Demo01Consumer

创建 [Demo01Consumer] 类,消费消息。代码如下:

package com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener;

import com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.message.Demo01Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class Demo01Consumer {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @StreamListener(MySink.ERBADAGANG_INPUT)
    public void onMessage(@Payload Demo01Message message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }

    @StreamListener(MySink.TREK_INPUT)
    public void onTrekMessage(@Payload Demo01Message message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }
}

在方法上,添加 @StreamListener 注解,声明对应的 Input Binding。这里,我们使用 MySink.ERBADAFANG_INPUT

又因为我们消费的消息是 POJO 类型,所以我们需要添加 @Payload 注解,声明需要进行反序列化成 POJO 对象。

3.2.6 ConsumerApplication

创建 [ConsumerApplication]类,启动应用。代码如下:

package com.erbadagang.springcloudalibaba.stream.rocketmq.consumer;

import com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener.MySink;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding(MySink.class)
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

}

使用 @EnableBinding 注解,声明指定接口开启 Binding 功能,扫描其 @Input@Output 注解。这里,我们设置为 MySink 接口。

3.3 测试单集群多实例的场景

本小节,我们会在一个消费者集群启动两个实例,测试在集群消费的情况下的表现。

① 执行 ConsumerApplication 两次,启动两个消费者的实例,从而实现在消费者分组 erbadagang-consumer-group-ERBADAGANG-TOPIC-01 下有两个消费者实例。

因为 IDEA 默认同一个程序只允许启动 1 次,所以我们需要配置 DemoProviderApplication 为 Allow parallel run。

② 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。此时在 IDEA 控制台看到消费者打印日志如下:

// ConsumerApplication 控制台 01
2020-08-05 09:39:29.073  INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:78 消息内容:Demo01Message{id=-1682643477}]
2020-02-22 09:41:32.754  INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:78 消息内容:Demo01Message{id=1890257867}]

// ConsumerApplication 控制台 02
2020-08-05 09:41:32.264  INFO 50534 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:80 消息内容:Demo01Message{id=1401668556}]

符合预期。从日志可以看出,每条消息仅被消费一次。
访问http://localhost:18080/demo01/sendTrek也是同样效果,只不过是演示如何操作多个topic。

3.4 测试多消费集群多实例的场景

本小节,我们会在二个消费者集群启动两个实例,测试在集群消费的情况下的表现。

① 执行 ConsumerApplication 两次,启动两个消费者的实例,从而实现在消费者分组 derbadagangemo01-consumer-group-ERBADAGANG-TOPIC-01 下有两个消费者实例。

② 修改 sca-stream-rocketmq-consumer 项目的配置文件,修改 spring.cloud.stream.bindings.erbadagang-input.group 配置项,将消费者分组改成 NEW-erbadagang-consumer-group-ERBADAGANG-TOPIC-01

然后,执行 ConsumerApplication 两次,再启动两个消费者的实例,从而实现在消费者分组 NEW-erbadagang-consumer-group-ERBADAGANG-TOPIC-01 下有两个消费者实例。

③ 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。此时在 IDEA 控制台看到消费者打印日志如下:

// 消费者分组 `demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 01
2020-08-06 10:17:07.886  INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:78 消息内容:Demo01Message{id=-276398167}]
2020-08-06 10:17:08.237  INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:78 消息内容:Demo01Message{id=-250975158}]

// 消费者分组 `demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 02
2020-02-22 10:17:08.710  INFO 50534 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:80 消息内容:Demo01Message{id=412281482}]

// 消费者分组 `X-demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 01
2020-08-06 10:17:07.887  INFO 51092 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:51 消息内容:Demo01Message{id=-276398167}]
2020-02-22 10:17:08.238  INFO 51092 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:51 消息内容:Demo01Message{id=-250975158}]

// 消费者分组 `X-demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 02
2020-08-06 10:17:08.787  INFO 51096 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:77 消息内容:Demo01Message{id=412281482}]

从日志可以看出,每条消息被每个消费者集群都进行了消费,且仅被消费一次。

3.5 小结

至此,我们已经完成了 Stream RocketMQ 的快速入门。回过头看看 Binder 和 Binding 的概念,是不是就清晰一些了。

底线


本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。

本文源代码

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,258评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,335评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,225评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,126评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,140评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,098评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,018评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,857评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,298评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,518评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,400评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,993评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,638评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,661评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352