前言
在微服务架构的系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都能连接上来,由于该主题中产生的消息会被所有实例监听和消费,所以我们称它为消息总线。在总线上的各个实例都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息,例如配置信息的变更或者其他一些管理操作等。
由于消息总线在微服务架构系统的广泛使用,所以它同配置中心一样,几乎是微服务架构中的必备组件。spring cloud
作为微服务架构综合性的解决方案,对此自然也有自己的实现,这就是spring cloud bus
。通过spring cloud bus
,可以非常容易的搭建起消息总线,同时实现了一些消息总线中的常用功能,比如配合spring cloud config
实现微服务应用配置信息的动态更新等。
消息代理
消息代理(message broker)是一种消息验证,传输,路由的架构模式。它在应用程序之间起到通信并最小化应用之间的依赖的作用,使得应用程序可以高效地解耦通信过程。消息代理是一个中间件产品,它的核心是一个消息的路由程序,用来实现接收和分发消息,并根据设定好的消息处理流来转发给正确的应用。它包括独立的通信和消息传递协议,能够实现组织内部和组织间的网络通信。设计代理的目的就是为了能够从应用程序中传入消息,并执行一些特别的操作,下面这些是企业应用中,我们经常使用消息代理的场景:
- 将消息路由到一个或多个目的地。
- 消息转化为其他的表现方式。
- 执行消息的聚集,消息的分解,并将结果发送到它们的目的地,然后重新组合响应返回给消息用户。
- 调用web服务来检索数据。
- 响应事件或错误。
- 使用发布-订阅模式来提供内容和基于主题的消息路由。
目前已经有非常多的开源产品可以供大家使用,比如:
- activemq
- kafka
- rabbitmq
- rocketmq
- ...
当前版本的spring cloud bus
仅支持两款中间件产品:rabbitmq
和kafka
。
rabbitmq实现消息总线
rabbitmq
是实现了高级消息队列协议(AMQP)的开源消息代理软件,也称为面向消息的中间件。Rabbitmq服务是高性能,可伸缩性而闻名的Erlang语言编写而成的,其集群和故障转移是构建在开放电信平台框架的。
AMQP是Advanced Message Queuing Protocol
的简称,它是一个面向消息中间件的开发式标准应用层协议,它定义了以下这些特性:
- 消息方向
- 消息队列
- 消息路由(包括点到点和发布-订阅模式)
- 可靠性
- 安全性
AMQP要求消息的提供者和客户端接收者的行为要实现对不同的供应商可以用相同的方式(比如SMTP,HTTP,FTP等)进行互相操作。在以往的中间件标准中,主要还是建立在api级别的,比如jms,集中于通过不同的中间件实现来建立标准化的程序间的互操作性,而不是在多个中间件产品间实现互操作性。
AMQP与JMS不同,JMS定义了一个API和一组消息收发必须要实现的行为,而AMQP是一个线路级协议。线路级协议描述的是通过网络发送的数据传输格式。因此,任何符合该数据格式的消息发送和接收工具都能互相兼容和进行操作,这样就能轻易实现跨技术平台的架构方案。
RabbitMQ以AMQP协议实现,所以它可以支持多种操作系统,多种编程语言,几乎可以覆盖所有主流的企业级技术平台。在微服务架构消息中间件的选型中,它是一个非常适合且优秀的选择。因此,在spring cloud bus
中包含了对rabbit
的自动化默认配置。
基本概念
介绍一些Rabbitmq的基本概念,
- Broker:可以理解成消息队列服务器的实体,它是一个中间件应用,负责接收消息生产者的消息,然后将消息发送到消息接收者或者其他的Broker。
- Exchange:消息交换机,是消息第一个到达的地方,消息通过它指定的路由规则,分发到不同的消息队列中去。
- Queue:消息队列,消息通过发发送和路由之后最终到达的地方,到达Queue的消息即进入逻辑上等待消费的状态。每个消息都会被发送到一个或多个队列。
- Binding:绑定,它的作用就是把Exchange和Queue按照路由规则绑定起来,也就是Exchange和Queue之间的虚拟连接。
- Routing Key:路由关键字,Exchange根据这个关键字进行消息投递。
- Virtual host:虚拟主机,它是对Broker的虚拟划分,将消费者,生产者和它们的依赖的AMQP相关结构进行隔离,一般都是为了安全考虑。比如,我们可以在一个Broker中设置多个虚拟主机,对不同用户进行权限的分离。
- Connection:连接,代表生产者,消费者,Broker之间进行通信的物理网络。
- Channel:消息通道,用于连接生产者和消费者的逻辑结构。在客户端的每个连接里,可建立多个Channel,每个Channel代表一个会话任务,通过Channel可以隔离同一个连接中的不同交互内容。
- Producer:消息生产者,制造消息并发送消息的程序。
- Consumer:消息消费者,接收消息并处理消息的程序。
消息投递到队列的整个过程大致如下:
1.客户端连接到消息队列服务器,打开一个Channel。
2.客户端声明一个Exchange,并设置相关属性。
3.客户端声明一个Queue,并设置相关属性。
4.客户端使用Routing Key,在Exchange和Queue之间建立好绑定关系。
5.客户端投递消息到Exchange。
- Exchange接收到消息后,根据消息的key和已经设置的Binding,进行消息路由,将消息投递到一个或多个Queue里。
Exchange也有几种类型。
1.Direct交互机:完全根据Key进行投递。比如,绑定时设置了Routing Key为abc,那么客户端提交的消息,只有设置了key为Routing Key的才会被投递到队列。
2.Topic交互机:对Key进行模式匹配后进行投递,可以使用符号#匹配一个或多个词,符号*匹配正好一个词。比如,abc.#匹配abc.def.ghi, abc.*只匹配abc.def.
3.Fanout交互机:不需要任何Key,它采用广播的模式,一个消息进来时,投递到与该交互机绑定的所有队列。
Rabbitmq支持消息持久化,也就是将数据写在磁盘上。为了数据安全考虑,大多数情况下都会选择持久化。消息队列持久化包括三个部分:
- Exchange持久化,在声明时指定durable >=1.
- Queue持久化,在声明时指定durable => 1.
- 消息持久化,在投递时指定delivery_mode => 2(1是非持久化)。
如果Exchange和Queue都是持久化,那么它们之间的Binding也是持久化的。如果Exchange和Queue两者之间有一个是持久化的,一个是非持久化的,就不允许建立绑定。
安装
快速入门
在springboot
中整合Rabbitmq
是一个非常容易的事情,
- 新建一个
spring boot
工程,命名为springboot-rabbitmq
- 在pom文件中引入依赖,其中
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.5.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
- 在
application.properties
中配置关于Rabbitmq
的连接和用户信息,
spring:
application:
name: springboot-rabbitmq
rabbitmq:
host:
port: 5672
username:
password:
- 创建生产者
Sender
。通过注入AmqpTemplate
接口的实例来实现消息的发送,AmqpTemplate
接口定义了一套针对AMQP
协议的基础操作,在spring boot
中会根据配置来注入其具体的实现。
我们发送一字符串到zhihao.miao.order
队列中,
@Component
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String context = "hello "+ LocalDateTime.now().toString();
System.out.println("Sender: "+context);
this.amqpTemplate.convertAndSend("zhihao.miao.order",context);
}
}
- 创建消息消费者
Receiver
。通过@RabbitListener
注解定义该类对指定队列的监听,并用@RabbitHandler
注解来指定对消息的处理方法(不同的消息格式,@RabbitHandler
配置的方法的入参就不用,默认是byte[] 类型)。所以,该消费者实现了对zhihao.miao.order
队列的消费,消费操作作为输出消息的字符串内容。
@Component
@RabbitListener(queues = "zhihao.miao.order")
public class Receiver {
@RabbitHandler
public void process(String hello){
System.out.println("Receiver: "+hello);
}
}
- 创建
RabbitMQ
的配置类RabbitConfig
,用来配置队列,交换机,路由等高级信息。这里我们只配置队列,已完成一个基本的生产消费过程。
这一步相当于自动创建的过程,如果在控制台上已经创建了该队列,此步骤可以省略。
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public Queue queue(){
return new Queue("zhihao.miao.order");
}
}
- 创建启动主类
@SpringBootApplication
public class RabbitMQApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMQApplication.class,args);
}
}
- 创建单元测试类,用来调用消息生产
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = RabbitMQApplication.class)
public class RabbitMQApplicationTest {
@Autowired
private Sender sender;
@Test
public void setSender() throws Exception{
sender.send();
}
}
-
启动应用主类,在控制台看到创建了一个连接rabbitmq的连接
查看控制面板,查看连接信息
-
运行单元测试类,发送消息
整合spring cloud bus
定义了四个项目,config-server-eureka
(spring cloud config server
服务),eureka-server
(eureka
服务),order-service
(订单服务,也是spring cloud config
客户端),user-service
(用户服务,也是spring cloud config
的客户端),在git远程仓库中定义了二个项目,分别是user-service-config
和order-service-config
项目。
- 对其进行改造,修改pom文件,在
user-service
和order-service
中增加
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
- 在
user-service
和order-service
中的配置文件中增加关于Rabbitmq
的连接和用户信息
spring:
rabbitmq:
host:
port: 5672
username:
password:
- 启动
config-server-eureka
,在启动user-service
和order-service
,我们可以在user-service
和order-service
的控制台上看到如下的内容,在启动的时候多了一个/bus/refresh
请求,
- 访问两个服务的请求
http://192.168.5.4:7070/user/index
和http://192.168.5.4:6060/order/index
查看配置中的配置内容,比如我order-service-config
的生产环境配置的配置信息是:
spring:
datasource:
username: '{cipher}af9b9ea63ce1c027d78c1c3414b425ad6f0093c20c69ad144eacb5a8b4522e7c'
check:
uri: pro-1.0
上面的spring.datasource.username
是zhihao.miao
的对称加密
user-service-config
配置的配置信息是:
spring:
datasource:
username: user-pro
check:
uri: pro-2.0
还有就是order-service
我启动了二个服务,6061服务,访问http://192.168.5.4:6061/order/index
内容和6060的结果一样
- 接着修改两个配置文件的内容,
user-service-config
和order-service-config
的check.url
属性,修改如下:
spring:
datasource:
username: '{cipher}af9b9ea63ce1c027d78c1c3414b425ad6f0093c20c69ad144eacb5a8b4522e7c'
check:
uri: pro-2.0
user-service
服务的配置文件:
spring:
datasource:
username: user-pro
check:
uri: pro-1.0
访问对应的页面,发现这些配置都没有生效,发送/bus/refresh
到order-service
和user-service
,order-service
服务
curl -X POST http://localhost:6060/bus/refresh
发现6060的控制台和6061的控制台都打印了很多输出内容,
这样只要请求order-service
服务上的一个实例就可以更新order-service
服务的所有实例的配置,依靠消息总线的功能实现。
user-service
服务也是,
curl -X POST http://localhost:7070/bus/refresh
原理分析
整个方案的架构如下图所示,其中包含了git仓库,config server
以及几个微服务应用的实例,这些微服务应用的实例中都引入了spring cloud bus
,所以它们都连接到了rabbitmq的消息总线上了。
当我们将系统启动起来之后,图中的"server A"的三个实例会请求Config Server
以获取配置信息,config server
根据应用配置规则从git仓库中获取配置信息并返回。
此时,我们需要修改"server A"的属性。首先,通过git管理工具去仓库中修改对应的属性值,但是这个修改并不会触发"server A"实例的属性更新。我们向"server A"的实例3发送post请求,访问/bus/refresh
接口。此时。“server A”的实例1和实例2从总线中获取到,并重新从config server
中获取它们的配置信息,从而实现配置信息的动态更新。
而从git仓库中配置的修改到发起/bus/refresh
的post请求这一步可以通过git仓库的web hook
来自动触发。由于所有连接到消息总线上的应用都会接收到更新请求,所以在web hook
中就不需要维护所有节点内容进行更新,从而解决通过web hook
来逐个进行刷新的问题。(一般不会使用web hook
功能)
使用git仓库的web hook进行消息总线的事件自动触发
URL:就是自动刷新的地址
当配置文件进行修改时会自动触发刷新事件,导致配置文件刷新。
指定刷新范围
局部刷新
我们通过向服务实例请求spring cloud bus
的/bus/refresh
接口,从而触发了总线上其他服务实例的/refresh
。但是在一些特殊场景下,我们希望可以刷新微服务中某个具体实例的配置。
spring cloud bus
对这种场景也有很好的支持,/bus/refresh
接口提供了一个destination
参数,用于指定具体要刷新的应用程序。比如,可以刷新user-service
的6061端口的服务/bus/refresh?destination=customers:6061
,此时总线上的各个应用实例会根据destination
属性的值来判断是否为自己的实例名,若符合才进行配置刷新,若不符合则忽略该消息。
再去修改一下order-service-config
的配置内容,执行刷新:
curl -X POST http://localhost:6061/bus/refresh?destination=order-service:6061
此时从控制台上也可以看出,6061的控制台上有刷新克隆仓库配置到本地的日志,而同一个服务的不同实例6060去没有日志输出,再去访问url请求验证一下
http://192.168.5.4:6061/order/index
配置已经改了。
http://192.168.5.4:6060/order/index
配置没有改变。
默认情况下,ApplicationContext ID
是spring.application.name:server.port
(也就是上面destination参数后面的order-service:6061),详见org.springframework.boot.context.ContextIdApplicationContextInitializer.getApplicationId(ConfigurableEnvironment
) 方法。
destination
参数除了可以定位具体的实例之外,还可以用来定位具体的服务。定位服务的原理是通过spring的PathMatecher(路径匹配)来实现的,比如/bus/refresh?destination=customers:**
,该请求会触发customers
服务的所有实例进行刷新。
再去修改order-service
服务的仓库order-service-config
的pro
环境的配置:
spring:
datasource:
username: '{cipher}af9b9ea63ce1c027d78c1c3414b425ad6f0093c20c69ad144eacb5a8b4522e7c'
check:
uri: pro-3.0
执行/bus/refresh
刷新,访问对应的页面发现同一个order-service
服务的不同实例都配置都是刷新了。
curl -X POST http://localhost:6061/bus/refresh?destination=order-service:**
应用的上下文id必须不一样
我们上面知道ApplicationContext id是由三部分组成(name,profile,index),name是spring.application.name,profile当前指定的配置文件,index是${vcap.application.instance_index:${spring.application.index:${server.port:${PORT:null}}}}
组成.
The bus tries to eliminate processing an event twice, once from the original ApplicationEvent and once from the queue. To do this, it checks the sending application context id againts the current application context id. If multiple instances of a service have the same application context id, events will not be processed. Running on a local machine, each service will be on a different port and that will be part of the application context id. Cloud Foundry supplies an index to differentiate. To ensure that the application context id is the unique, set spring.application.index to something unique for each instance of a service. For example, in lattice, set spring.application.index=${INSTANCE_INDEX} in application.properties (or bootstrap.properties if using configserver).
spring cloud bus
执行一次刷新就能自动刷新一个服务下的不同实例。 为了达到这个目的,它会检查发送应用程序上下文id是否一样。 如果服务的多个实例具有相同的应用程序上下文id,则一次刷新不能刷新这个服务下的所有实例。 我们在本地机器上运行,每个服务将在不同的端口上,这个时候的ApplicationContext id
是不一样的,因为端口不一致。而如果实际生产中都是一个服务的不同实例是部署到不同的服务器上的,端口,应用名,当前配置文件(pro)都是一致的,这样就会出现刷新事件的不能传播。 Cloud Foundry
提供区分的索引来标识一个服务的不同实例的ApplicationContext id
是唯一的。 为了确保应用程序上下文id是唯一的,请将spring.application.index
设置为服务的每个实例唯一的值。 例如,在application.properties
中设置spring.application.index = $ {INSTANCE_INDEX}
(如果使用configserver
,请设置bootstrap.properties
)。
自己没有去测试。当使用下面的架构优化后通过访问configserver
的url和destination参数是不是只需要配置configserver
的spring.application.index
的不一致即可还是configserver
和configclient
都要去配置。(如果是通过刷新configclient
则肯定要配置configclient
的spring.application.index
就行了)
参考资料
Addressing all instances of a service
Application Context ID must be unique
架构优化
既然spring cloud bus
的/bus/refresh
接口提供了针对服务和实例进行配置更新的参数,那么我们的架构也可以相应的做出一些调整。在之前的demo中。服务的配置更新需要通过向具体服务中的某个实例发送请求,再触发对整个服务集群的配置更新,虽然能实现功能,但是这样的结果是,我们指定的应用实例会不同于集群中的其他应用实例,这样会增加集群内部的复杂度,不利于将来的运维工作。比如,需要对服务实例进行迁移,那么我们不得不修改web hook
中的配置等。所以要尽可能地让服务集群中的各个节点是对等的。
我们主要做了下面的改动:
1.在config server
中引入了spring cloud bus
,将配置服务端也加入到消息总线来。
2./bus/refresh
请求不再发送到具体的服务实例上,而是发送给config server
,并通过destination
参数类指定需要更新配置的服务或实例。
通过上面的改动,我们的服务实例不需要再承担触发配置更新的职责。同时,对于git的触发等配置都只需要针对config server
即可。从而简化了集群上的一些维护工作。
进行改造吧,
config-server-eureka
服务,
加入依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
配置:
spring:
rabbitmq:
host:
port: 5672
username:
password:
启动config-server-eureka
服务,然后发现控制台上有/bus/refresh
端点进行输出。
就以order-service
来进行测试吧,启动二个实例(6060,6061),首先查看order-service-config
的pro
配置文件内容:
spring:
datasource:
username: '{cipher}af9b9ea63ce1c027d78c1c3414b425ad6f0093c20c69ad144eacb5a8b4522e7c'
check:
uri: pro-2.0
访问localhost:6060/order/index
结果显示是
username=zhihao.miao,check.uri===pro-2.0。
我们对配置文件进行修改
spring:
datasource:
username: '{cipher}af9b9ea63ce1c027d78c1c3414b425ad6f0093c20c69ad144eacb5a8b4522e7c'
check:
uri: pro-3.0
通过去访问config-server-eureka
服务提供的刷新端点(localhost:9090/bus/refresh
)进行配置刷新,
curl -X POST http://localhost:9090/bus/refresh?destination=order-service:**
我们发现这命令的时候,config-server-eureka
的控制台输出
而实际的order-service(6060,6061)
服务的控制台也输出一些操作,比如说刷新,从远程仓库git clone
新的代码配置等等,说明我们的架构优化是成功的。
追踪总线事件
Bus events (subclasses of RemoteApplicationEvent) can be traced by setting spring.cloud.bus.trace.enabled=true. If you do this then the Spring Boot TraceRepository (if it is present) will show each event sent and all the acks from each service instance. Example (from the /trace endpoint):
通过设置spring.cloud.bus.trace.enabled=true
可以追踪消息总线上的事件传播。可以通过每个服务的/trace的端点来追踪所有事件的发起和消息回执。
在user-service
中加入配置,如下,
spring:
cloud:
bus:
trace:
enabled: true
然后重启服务,访问url
http://localhost:7070/trace
页面显示如下:
显示RefreshRemoteApplicationEvent
事件从user-service:7070
发起,传播到user-service
下的所有节点。
参考资料
Tracing Bus Events
传播自己的事件
略
参考资料
Broadcasting Your Own Events