Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架。它可以基于Spring Boot来创建独立的、可用于生产的Spring应用程序。它通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream引入了发布-订阅、消费组以及分区这三个核心概念。简单说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。目前为止,Spring Cloud Stream只支持下面两个著名的消息中间件的自动化配置:
.RabbitMQ
.Kafka
快速入门
首先添加pom依赖
<!--stream集成kafka-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- web相关-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
Spring Cloud Stream定义了输入、输出接口Sink、Source、Processor
Sink接口
package org.springframework.cloud.stream.messaging;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* Bindable interface with one input channel.
*
* @see org.springframework.cloud.stream.annotation.EnableBinding
* @author Dave Syer
* @author Marius Bogoevici
*/
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
Source接口
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
Processor接口
public interface Processor extends Source, Sink {
}
Sink和Source接口分别通过@Input和@Output定义了输入通道和输出通道,而Processor通过继承Source和Sink的方式同时实现了输入通道和输出通道。
通过@Input和@Output的value属性定义通道的名称。如果用户不设置的话,默认为input、output。当我们定义输出通道时,需要返回MessageChannel接口对象。而定义输入接口时需要返回SubscribableChannel对象,继承自MessageChannel。
注入接口绑定
.创建一个将Input作为输出通道的接口:
# 定义sender接口
public interface SinkSender{
@Output(Sink.INPUT)
MessageChannel output();
}
# 将SinkSender绑定到SinkReceiver
@EnableBinding(value={SinkSender.class})
@Slf4j
public class SinkReceiver{
@StreamListener(Sink.INPUT)
public void receive(Object payload){
log.info("Received: "+payload.getClass().getName());
}
}
#单元测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class StreamSenderTest {
@Autowired
private SinkSender sinkSender;
@Test
public void contextLoads(){
sinkSender.output().send(MessageBuilder.withPayload("From
sinksender").build());
}
}
.本地启动zookeeper、kafka集群(安装过程请参考其他博客)
./zkserver.sh start ../conf/zoo3.cfg &
./zkserver.sh start ../conf/zoo2.cfg &
./zkserver.sh start ../conf/zoo1.cfg &
集群中三台机器的端口号分别为2181、2182、2183
./kafka-server-start.sh ../config/server1.properties &
./kafka-server-start.sh ../config/server2.properties &
./kafka-server-start.sh ../config/server3.properties &
端口号分别为:9091、9092、9093
.配置application.yml文件
spring:
application:
name: stream
cloud:
stream:
kafka:
binder:
brokers: localhost:9091,localhost:9092,localhost:9093
default-binder: kafka
bindings:
active_input:
destination: stream_active
active_output:
destination: stream_active
kafka:
# bootstrap-servers: localhost:9091,localhost:9092,localhost:9093
consumer:
group-id: stream1
producer:
key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
client-id: producer1
listener:
poll-timeout: 6000000
server:
port: 8100
logging:
level: info
.运行该单元测试用例,会打印出下面的信息,表明消息被发送到了Input通道,并被响应的消费者消费。
2018-10-27 22:07:35.609 INFO 72927 --- [ main] springcloud.stream1.stream.SinkReceiver : Received: [B
注入消息通道
由于Spring Cloud Stream会根据绑定接口中的@Input和@Output注解来创建消息通道实例,所以我们也可以通过直接注入的方式来使用通道对象。比如我们可以通过下面的实例,注入上面列子中的SinkSender接口定义的名为input的消息输入通道。
@RunWith(SpringRunner.class)
@SpringBootTest
public class StreamSenderTest {
@Autowired
private MessageChannel input;
@Test
public void contextLoads(){
input.send(MessageBuilder.withPayload("From sinksender").build());
}
}
.也可以自定义消息通道
代码如下:
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface ActiveChannel {
String ACTIVE_OUTPUT="active_output";
String ACTIVE_INPUT="active_input";
/**
* 发消息通道
* @return
*/
@Output(value = ACTIVE_OUTPUT)
MessageChannel sendActiveMessage();
/**
* 收消息通道
* @return
*/
@Input(ACTIVE_INPUT)
SubscribableChannel receiveActiveMessage();
}
.定义发送及消费通道
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import springcloud.stream1.stream.shop.ActiveChannel;
import javax.annotation.Resource;
@RestController
@Slf4j
public class ActiveController {
@Resource(name= ActiveChannel.ACTIVE_OUTPUT)
private MessageChannel sendActiveMessageChannel;
@GetMapping("/sendMsg")
public boolean sendMsg(String context){
boolean isSucc=sendActiveMessageChannel.send(MessageBuilder.withPayload(context).build());
return isSucc;
}
@StreamListener(ActiveChannel.ACTIVE_INPUT)
public void receiveShopMsg(Message<String> message){
log.info(message.getPayload());
}
}
.启动kafka、zookeeper集群
.在kafka控制台启动新的consumer,查看消费日志
./kafka-console-consumer.sh --bootstrap-server localhost:9091,localhost:9092,localhost:9093 --from-beginning --topic stream_active
.浏览器调用http://localhost:8100/sendMsg?context=naonao
# 响应结果
2018-10-27 22:14:41.554 INFO 72975 --- [container-0-C-1] s.stream1.controller.ActiveController : naonao
.kafka consoler打印日志如下:
很多时候我们在一个微服务中可能会创建多个不同名的MessageChannel实例,这样通过@Autowired注入时,要注意参数命名需要与通道同名时才能被正确注入,或者可以利用@Qualifier注解来指定实例名称,代码如下:
public interface MySource {
@Output("output-1)
MessageChannel output1();
@Output("output-2)
MessageChannel output2();
}
@Component
public class OutputSender{
@Autowired @Qualifier("output-1")
private MessageChannel output;
}