Spring Cloud Stream 集成 RocketMQ

Spring Cloud Stream 是什么?

它是什么

Spring Cloud Stream 是一个构建高度可扩展的事件驱动微服务的框架,与共享消息系统相连。

该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的 Spring 用法和最佳实践之上,包括支持持久化的 pub/sub 语义、消费者组和有状态分区。

绑定的一些实现

Spring Cloud Stream支持多种绑定实现,下表包括了GitHub项目的链接。

Spring Cloud Stream的核心构件是:

  • Destination Binders: 负责提供与外部消息系统集成的组件。
  • Destination Bindings: 作为消息中间件与应用程序的提供者和消费者之间的桥梁。
  • Message: 生产者和消费者用于与目的地装订器沟通的典型数据结构(从而通过外部消息系统与其他应用程序进行通信的典型数据结构)。

为什么用 Cloud Stream?

  1. 解耦。使用了 SCS 之后,我们只需要在配置文件中配置下对应的中间件服务器地址等信息,然后就可使用,使得业务中不需要出现具体的消息中间件。
  2. 便于迁移。例如项目中一开始使用的是 rabbitmq,后期要想迁移成 kafka 的话,如果使用传统方式,在使用的地方使用具体消息中间件的话,那么迁移的成本会很高,而使用 SCS 的话,只需要更改配置文件即可。

如何使用?(集成 rocket mq )

配置 JAVA_HOME 路径(以下为 mac 环境下的配置)

  1. chmod 777 /etc/profile

  2. sudo vim /etc/profile

  3. export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home

    export PATH=$JAVA_HOME/bin:$PATH

  4. source /etc/profile

安装启动 Rocket MQ

  1. 从官网 下载二进制文件
  2. 启动 nameserver nohup sh bin/mqnamesrv &
  3. 启动 broker nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
  4. 设置 nameserver 地址 export NAMESRV_ADDR=localhost:9876
  5. 生产者发送消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
  6. 消费者消费消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
  7. 关闭 broker sh bin/mqshutdown broker
  8. 关闭 nameserver sh bin/mqshutdown namesrv

提示:

如果没有执行 export NAMESRV_ADDR=localhost:9876

会导致 java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed

集成 SCS

  1. 通过 https://start.spring.io/ 创建一个初始化项目

  2. 这里贴出 pom 文件配置

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.2.6.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>org.example</groupId>
        <artifactId>mq</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <java.version>1.8</java.version>
            <spring-cloud.version>Hoxton.SR4</spring-cloud.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
                <version>2.2.1.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-test-support</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>
    
  3. 创建 CustomerChannel

    public interface CustomerChannel {
    
        /**
         * 这里的名称对应了spring.cloud.stream.rocketmq.bindings.<channelName>
         */
        String OUTPUT = "my-output";
        String INPUT = "my-input";
    
        @Output(CustomerChannel.OUTPUT)
        MessageChannel output();
    
        @Input(CustomerChannel.INPUT)
        SubscribableChannel input();
    }
    
  4. 定义 TestController

    @RestController
    @EnableBinding({CustomerChannel.class})
    public class TestController {
    
        private final CustomerChannel customerChannel;
    
        public TestController(CustomerChannel customerChannel) {
            this.customerChannel = customerChannel;
        }
    
        /**
         * 使用一个controller断点模拟发送消息,可以在setHeader方法中设置header来实现消息过滤
         */
        @PostMapping("/message-send")
        public String testCustomInterfaceSendMsg() {
            Message<String> message = MessageBuilder.withPayload("send message")
                    .setHeader(RocketMQHeaders.TAGS, "tag2")
                    .setHeader("mytag", "my-tag")
                    .build();
    
            this.customerChannel.output().send(message);
    
            Message<String> message2 = MessageBuilder.withPayload("send message")
                    .setHeader(RocketMQHeaders.TAGS, "tag3")
                    .setHeader("mytag", "your-tag")
                    .build();
    
            this.customerChannel.output().send(message2);
    
            return "success";
        }
    
        /**
         * 使用@StreamListener来监听消息
         */
        @StreamListener(value = CustomerChannel.INPUT, condition = "headers['mytag']=='my-tag'")
        public void testCustomListener(Message message) {
            System.out.println(message.getHeaders().get("TAGS") + " " + message.getPayload().toString());
        }
    
        /**
         * 使用@StreamListener来监听消息
         */
        @StreamListener(value = CustomerChannel.INPUT, condition = "headers['mytag']=='your-tag'")
        public void testCustomListenerFilter(Message message) {
            System.out.println(message.getHeaders().get("TAGS") + " " + message.getPayload().toString());
        }
    }
    
  5. 配置 application.yml

    spring:
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: localhost:9876
              enable-msg-trace: true
            bindings:
              my-input:
                consumer:
                  tags: tag2 || tag1 || tag3 || tag4 # tag 为 tag1/tag2/tag3/tag4
          bindings:
            my-input:
              destination: my-stream-topic # 相当于 rocketmq 的 topic
              group: my-stream-group
              binder: rocketmq #
              consumer:
                instanceCount: 1 # 指定实例数量
            my-output:
              destination: my-stream-topic # 相当于 rocketmq 的 topic
    
  6. 运行 MQApplication,使用 POST 方法请求 localhost:8989/message-send

核心原理

消息发送和消费的流程:

  1. 消息通过 MessageChannel(output) 进行发送,AbstractMessageChannel 实现了 MessageChannelAbstractSubscribableChannel 继承了 AbstractMessageChannel 并且实现了 SubscribableChannel,重写了其中的 subscribe 方法,subscribe() 指定了 MessageHandler,最终会调用 RocketMQMessageHandler 发送消息

  2. 消息发出之后,对应的消息中间件内部会有通道适配器,将中间件特有的消息格式转换为 SpringMessage,然后发送到 MessageChannel(input)

  3. StreamListener 订阅了对应的 input ,根据一定的条件,就能收到消费者发出的消息。

原理图

本文参考

https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ-en

http://rocketmq.apache.org/docs/quick-start/

RocketMQ 和 Spring Cloud Stream

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,185评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,652评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,524评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,339评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,387评论 6 391
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,287评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,130评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,985评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,420评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,617评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,779评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,477评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,088评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,716评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,857评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,876评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,700评论 2 354

推荐阅读更多精彩内容