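I. Message Partitioning in Spring Cloud Stream
1. What problem does Stream solve?
Spring Cloud Stream is a framework for building message-driven capabilities into microservice applications. It builds on Spring Boot to create standalone, production-grade Spring applications, and it uses Spring Integration to connect to message broker middleware, which is how it provides message-driven, event-based communication.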
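2. Stream consumer groups:
Intuitively, a consumer group is a set of consumers that process messages together. Note that each message sent to a consumer group is handled by exactly one consumer in that group.
By default, when a producer sends a message to a bound channel, every consumer instance receives and processes its own copy of the message, which can easily lead to duplicate processing. In some scenarios we want each message to be consumed by only one instance. To get that behavior, place those consumers in a consumer group; all it takes is setting the spring.cloud.stream.bindings.{channel-name}.group property on the consumer side.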
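The delivery rule described above can be illustrated with a small in-memory model. This is only a sketch in plain Java, not Spring Cloud Stream code: the class names (GroupDeliveryDemo, Consumer, Destination) and the second receiver name are hypothetical, and round-robin dispatch within a group is an assumption made for the illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of consumer-group delivery; not Spring Cloud Stream code. */
final class GroupDeliveryDemo {

    /** A consumer instance that records the messages it receives. */
    static final class Consumer {
        final String name;
        final List<String> received = new ArrayList<>();

        Consumer(String name) {
            this.name = name;
        }
    }

    /** A destination that keeps one round-robin cursor per consumer group. */
    static final class Destination {
        private final Map<String, List<Consumer>> groups = new LinkedHashMap<>();
        private final Map<String, Integer> cursors = new LinkedHashMap<>();

        void subscribe(String group, Consumer consumer) {
            groups.computeIfAbsent(group, g -> new ArrayList<>()).add(consumer);
            cursors.putIfAbsent(group, 0);
        }

        /** Every group sees the message, but only one member of each group handles it. */
        void publish(String message) {
            for (Map.Entry<String, List<Consumer>> entry : groups.entrySet()) {
                List<Consumer> members = entry.getValue();
                int next = cursors.get(entry.getKey());
                members.get(next % members.size()).received.add(message);
                cursors.put(entry.getKey(), next + 1);
            }
        }
    }

    public static void main(String[] args) {
        Consumer first = new Consumer("receiver-9021");
        Consumer second = new Consumer("receiver-9022"); // hypothetical second instance
        Destination exchange = new Destination();
        exchange.subscribe("groupProduct", first);
        exchange.subscribe("groupProduct", second);
        for (int i = 0; i < 10; i++) {
            exchange.publish("product-" + i);
        }
        System.out.println(first.name + " handled " + first.received.size() + " messages");
        System.out.println(second.name + " handled " + second.received.size() + " messages");
    }
}
```

In this model two consumers in the same group split the ten messages between them, while a consumer subscribed under a different group name would still receive all ten.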
- Create the message sender project:
- Modify the POM file:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
- Modify the configuration file:
spring.application.name=config-server
server.port=9020
# Eureka service registry addresses
eureka.client.serviceUrl.defaultZone=http://admin:123456@eureka1:8761/eureka/,http://admin:123456@eureka2:8761/eureka/
# RabbitMQ connection settings
spring.rabbitmq.host=192.168.226.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtualHost=/
# The destination maps to an exchange in RabbitMQ
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct
- Create the message-sending interface:
public interface ISendeService {
String OUTPUT = "outputProduct";
@Output(OUTPUT)
SubscribableChannel send();
}
- Create the entity class:
public class Product implements Serializable {
private int id;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Product() {
super();
}
public Product(int id, String name) {
super();
this.id = id;
this.name = name;
}
@Override
public String toString() {
return "Product [id=" + id + ", name=" + name + "]";
}
}
- Modify the startup class:
@EnableBinding(value = ISendeService.class)
@EnableEurekaClient
@SpringBootApplication
public class ISendeApplication {
public static void main(String[] args) {
SpringApplication.run(ISendeApplication.class, args);
}
}
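- Create the two message receiver projects:
Both projects use the same configuration and differ only in their port numbers.
- Modify the POM file: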
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Dalston.SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
- Modify the configuration file:
spring.application.name=config-server
server.port=9021
# Eureka service registry addresses
eureka.client.serviceUrl.defaultZone=http://admin:123456@eureka1:8761/eureka/,http://admin:123456@eureka2:8761/eureka/
# RabbitMQ connection settings
spring.rabbitmq.host=192.168.226.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtualHost=/
# The destination maps to an exchange in RabbitMQ
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# Consumer group; it determines the RabbitMQ queue name, and the queue is durable
spring.cloud.stream.bindings.inputProduct.group=groupProduct
- Create the entity class:
public class Product implements Serializable {
private int id;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Product() {
super();
}
public Product(int id, String name) {
super();
this.id = id;
this.name = name;
}
@Override
public String toString() {
return "Product [id=" + id + ", name=" + name + "]";
}
}
- Create the message-receiving interface:
public interface IReceiveService {
String INPUT = "inputProduct";
@Input(INPUT)
SubscribableChannel receiver();
}
- Create the class that handles messages:
@Service
@EnableBinding({IReceiveService.class})
public class ReceiverService {
@StreamListener(IReceiveService.INPUT)
public void onReciver(Product product) {
// Handle the message
System.out.println("Receiver-Group:"+product.toString());
}
}
- Modify the startup class:
@EnableBinding(value = IReceiveService.class)
@EnableEurekaClient
@SpringBootApplication
public class IReceiverApplication {
public static void main(String[] args) {
SpringApplication.run(IReceiverApplication.class, args);
}
}
- Test:
Write a test class in the message sender project.
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ISendeApplication.class)
public class Test {
@Autowired
private ISendeService isendeService;
@org.junit.Test
public void testSend() {
Product product = new Product(1, "Test Sende");
for (int i = 0; i < 10; i++) {
// Wrap the payload in a Message
Message<Product> message = MessageBuilder.withPayload(product).build();
isendeService.send().send(message);
}
}
}
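3. Stream message partitioning:
Sometimes messages that share a characteristic must always be handled by the same consumer. A consumer group guarantees that a message is not consumed more than once, but when the group has several instances there is no guarantee that the same instance handles related messages each time. Message partitioning addresses this: once messages are partitioned, those with the same partition key are always handled by the same consumer instance.
- Common configuration:
(1) Set the consumer group and destination (topic) for the consumer:
Consumer group: spring.cloud.stream.bindings.<channel name>.group=<group name>
Destination: spring.cloud.stream.bindings.<channel name>.destination=<destination name>
Destination for the producer's channel: spring.cloud.stream.bindings.<channel name>.destination=<destination name>
(2) Enable partitioning on the consumer and specify the instance count and instance index:
Enable partitioning: spring.cloud.stream.bindings.<channel name>.consumer.partitioned=true
Instance count: spring.cloud.stream.instanceCount=<number of instances>
Instance index: spring.cloud.stream.instanceIndex=<index of this instance, starting from 0>
(3) Specify the partition key on the producer:
Partition key: spring.cloud.stream.bindings.<channel name>.producer.partitionKeyExpression=<partition key expression>
Partition count: spring.cloud.stream.bindings.<channel name>.producer.partitionCount=<number of partitions>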
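To make the role of the partition key and partition count more concrete, here is a minimal sketch of hash-based partition selection in plain Java. It assumes the producer maps a key to a partition by taking the key's hash code modulo the partition count; the class and method names (PartitionDemo, selectPartition) and the sample keys are hypothetical, and this is an illustration rather than the framework's own source.

```java
/** Sketch of hash-based partition selection; an illustration, not the framework's source. */
final class PartitionDemo {

    /** Maps a partition key to an index in the range [0, partitionCount). */
    static int selectPartition(Object key, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int partitionCount = 2; // same value as producer.partitionCount in the configuration
        String[] keys = {"Product-1", "Product-2", "Product-1", "Product-3", "Product-1"};
        for (String key : keys) {
            // The same key always lands on the same partition index.
            System.out.println(key + " -> partition " + selectPartition(key, partitionCount));
        }
    }
}
```

One consequence worth noting: the key needs a stable hashCode(). With a key expression of payload, the key is the payload object itself, so a class that does not override equals/hashCode is hashed by identity; an expression over a field such as the product id may be a more predictable choice.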
- Create the message sender project:
- Modify the POM file:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
- Modify the configuration file:
spring.application.name=config-server
server.port=9020
# Eureka service registry addresses
eureka.client.serviceUrl.defaultZone=http://admin:123456@eureka1:8761/eureka/,http://admin:123456@eureka2:8761/eureka/
# RabbitMQ connection settings
spring.rabbitmq.host=192.168.226.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtualHost=/
# The destination maps to an exchange in RabbitMQ
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct
# Expression that determines the partition key
spring.cloud.stream.bindings.outputProduct.producer.partitionKeyExpression=payload
# Number of message partitions
spring.cloud.stream.bindings.outputProduct.producer.partitionCount=2
- Create the entity class:
public class Product implements Serializable {
private int id;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Product() {
super();
}
public Product(int id, String name) {
super();
this.id = id;
this.name = name;
}
@Override
public String toString() {
return "Product [id=" + id + ", name=" + name + "]";
}
}
- Create the message-sending interface:
public interface ISendeService {
String OUTPUT = "outputProduct";
@Output(OUTPUT)
SubscribableChannel send();
}
- Modify the startup class:
@EnableBinding(value = ISendeService.class)
@EnableEurekaClient
@SpringBootApplication
public class ISendeApplication {
public static void main(String[] args) {
SpringApplication.run(ISendeApplication.class, args);
}
}
- Create the two message receiver projects:
- Modify the POM file:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
- Modify the configuration file:
spring.application.name=config-server
server.port=9023
# Eureka service registry addresses
eureka.client.serviceUrl.defaultZone=http://admin:123456@eureka1:8761/eureka/,http://admin:123456@eureka2:8761/eureka/
# RabbitMQ connection settings
spring.rabbitmq.host=192.168.226.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtualHost=/
# The destination maps to an exchange in RabbitMQ
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# Consumer group; it determines the RabbitMQ queue name, and the queue is durable
spring.cloud.stream.bindings.inputProduct.group=groupProduct
# Enable partitioning on the consumer
spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true
# Total number of instances of this consumer
spring.cloud.stream.instanceCount=2
# Index of the current instance, starting from 0 (the second receiver project uses 1)
spring.cloud.stream.instanceIndex=0
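As a rough aid for checking the result in the RabbitMQ management console, the sketch below prints the queue names one would expect for this configuration. It assumes the RabbitMQ binder names a group's queue as destination.group and appends -index for a partitioned consumer; that convention is stated here as an assumption to verify against your binder version, and the class and method names (QueueNameDemo, queueName, partitionedQueueName) are hypothetical.

```java
/** Sketch of the assumed queue-naming convention; verify against your binder version. */
final class QueueNameDemo {

    /** Assumed queue name for a consumer group without partitioning. */
    static String queueName(String destination, String group) {
        return destination + "." + group;
    }

    /** Assumed queue name for one instance of a partitioned consumer group. */
    static String partitionedQueueName(String destination, String group, int instanceIndex) {
        return queueName(destination, group) + "-" + instanceIndex;
    }

    public static void main(String[] args) {
        String destination = "exchangeProduct"; // bindings.inputProduct.destination
        String group = "groupProduct";          // bindings.inputProduct.group
        int instanceCount = 2;                  // spring.cloud.stream.instanceCount

        System.out.println("grouped queue: " + queueName(destination, group));
        for (int index = 0; index < instanceCount; index++) {
            // One queue per instance index, each consumed by exactly one receiver.
            System.out.println("partition " + index + ": "
                    + partitionedQueueName(destination, group, index));
        }
    }
}
```

If the two receivers were started with the same instanceIndex, both would attach to the same partition queue and the other partition would have no consumer, which is why each receiver needs its own index.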
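- Create the entity class (the same Product class as in the sender project; only the fields are shown here):
private int id;
private String name;
- Create the message-receiving interface: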
public interface IReceiveService {
String INPUT = "inputProduct";
@Input(INPUT)
SubscribableChannel receiver();
}
- Create the class that handles messages:
@Service
@EnableBinding({IReceiveService.class})
public class ReceiverService {
@StreamListener(IReceiveService.INPUT)
public void onReciver(Product product) {
// Handle the message
System.out.println("Receiver-Partition:"+product.toString());
}
}
- Modify the startup class:
@EnableBinding(value = IReceiveService.class)
@EnableEurekaClient
@SpringBootApplication
public class IReceiverApplication {
public static void main(String[] args) {
SpringApplication.run(IReceiverApplication.class, args);
}
}
- Test:
Create a test class in the message sender project.
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ISendeApplication.class)
public class Test {
@Autowired
private ISendeService isendeService;
@org.junit.Test
public void testSend() {
Product product = new Product(1, "Test Sende");
for (int i = 0; i < 10; i++) {
// Wrap the payload in a Message
Message<Product> message = MessageBuilder.withPayload(product).build();
isendeService.send().send(message);
}
}
}
II. Service Tracing with Sleuth:
1. Why use microservice tracing?
2. Create a Sleuth example:
- Create the Sleuth-Consumer project:
- Modify the POM file:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<!-- Add the Feign dependency -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-feign</artifactId>
</dependency>
<!-- Add the sleuth-product-service dependency -->
<dependency>
<groupId>com.zlw</groupId>
<artifactId>springcloud-sleuth-product-service</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
</dependencies>
- Modify the configuration file:
spring.application.name=sleuth-consumer
server.port=9020
# Eureka service registry addresses
eureka.client.serviceUrl.defaultZone=http://admin:123456@eureka1:8761/eureka/,http://admin:123456@eureka2:8761/eureka/
- Create the Service:
@FeignClient("sleuth-product-provider")
public interface ConsumerProductService extends ProductService {
}
- Create the Controller:
@RestController
public class ConsumerController {
@Autowired
private ConsumerProductService productService;
@RequestMapping(value = "/list", method = RequestMethod.GET)
public List<Product> list() {
List<Product> products = productService.findAll();
return products;
}
}
- Modify the startup class:
@EnableEurekaClient
@EnableFeignClients
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
- Use Logback for log output, with the log level set to DEBUG:
<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
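<!-- Directory where log files are stored; do not use relative paths in the Logback configuration -->
<property name="LOG_HOME" value="${catalina.base}/logs/" />
<!-- Console output -->
<appender name="Stdout" class="ch.qos.logback.core.ConsoleAppender">
<!-- Log output format -->
<layout class="ch.qos.logback.classic.PatternLayout">
<!-- Pattern: %d is the date, %thread the thread name, %-5level the level left-aligned and padded to 5 characters, %msg the log message, %n a newline -->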
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
</pattern>
</layout>
</appender>
<!-- Roll log files daily -->
<appender name="RollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- File name pattern for the rolled log files -->
<FileNamePattern>${LOG_HOME}/server.%d{yyyy-MM-dd}.log</FileNamePattern>
<MaxHistory>30</MaxHistory>
</rollingPolicy>
<layout class="ch.qos.logback.classic.PatternLayout">
<!-- Pattern: %d is the date, %thread the thread name, %-5level the level left-aligned and padded to 5 characters, %msg the log message, %n a newline -->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
</pattern>
</layout>
<!-- Maximum size of a log file -->
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<MaxFileSize>10MB</MaxFileSize>
</triggeringPolicy>
</appender>
<!-- Log output level -->
<root level="DEBUG">
<appender-ref ref="Stdout" />
<appender-ref ref="RollingFile" />
</root>
<!-- Asynchronously write logs to a database -->
<!-- <appender name="DB" class="ch.qos.logback.classic.db.DBAppender">
Asynchronously write logs to a database
<connectionSource class="ch.qos.logback.core.db.DriverManagerConnectionSource">
Connection pool
<dataSource class="com.mchange.v2.c3p0.ComboPooledDataSource">
<driverClass>com.mysql.jdbc.Driver</driverClass>
<url>jdbc:mysql://127.0.0.1:3306/databaseName</url>
<user>root</user>
<password>root</password>
</dataSource>
</connectionSource>
</appender> -->
</configuration>
- Create the Sleuth-Service project:
- Modify the POM file:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Dalston.SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
- Create the Service:
@RequestMapping("/product")
public interface ProductService {
// Query all products
@RequestMapping(value = "/findAll", method = RequestMethod.GET)
public List<Product> findAll();
}
- Create the Sleuth-Provider project:
- Modify the POM file:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- Add the product-service dependency -->
<dependency>
<groupId>com.zlw</groupId>
<artifactId>springcloud-sleuth-product-service</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
- Modify the configuration file:
spring.application.name=sleuth-product-provider
server.port=9001
# Eureka service registry addresses
eureka.client.serviceUrl.defaultZone=http://admin:123456@eureka1:8761/eureka/,http://admin:123456@eureka2:8761/eureka/
#----mysql-db-------
mybatis.type-aliases-package=com.book.product.pojo
mybatis.mapper-locations=classpath:com/book/product/mapper/*.xml
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/book-product?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
spring.datasource.username=root
spring.datasource.password=root
- Modify the Service:
@Service
public class ProductServiceImpl {
@Autowired
private ProductMapper productMapper;
// Query all products
public List<Product> findAllProduct() {
ProductExample productExample = new ProductExample();
// selectByExampleWithBLOBs also returns the text (BLOB) columns
List<Product> list = productMapper.selectByExampleWithBLOBs(productExample);
return list;
}
}
- Modify the Controller:
@RestController
public class ProductServiceController implements ProductService {
@Autowired
private ProductServiceImpl productServiceImpl;
@Override
public List<Product> findAll() {
return productServiceImpl.findAllProduct();
}
}
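With the consumer calling the provider through Feign, the idea behind tracing that call chain can be sketched with a toy model: every hop of one request shares a trace ID, while each hop gets its own span ID and remembers its parent's. The code below is plain Java and does not use the Sleuth API; the names (TraceDemo, SpanContext, newRoot, newChild) and the 64-bit hexadecimal ID format are assumptions made for the illustration.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Toy model of trace and span identifiers; it does not use the Sleuth API. */
final class TraceDemo {

    /** Identifiers carried along with one unit of work in a request. */
    static final class SpanContext {
        final String traceId;      // shared by every hop of the same request
        final String spanId;       // unique to this hop
        final String parentSpanId; // null for the first hop

        SpanContext(String traceId, String spanId, String parentSpanId) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.parentSpanId = parentSpanId;
        }

        /** Starts a new trace, as the first service to receive a request would. */
        static SpanContext newRoot() {
            return new SpanContext(newId(), newId(), null);
        }

        /** Continues the trace in a downstream service. */
        SpanContext newChild() {
            return new SpanContext(traceId, newId(), spanId);
        }
    }

    /** Random 64-bit identifier rendered as 16 hexadecimal characters. */
    static String newId() {
        return String.format("%016x", ThreadLocalRandom.current().nextLong());
    }

    public static void main(String[] args) {
        SpanContext consumer = SpanContext.newRoot();  // e.g. sleuth-consumer handling /list
        SpanContext provider = consumer.newChild();    // e.g. sleuth-product-provider handling /product/findAll
        System.out.println("consumer trace=" + consumer.traceId + " span=" + consumer.spanId);
        System.out.println("provider trace=" + provider.traceId + " span=" + provider.spanId
                + " parent=" + provider.parentSpanId);
    }
}
```

Searching both services' logs for the same trace ID is what lets a single request be followed across the consumer and the provider.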