相关资料以及注意事项:
- 工程GitHub
- 需要配置本地RabbitMQ环境,方法见SpringBoot - 消息队列RabbitMQ安装测试
- SpringCloud Stream官方文档
- SpringCloud Stream官方文档中文翻译
- 环境:SpringBoot 2.0.4.RELEASE + SpringCloud Finchley.SR1
简介
SpringCloud Stream是SpringCloud对于消息中间件的进一步封装,它简化了开发人员对于消息队列的操作。目前仅支持RabbitMQ与Kafka。
示例代码
1.引入Maven
根据你的消息队列类型引入spring-cloud-starter-stream-rabbit
或者spring-cloud-starter-stream-kafka
2.相关注释
-
@input
: 设置输入信道名称。不设置参数,通道名称就默认为方法名。 -
@output
:设置输出信道名称。不设置参数,通道名称就默认为方法名。 -
@StreamListener
:设置监听信道,用于接受来自消息队列的消息 -
@SendTo
:配合@StreamListener
使用,在收到信息后发送反馈信息 -
@EnableBinding
:注解用于绑定一个或者多个接口作为参数
3.预设类
-
Sink
:stream中接受消息的接口 -
Source
:stream中输出消息的接口 -
Processor
:stream中绑定输入输出的接口,主要用来讲发布者和订阅者绑定到一起
4.配置文件参数
对于RabbitMQ,destination 对应的是exchange,group对应的是queue(带有前缀)。对于kafka,destination 对应的是Topic,group就是对应的消费group。对于一个应用集群,如果不需要重复消费消息,必须定义group,否则不必定义group(比如刷新配置消息)。接收消息通道和发送消息通道名不可以重复。需要在后台监控页面看到直观的对象数据,需要设置 content-type: application/json。
cloud:
stream:
bindings:
input: #通道名称
group: group1
content-type: application/json
destination: exchange
5.程序代码
一个controller,分别是传递字符串与传对象。
一个receiver,一个是接受并发出反馈信息,另一个为接收反馈。
@RestController
public class SendMessageController {
@Autowired
private Processor processor;
@GetMapping("/sendMessage")
public void process()
{
String message = new StringBuilder().append("now ").append(new Date()).toString();
processor.output().send(MessageBuilder.withPayload(message).build());
}
@GetMapping("/sendObject")
public void sendObject()
{
Person person = new Person();
person.setName("张三");
person.setAge(123);
//process中已设置input
processor.output().send(MessageBuilder.withPayload(person).build());
}
}
@Component
@EnableBinding(Processor.class)
@Slf4j
public class StreamReceiver {
@StreamListener(Processor.OUTPUT)
@SendTo(Processor.INPUT)
public String process(String message)
{
System.out.println(message);
log.info("process: StreamReceiver:{}",message);
return "我是回执";
}
//@StreamListener(Processor.OUTPUT)
//public void process1(Person person)
//{
// System.out.println(person);
// log.info("process1: StreamReceiver:{}",person);
//}
@StreamListener(Processor.INPUT)
public void process2(String message)
{
System.out.println(message);
log.info("process2: StreamReceiver:{}",message);
}
}
好了,那么有关于SpringCloud Stream就介绍到这里。