Spring Cloud Stream rabbitmq集成说明

1. 首先安装rabbitmq-management

方法有很多, 我这里用的是rabbitmq的docker镜像

docker安装可以参照这篇文章deepin系统下的docker安装

接下来使用docker安装rabbitmq, 我们可以在Docker Hub中搜索rabbitmq, 找到最新的版本安装

sudo docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.7.8-management

安装之后使用

docker ps -a

检查下, rabbitmq的镜像是否启动, 正常启动状态如下:


image.png

如果没有启动使用下面命令启动

docker start 1b84a82c0d0a

start后面那个id, 就是docker ps -a 第一列镜像的container id

2. 访问rabbitmq-management

我的rabbitmq是运行在docker上的, 启动时默认对外映射的端口是15672,
如果是直接安装的rabbitmq, 默认端口为5672,
我这里通过http://192.168.12.12:15672, 就可以访问到rabbitmq的管理端,
默认账户/密码是:guest/guest

3. springboot集成

通过idea的创建一个springboot工程 , pom文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.jhinno</groupId>
    <artifactId>stream-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>stream-rabbitmq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>Finchley.SR2</spring-cloud.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3.创建消息接收类

@Component
@Slf4j
@EnableBinding(Processor.class)
public class MyMQReciver {

    @StreamListener(Processor.INPUT)
    public void process(String message){
        log.info("hahahah : "+message);
        System.out.println("hahahah : "+message);
    }
}

这里有两处关键点:
1. @StreamListener(Processor.INPUT)
这里其实是要声明一个订阅的键值, Processor类是一个org.springframework.cloud.stream.messaging jar包中内置的接口, 查看其源码可以看到它继承了Source和Sink两个类

package org.springframework.cloud.stream.messaging;

public interface Processor extends Source, Sink {
}
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

结构很简单, 我们也可以仿照去实现自己的Processor
2. @EnableBinding(Processor.class)
这里绑定的就是Processor(或者我们自己实现的Processor)

4.创建发送消息测试类

@RestController
public class SendController {

    @Autowired
    private Processor pipe;

    @GetMapping("/send")
    public void send(@RequestParam String message){
        pipe.output().send(MessageBuilder.withPayload(message).build());
    }
}

5.在application.yml中增加配置

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue.log.messages #要和output的destination一致, 这样才能将队列和写入消息的exchange关联起来
          binder: local_rabbit
          group: logMessageConsumers
        output:
          destination: queue.log.messages
          binder: local_rabbit
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.12.12
                port: 5672
                username: guest
                password: guest
                virtual-host: /

6.启动springboot项目

首先浏览器进入rabbitmq管理端查看, 发现我们在application.yml中创建的output destination被自动创建出来了


image.png

input destination也被自动创建出来了,并且自动添加了绑定


image.png

image.png

确认rabbitmq这边没有问题后, 我们访问放松消息接口测试下, http://localhost:8080/send?message=hello
发现MyMQReciver已经成功接受到了消息

image.png

看到控制台的输出意味着消息发送和接受成功了.

记录这篇文章的原因是在网上搜了很多教程, 结果很多根本不能正确运行, 有些关键点甚至是错误的, 浪费了很多时间, 最后是在外网看了两篇Spring Cloud Stream 的文章才得以正确运行,下面是原始文章地址:

https://www.javainuse.com/spring/cloud-stream-rabbitmq-1
https://www.javainuse.com/spring/cloud-stream-rabbitmq-2

本文的代码都已经提交到github,https://github.com/LucienYang/spring-cloud-stream-rabbitmq-example.git, 欢迎批评指正

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容