一.消息中间的几大应用场景
1、异步处理
比如用户在电商网站下单,下单完成后会给用户推送短信或邮件,发短信和邮件的过程就可以异步完成。因为下单付款是核心业务,发邮件和短信并不属于核心功能,并且可能耗时较长,所以针对这种业务场景可以选择先放到消息队列中,有其他服务来异步处理。
2、应用解耦:
假设公司有几个不同的系统,各系统在某些业务有联动关系,比如 A 系统完成了某些操作,需要触发 B 系统及 C 系统。如果 A 系统完成操作,主动调用 B 系统的接口或 C 系统的接口,可以完成功能,但是各个系统之间就产生了耦合。用消息中间件就可以完成解耦,当 A 系统完成操作将数据放进消息队列,B 和 C 系统去订阅消息就可以了。这样各系统只要约定好消息的格式就好了。
3、流量削峰
比如秒杀活动,一下子进来好多请求,有的服务可能承受不住瞬时高并发而崩溃,所以针对这种瞬时高并发的场景,在中间加一层消息队列,把请求先入队列,然后再把队列中的请求平滑的推送给服务,或者让服务去队列拉取。
4、日志处理
kafka 最开始就是专门为了处理日志产生的。
当碰到上面的几种情况的时候,就要考虑用消息队列了。如果你碰巧使用的是 RabbitMQ 或者 kafka ,而且同样也是在使用 Spring Cloud ,那可以考虑下用 Spring Cloud Stream。
二.概念
首先来认识一下 Spring Cloud Stream 中的几个重要概念。
Destination Binders:目标绑定器,目标指的是 kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder。
Destination Bindings:外部消息传递系统和应用程序之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建)
Message:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信。

三.实现服务通信
1.order服务(消费者)
1.1 添加依赖
<!-- streamCloud rabbit-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
1.2 设置bootstrap.yml
# 在注册中心显示的服务名称
spring:
application:
name: order
cloud:
# 消息通道设置
stream:
binders:
myRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 49.235.110.134
port: 5672
username: admin
password: admin
virtual-host: /
bindings:
output_channel:
destination: orderExchange
content-type: application/json
binder: myRabbit
group: order
...
spring.cloud.stream.binders,上面提到了 stream 的 3 个重要概念的第一个 「Destination binders」。上面的配置文件中就配置了一个 binder,命名为 myRabbit,指定 type 为 rabbit ,表示使用的是 rabbitmq 消息中间件,如果用的是 kafka ,则 type 设置为 kafka。environment 就是设置使用的消息中间件的配置信息,包括 host、port、用户名、密码等。可以设置多了个 binder,适配不同的场景。
spring.cloud.stream.bindings ,对应上面提到到 「Destination Bindings」。这里面可以配置多个input或者 output,分别表示消息的接收通道和发送通道,对应到 rabbitmq 上就是不同的 exchange。这个配置文件里定义了一个output,名称为output_channel。这个名称不是乱起的,在我们的程序代码中会用到,用来标示某个方法接收哪个 exchange 或者发送到哪个 exchange 。
每个通道下的 destination 属性指 exchange 的名称,binder 指定在 binders 里设置的 binder,上面配置中指定了 myRabbit。
1.3 创建通道
package cn.lovingliu.loving.message;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* @Author:LovingLiu
* @Description:
* @Date:Created in 2019-12-02
*/
public interface Barista {
String OUTPUT_CHANNEL = "output_channel";
@Output(Barista.OUTPUT_CHANNEL)
MessageChannel orderOutputChannel();
}
1.4 创建发送消息的Service
@EnableBinding(Barista.class)
@Service //注入到spring容器
public class RabbitmqSender {
//注入Barista
@Autowired
private Barista barista;
// 发送消息
public Boolean sendMessage(String message){
boolean sendStatus = barista.orderOutputChannel().send(MessageBuilder.withPayload(message).build());
System.err.println("--------------sending -------------------");
System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus);
return sendStatus;
}
}
1.5 创建controller
@RestController
@RequestMapping("/order")
@Slf4j
public class OrderController {
@Autowired
private RabbitmqSender sender;
@GetMapping("/send")
public void send(){
Boolean sendStatus = sender.sendMessage("order发送的信息");
System.out.println("发送状态: "+sendStatus);
}
}
1.6 访问地址http://127.0.0.1:2000/order/send

2.product服务
2.1 添加依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2.2 设置bootstrap.yml
spring:
application:
name: product
# 消息通道设置
cloud:
stream:
binders:
myRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 49.235.110.134
port: 5672
username: admin
password: admin
virtual-host: /
bindings:
input_channel:
destination: orderExchange
content-type: application/json
binder: myRabbit
group: order
output_sendto_channel:
destination: orderSendExchange
content-type: application/json
binder: myRabbit
group: order-retry
input_sendto_channel:
destination: orderSendExchange
content-type: application/json
binder: myRabbit
group: order-retry
可以看到 input_channel、output_channel(product) 对应的 destination 是相同的,output_sendto_channel、input_sendto_channel 对应的 destination 也相同, 也就是对应相同的 exchange。一个表示消息来源,一个表示消息去向。
另外还可以设置 group 。因为服务很可能不止一个实例,如果启动多个实例,那么没必要每个实例都消费同一个消息,只要把功能相同的实例的 group 设置为同一个,那么就会只有一个实例来消费消息,避免重复消费的情况。如果设置了 group,那么 group 名称就会成为 queue 的名称,如果没有设置 group ,那么 queue就会根据 destination + 随机字符串的方式命名。
2.3 创建通道
public interface Barista {
String INPUT_CHANNEL = "input_channel";
@Input(Barista.INPUT_CHANNEL)
SubscribableChannel orderInputChannel();
String INPUT_SENDTO_CHANNEL = "input_sendto_channel";
@Input(Barista.INPUT_SENDTO_CHANNEL)
SubscribableChannel sendToInputChannel();
String OUTPUT_SENDTO_CHANNEL = "output_sendto_channel";
@Output(Barista.OUTPUT_SENDTO_CHANNEL)
MessageChannel sendToOutputChannel();
}
2.4 创建消费者业务代码
@EnableBinding(Barista.class)
@Service
@Slf4j
public class RabbitmqReceiver {
@StreamListener(Barista.INPUT_CHANNEL)
@SendTo(Barista.OUTPUT_SENDTO_CHANNEL)
public String receiver(String message){
log.info("【product】message=>{}",message);
String delMsg = String.format("【product】message=>%s",message);
return delMsg;
}
@StreamListener(Barista.INPUT_SENDTO_CHANNEL)
public void sendToReceiver(String message){
log.info("处理之后的信息=>{}",message);
}
}
2.5 访问地址http://127.0.0.1:2000/order/send

三.将配置迁移到统一配置中心
1.product服务
1.1项目配置文件bootstrap.yml
spring:
application:
name: product
cloud:
config:
discovery:
service-id: CONFIG
enabled: true
profile: dev
label: master
bus:
id: ${spring.application.name}:${spring.cloud.config.profile}:${random.value}
eureka:
client:
service-url:
defaultZone: http://127.0.0.1:1000/eureka,http://127.0.0.1:1001/eureka
instance:
instance-id: ${spring.application.name}:${spring.cloud.client.ip-address}:${spring.application.instance_id:${server.port}}
prefer-ip-address: true
1.2 github配置文件product-dev.yml
server:
port: 3000
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/loving_mall?characterEncoding=utf-8&useSSL=false&allowMultiQueries=true
username: root
password: root
redis:
host: 49.235.110.134
database: 0
port: 6379
password: lovingliu
timeout: 3000ms
jedis:
pool:
max-active: 10
max-wait: -1ms
max-idle: 10
min-idle: 0
cloud:
stream:
binders:
myRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 49.235.110.134
port: 5672
username: admin
password: admin
virtual-host: /
bindings:
input_channel:
destination: orderExchange
content-type: application/json
binder: myRabbit
group: order
output_sendto_channel:
destination: orderSendExchange
content-type: application/json
binder: myRabbit
group: order-retry
input_sendto_channel:
destination: orderSendExchange
content-type: application/json
binder: myRabbit
group: order-retry
rabbitmq:
host: 49.235.110.134
port: 5672
username: admin
password: admin
mybatis:
mapper-locations: classpath:mapper/*.xml
redis:
auth-code-prefix: "username:%s"
login-token-prefix: "lovingliu:%s"
auth-code-expire: 120
login-token-expire: 604800
order-lock: 10000
user-cookie-key: "lovingUser"
admin-cookie-key: "lovingAdmin"
2.order服务
2.1 项目配置文件bootstrap.yml
# 在注册中心显示的服务名称
spring:
application:
name: order
cloud:
config:
discovery:
service-id: CONFIG
enabled: true
profile: dev
bus:
id: ${spring.application.name}:${spring.cloud.config.profile}:${random.value}
# 指定注册中心的地址 将服务注册到注册中心上
eureka:
client:
service-url:
defaultZone: http://127.0.0.1:1000/eureka,http://127.0.0.1:1001/eureka
instance:
instance-id: ${spring.application.name}:${spring.cloud.client.ip-address}:${spring.application.instance_id:${server.port}}
prefer-ip-address: true
2.2 github配置文件order-dev.yml
# 在注册中心显示的服务名称
spring:
# 数据库
cloud:
# 消息通道设置
stream:
binders:
myRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 49.235.110.134
port: 5672
username: admin
password: admin
virtual-host: /
bindings:
output_channel:
destination: orderExchange
content-type: application/json
binder: myRabbit
group: order
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/loving_mall?characterEncoding=utf-8&useSSL=false&allowMultiQueries=true
username: root
password: root
# redis缓存
redis:
host: 49.235.110.134
database: 0
port: 6379
password: lovingliu
timeout: 3000ms
jedis:
pool:
max-active: 10
max-wait: -1ms
max-idle: 10
min-idle: 0
# 配置信息更新 广播(消息队列)
rabbitmq:
host: 49.235.110.134
port: 5672
username: admin
password: admin
# mybatis扫描
mybatis:
mapper-locations: classpath:mapper/*.xml
# 运行端口
server:
port: 2000
# 设置超时时间(所有调用的服务)
feign:
client:
config:
default:
connectTimeout: 10000
readTimeout: 10000
# log等级
logging:
level:
org.springframework.cloud.bus: debug
# redis
redis:
auth-code-prefix: "username:%s"
login-token-prefix: "lovingliu:%s"
auth-code-expire: 120
login-token-expire: 604800
order-lock: 10000
user-cookie-key: "lovingUser"
admin-cookie-key: "lovingAdmin"
四.配置详解
首先来认识一下 Spring Cloud Stream 中的几个重要概念。
Destination Binders:目标绑定器,目标指的是 kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder。
Destination Bindings:外部消息传递系统和应用程序之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建)
Message:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信。