1. 首先安装rabbitmq-management
方法有很多, 我这里用的是rabbitmq的docker镜像
docker安装可以参照这篇文章deepin系统下的docker安装
接下来使用docker安装rabbitmq, 我们可以在Docker Hub中搜索rabbitmq, 找到最新的版本安装
sudo docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.7.8-management
安装之后使用
docker ps -a
检查下, rabbitmq的镜像是否启动, 正常启动状态如下:
如果没有启动使用下面命令启动
docker start 1b84a82c0d0a
start后面那个id, 就是docker ps -a 第一列镜像的container id
2. 访问rabbitmq-management
我的rabbitmq是运行在docker上的, 启动时默认对外映射的端口是15672,
如果是直接安装的rabbitmq, 默认端口为5672,
我这里通过http://192.168.12.12:15672, 就可以访问到rabbitmq的管理端,
默认账户/密码是:guest/guest
3. springboot集成
通过idea的创建一个springboot工程 , pom文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.jhinno</groupId>
<artifactId>stream-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>stream-rabbitmq</name>
<description>Demo project for Spring Boot</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Finchley.SR2</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3.创建消息接收类
@Component
@Slf4j
@EnableBinding(Processor.class)
public class MyMQReciver {
@StreamListener(Processor.INPUT)
public void process(String message){
log.info("hahahah : "+message);
System.out.println("hahahah : "+message);
}
}
这里有两处关键点:
1. @StreamListener(Processor.INPUT)
这里其实是要声明一个订阅的键值, Processor类是一个org.springframework.cloud.stream.messaging jar包中内置的接口, 查看其源码可以看到它继承了Source和Sink两个类
package org.springframework.cloud.stream.messaging;
public interface Processor extends Source, Sink {
}
package org.springframework.cloud.stream.messaging;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
package org.springframework.cloud.stream.messaging;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}
结构很简单, 我们也可以仿照去实现自己的Processor
2. @EnableBinding(Processor.class)
这里绑定的就是Processor(或者我们自己实现的Processor)
4.创建发送消息测试类
@RestController
public class SendController {
@Autowired
private Processor pipe;
@GetMapping("/send")
public void send(@RequestParam String message){
pipe.output().send(MessageBuilder.withPayload(message).build());
}
}
5.在application.yml中增加配置
spring:
cloud:
stream:
bindings:
input:
destination: queue.log.messages #要和output的destination一致, 这样才能将队列和写入消息的exchange关联起来
binder: local_rabbit
group: logMessageConsumers
output:
destination: queue.log.messages
binder: local_rabbit
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 192.168.12.12
port: 5672
username: guest
password: guest
virtual-host: /
6.启动springboot项目
首先浏览器进入rabbitmq管理端查看, 发现我们在application.yml中创建的output destination被自动创建出来了
input destination也被自动创建出来了,并且自动添加了绑定
确认rabbitmq这边没有问题后, 我们访问放松消息接口测试下, http://localhost:8080/send?message=hello
发现MyMQReciver已经成功接受到了消息
看到控制台的输出意味着消息发送和接受成功了.
记录这篇文章的原因是在网上搜了很多教程, 结果很多根本不能正确运行, 有些关键点甚至是错误的, 浪费了很多时间, 最后是在外网看了两篇Spring Cloud Stream 的文章才得以正确运行,下面是原始文章地址:
https://www.javainuse.com/spring/cloud-stream-rabbitmq-1
https://www.javainuse.com/spring/cloud-stream-rabbitmq-2
本文的代码都已经提交到github,https://github.com/LucienYang/spring-cloud-stream-rabbitmq-example.git, 欢迎批评指正