在微服务架构的系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都连接上来,由于该主题中产生的消息会被所有实例监听和消费,所以我们称它为消息总线。在总线上的各个实例都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息,例如配置信息的变更或者其他一些管理操作等。
由于消息总线在微服务架构系统中被广泛使用,所以它同配置中心一样,几乎是微服务架构中的必备组件。Spring Cloud作为微服务架构综合性的解决方案,对此自然也有自己的实现,这就是本章我们将要具体介绍的Spring Cloud Bus。通过使用Spring Cloud Bus,可以非常容易地搭建起消息总线,同时实现了一些消息总线中的常用功能,比如,配合Spring Cloud Config实现微服务应用消息的动态更新等。
在本章中,我们将从消息代理的基础开始,由浅入深介绍如何使用Spring Cloud Bus构建微服务架构中的消息总线。
消息代理(Message Broker)是一种消息验证、传输、路由的架构模式。它在应用程序之间起到通信调度并最小化应用之间的依赖的作用,使得应用程序可以高效地解耦通信过程。消息代理是一个中间件产品,它的核心是一个消息的路由程序,用来实现接收和分发消息,并根据设定好的消息处理流来转发给正确的应用。它包括独立的通信和消息传递协议,能够实现组织间的网络通信。设计代理的目的就是为了能够从应用程序中传入消息,并执行一些特别的操作,下面这些是在企业应用中,我们经常需要使用消息代理的场景:
▪️将消息路由到一个或多个目的地
▪️消息转化为其他的表现方式
▪️执行消息的聚集、消息的分解,并将结果发送到它们的目的地,然后重修组合响应返回给消息用户
▪️调用Web服务来检索数据
▪️响应事件或错误
▪️使用发布-订阅模式来提供内容或基于主题的消息路由。
目前已经有非常多的开源产品可以供大家使用,比如:
▪️ActiveMQ
▪️Kafka
▪️RabbitMQ
▪️RocketMQ
▪️...
当前版本的Spring Cloud Bus仅支持两款中间件产品:RabbitMQ和Kafka。在下面的章节中,我们将分别介绍如何使用这两款消息中间件与Spring Cloud Bus配合实现消息总线。
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件,也称为面向消息的中间件。RabbitMQ服务器是用高性能、可伸缩而闻名的Erlang语言编写而成的,其集群和故障转移是构建在开放电信平台框架上的。
AMQP是Advanced Message Queuing Protocol的简称,它是一个面向消息中间件的开放式标准应用层协议。它定义了以下这些特性:
▪️消息方向
▪️消息队列
▪️消息路由(包括点到点和发布-订阅模式)
▪️可靠性
▪️安全性
AMQP要求消息的提供者和客户端接收者的行为要实现对不同供应商可以用相同的方式(比如SMTP、HTTP、FTP等)进行互相操作。在以往的中间件标准中,主要还是建立在API级别,比如JMS,集中于通过不同的中间件实现来建立标准化的程序间的互操作性,而不是在多个中间件产品实现互操作性。
AMQP与JMS不同,JMS定义了一个API和一组消息收发必须实现的行为,而AMQP是一个线路级协议。线路级协议描述的是通过网络发送的数据传输格式。因此,任何符合该数据格式的消息发送和接收工具都能互相兼容和进行操作,这样就能轻易实现跨技术平台的架构方案。
RabbitMQ以AMQP协议实现,所以它可以支持多种操作系统、多种编程语言,几乎可以覆盖所有主流的企业级技术平台。在微服务架构消息中间件的选型中,它是一个非常合适且优秀的选择。因此,在Spring Cloud Bus中包含了对Rabbit的自动化默认配置,在下面的章节中,我们将先从RabbitMQ的基础安装和使用开始,循序渐进地学习如何与Spring Cloud Bus进行整合实现消息总线。
在开始具体实践之前,我们先介绍一些关于RabbitMQ的基本概念,为后续的讲解做一些必要铺垫(如果对于RabbitMQ已经很熟悉的读者可以跳过本节,直接从“快速入门”一节开始阅读)。
▪️Broker:可以理解为消息队列服务器的实体,它是一个中间件应用,负责接收消息生产者的消息,然后将消息发送至消息接收者活着其他的broker。
▪️Exchange:消息交换机,是消息第一个到达的地方,消息通过它指定的路由规则,分发到不同的消息队列中去。
▪️Queue:消息队列,消息通过发送和路由之后最终到达的地方,到达Queue的消息即进入逻辑上等待消费的状态。每个消息都会被发送到一个或多个队列。
▪️Binding:绑定,它的作用就是把Exchange和queue按照路由规则绑定起来,也就是Exchange和Queue之间的虚拟连接。
▪️Routing Key:路由关键字,Exchange根据这个关键字进行消息投递。
▪️Vitual host:虚拟主机,它是对Broker的虚拟划分,将消费者、生产者和它们依赖的AMQP相关结构进行隔离,一般都是为了安全考虑。比如,我们可以在一个Broker中设置多个虚拟主机,对不同用户进行权限的分离。
▪️Connection:连接,代表生产者、消费者、Broker之间进行通信的物理网络。
▪️Channel:消息通道,用于连接生产者和消费者的逻辑结构。在客户端的每个连接里,可以建立多个Channel,每个Channel代表一个会话认为,通过Channel可以隔离同一连接中的不同交互内容。
▪️Producer:消息生产者,制造消息并发送消息的程序。
▪️Consumer:消息消费者,接收消息并处理消息的程序。
消息投递到队列的整个过程大致如下:
▪️客户端连接到消息队列服务器,打开一个Channel。
▪️客户端声明一个Exchange,并设置相关属性。
▪️客户端使用Routing Key,在Exchange和Queue之间建立好绑定关系。
▪️客户端投递消息到Exchange。
▪️Exchange接收到消息后,根据消息的Key和已经设置的Binding,进行消息路由,将消息投递到一个或多个Queue里。
Exchange也有几种类型。
▪️Direct交换机:完全根据Key进行投递。比如,绑定时设置了Routing Key为abc,那么客户端提交的消息,只有设置了Key为abc的才会被投递到队列。
▪️Topic交换机:对Key进行模式匹配后进行投递,可以使用符号#匹配一个或多个次,符号*匹配正好一个词。比如,abc.#匹配abc.def.ghi,abc.*只能匹配abc.def。
▪️Fanout交换机:不需要任何Key,它采取广播的模式,一个消息进来时,投递到与该交换机绑定的所有队列。
RabbitMQ支持消息的持久化,也就是将数据写在磁盘上。为了数据安全考虑,大多数情况下都会选择持久化。消息队列持久化。消息队列持久化包括3个部分。
▪️Exchange持久化,在声明时指定durable=>1。
▪️Queue持久化,在声明时指定durable=>1。
▪️消息持久化,在投递时指定delivery_mode=>2(1是非持久化)。
如果Exchange和Queue都是持久化的,那么它们之间的Binding也是持久化的。如果Exchange和Queue两者之间有一个持久化的,一个是非持久化的,就不允许建立绑定。
打开cd /usr/local/Cellar/rabbitmq/3.7.2;注意brew安装的软件都在/usr/local/Cellar中,启动如下操作
http://localhost:15672/;初始账户密码都是guest;
这样就已经完成了rabbitmq的安装。
创建一个springcloud的用户
其中,Tags标签是RabbitMQ中的角色分类,共有下面几种。
▪️none:不能访问management plugin
▪️management:用户可以通过AMQP做的任何事,外加如下内容:
▪️列出自己可以通过AMQP登入的virtual hosts。
▪️查看自己的virtual hosts中的queues、exchange和bindings。
▪️查看和关闭自己的channels和connections。
▪️查看有关自己的virtual hosts的“全局”统计信息,包含其他用户在这些virtual hosts中的活动。
▪️policymaker:management可以做的任何事,外加如下内容:
▪️查看、创建和删除自己的virtual hosts所属的policies和parameters。
▪️monitoring:management可以做的任何事,外加如下内容:
▪️列出所有virtual hosts,包括它们不能登录的virtual hosts。
▪️查看其他用户的connections和channels。
▪️查看节点级别的数据,如clustering和memory的使用情况。
▪️查看真正的关于所有virtual hosts的全局的统计信息。
▪️administrator:policymaker和monitoring可以做的任何事,外加如下内容:
▪️创建和删除virtual hosts
▪️查看、创建和删除users
▪️查看、创建和删除permissions
▪️关闭其他用户的connections。
pom.xml配置
通过上面的示例,我们在Spring Boot应用中引入spring-boot-starter-amqp模块,进行简单配置就完成了对RabbitMQ的消息生产和消费的开发内容。然而在实际应用中,还有很多内容没有演示,比如之前提到的一些概念:交换机、路由关键字、绑定虚拟主机等,这里不做更多的讲解,读者可以自行查阅RabbitMQ的官方教程,其中有更全面的讲解。在这里,我们需要重点理解的是,在整个生产消费过程中,生产和消费是一个异步操作,这也是在分布式系统中要使用消息代理的重要原因,以此我们可以使用通信来解耦业务逻辑。在这个例子中,读者可以进一步做一些测试,比如,不运行消费者,先运行生产者,此时可以看到在RabbitMQ Server管理页面的Queues选项卡下多了一些待处理的消息,这时我们再启动消费者,它就会处理这些消息,所以通过生产消费模式的异步操作,系统间调用酒没有同步调用需要那么高的实时性要求,同时也要容易控制处理的吞吐量以保证系统的正常运行等。
在上一节,我们已经介绍了关于消息代理、AMQP以及RabbitMQ的基础知识和使用方法。在下面的内容中,我们开始具体介绍Spring Cloud Bus的配置,并以一个Spring Cloud Bus与Spring Cloud Config结合的例子来实现配置内容的实时更新。
先回顾一下,在上一章Spring Cloud Config的介绍中,我们留了一个悬念:如何实现对配置信息的实时更新。虽然我们已经能够通过/refresh接口和Git仓库的Web Hook来实现Git仓库中的内容修改触发应用配置的话,随着系统的不断扩展,会变得越来越难以维护,而消息代理中间件是解决该问题最为合适的方案。是否还记得我们在介绍消息代理中的特点时提到过这样一个功能:消息代理中间件可以将消息路由到一个或多个目的地。利用这个功能,我们就能完美地解决该问题,下面来说说Spring Cloud Bus中的具体实现方案。
Spring Cloud Bus中的RabbitMQ整合使用了Spring Boot的ConnectionFactory,所以在Spring Cloud Bus中支持使用以spring.rabbitmq为前缀的Spring Boot配置属性,具体的配置属性、说明以及默认值如下所示:
▪️spring.rabbitmq.address:客户端连接的地址,有多个的时候使用逗号分隔,该地址可以是IP与Port的结合
▪️spring.rabbitmq.cache.channel.checkout-timeout:当缓存已满时,获取Channel的等待时间,单位为毫米
▪️spring.rabbitmq.cache.channel.size:缓存中保存的Channel数量
▪️spring.rabbitmq.cache.connection.mode:CHANNEL,连接缓存的模式
▪️spring.rabbitmq.cache.connection.size:缓存的连接数
▪️spring.rabbitmq.connection-timeout:连接超时参数,单位为毫秒;设置为“0”代表无穷大
▪️spring.rabbitmq.dynamic:true,默认创建一个AmqpAdmin的Bean
▪️spring.rabbitmq.host:localhost,RabbitMQ的主机地址
▪️spring.rabbitmq.listener.acknowledge-mode:容器的acknowledge模式
▪️spring.rabbitmq.listener.auto-startup:true,启动时自动启动容器
▪️spring.rabbitmq.listener.concurrency:消费者的最小数量
▪️spring.rabbitmq.listener.default-requeue-rejected:true,投递失败时是否重新排队
▪️spring.rabbitmq.listener.max-concurrency:消费者的最大数量
▪️spring.rabbitmq.listener.prefetch:在单个请求中处理的消息个数,它应该大于等于事务数量
▪️spring.rabbitmq.listener.retry.enabled:false,不论是不是重试的发布
▪️spring.rabbitmq.listener.retry.initial-interval:1000,第一次与第二次投递尝试的时间间隔
▪️spring.rabbitmq.listener.retry.max-attempts:3,尝试投递消息的最大数量
▪️spring.rabbitmq.listener.retry.max-interval:10000,两次尝试的最大时间间隔
▪️spring.rabbitmq.listener.retry.stateless:true,不论重试是有状态的还是无状态的
▪️spring.rabbitmq.listener.retry.transaction-size:在一个事务中处理的消息数量。为了获得最佳效果,该值应设置为小于等于每个请求中处理的消息个数,即spring.rabbitmq.listener.prefetch的值
▪️spring.rabbitmq.password:登录到RabbitMQ的密码
▪️spring.rabbitmq.port:5672,RabbitMQ的端口号
▪️spring.rabbitmq.pulisher-confirms:false,开启Pulisher Confirm机制
▪️spring.rabbitmq.publisher-returns:false,开启Publisher Return机制
▪️spring.rabbitmq.requested-heartbeat:请求心跳超时时间,单位为秒
▪️spring.rabbitmq.ssl.enabled:false,开启SSL支持
▪️spring.rabbitmq.ssl.key-store:保存SSL证书的地址
▪️spring.rabbitmq.ssl.key-store-password:访问SSL证书的地址使用的密码
▪️spring.rabbitmq.ssl.trust-store:SSL的可信地址
▪️spring.rabbitmq.ssl.trust-store-password:访问SSL的可信地址的密码
▪️spring.rabbitmq.ssl.algorithm:SSL算法,默认使用Rabbit的客户端算法库
▪️spring.rabbitmq.template.mandatory:false,启用强制消息
▪️spring.rabbitmq.template.receive-timeout:0,receive()方法的超时时间
▪️spring.rabbitmq.template.retry.enabled:false,设置为true的时候RabbitTemplate能够实现重试
▪️spring.rabbitmq.template.retry.initial-interval:1000,第一次与第二次发布消息的时间间隔
▪️spring.rabbitmq.template.retry.max-attempts:3,尝试发布消息的最大数量
▪️spring.rabbitmq.template.retry.max-interval:10000,尝试发布消息的最大时间间隔
▪️spring.rabbitmq.template.retry.multiplier:1.0,上一次尝试时间间隔的乘数
▪️spring.rabbitmq.username:登录到RabbitMQ的用户名
▪️spring.rabbitmq.virtual-host:连接到RabbitMQ的虚拟主机
Spring Cloud Bus除了支持RabbitMQ的自动化配置之外,还支持现在被广泛应用的Kafka。在本节中,我们将搭建一个Kafka的本地环境,并且通过它来尝试使用Spring Cloud Bus对Kafka的支持,实现消息总线的功能。
Kafka是一个由LinkedIn开发的分布式消息系统,它于2011年年初开源,现在由著名的Apache基金会维护与开发。Kafka使用Scala实现,被用于LinkedIn的活动流和运营数据处理的管道,现在也被诸多互联网企业广泛地用作数据流管道和消息系统。
Kafka是基于消息发布-订阅模式实现的消息系统,其主要设计目标如下所述。
▪️消息持久化:以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上的数据也能保证常数时间复杂度的访问性能。
▪️高吞吐:在廉价的商用机器上也能支持单机每秒10万条以上的高吞吐量。
▪️分布式:支持消息分区以及分布式消费,并保证分区内的消息顺序。
▪️跨平台:支持不同技术的客户端(如Java、PHP、Python等)。
▪️实时性:支持实时数据处理和离线数据处理。
▪️伸缩性:支持水平扩展。
Kafka中涉及的一些基本概念,如下所示。
▪️Broker:Kafka集群包含一个或多个服务器,这些服务器被称为Broker。
▪️Topic:逻辑上同RabbitMQ的Queue队列相似,每条发布到Kafka集群的消息都必须有一个Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个Broker上,但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。
▪️Partition:Partition是物理概念上的分区,为了提供系统吞吐率,在物理上每个Topic会分成一个或多个Partition,每个Partition对应一个文件夹(存储对应分区的消息内容和索引文件)
▪️Producer:消息生产者,负责生产消息并发送到Kaflka Broker。
▪️Consumer:消息消费者,向Kafka Broker读取消息并处理的客户端。
▪️Consumer Group:每个Conusmer属于一个特定的组(可为每个Consumer指定属于一个组,若不指定则属于默认组),组可以用来实现一个消息被组内多个成员消费等功能。
在对Kafka有了一些基本了解之后,下面我们来尝试搭建一个Kafka服务端,并体验一下基于Kafka的消息生产与消费。
首先,我们需要从官网上下载安装介质。下载地址:http://kafka.apache.org/downloads
由于Kafka的设计中依赖了Zookeeper,所以我们在bin和config目录中除了看到Kafka相关的内容之外,还有Zookeeper相关的内容。其中bin目录中存放了Kafka和Zookeeper的命令行工具,bin根目录下存放的是适用于Linux/UNIX的shell,而bin/windows下存放的则是适用于Windows下单bat。我们可以根据实际的系统来设置环境变量,以方便后续的使用和操作。而config目录,则用来存放关于Kafka与Zookeeper的配置信息。
下面我们来尝试启动Zookeeper和Kafka来进行消息的生产和消费。示例中所有的命令均以配置了Kafka的环境变量为例。
▪️启动了Zookeeper:执行命令zookeeper-server-start config/zookeeper.properties,该命令需要指定ZooKeeper的配置文件位置才能正确启动,Kafka的压缩包中包含了其默认配置,开发与测试环境基本不需要修改,所以这里不做详细介绍,对于线上的调优需求,请读者自行查看官方文档进行操作。
启动zookeeper-server,进入kafka解压包:
bin/zookeeper-server-start.sh config/zookeeper.properties
从控制台信息中我们可以看到,Zookeeper从指定的config/zookeeper.properties配置文件中读取信息并绑定2181端口启动服务。有时候启动失败,可查看一下端口是否被占用,可以kill掉占用进程或通过修改config/zookeeper.properties配置文件中的clientport内容以绑定其他端口号来启动Zookeeper。
▪️启动Kafka:执行命令kafka-server-start config/server.properties,该命令也需要指定Kafka配置文件的正确位置,如上命令中指向了解压目录包含的默认配置。若在测试时,使用外部集中环境的Zookeeper的话,我们可以在该配置文件中zookeeper.connect参数来设置Zookeeper的地址和端口,它默认会连接本地2181端口的Zookeeper;如果需要设置多个Zookeeper节点,可以为这个参数配置多个Zookeeper地址,并用逗号隔开。比如zookeeper.connect=127.0.01:3000,127.0.0.1:3001,127.0.0.1:3002。除此之外,该配置文件中还提供了关于服务端连接、日志等配置参数,具体的线上配置可根据实际情况进行调整。
启动kafka-server,进入Kafka解压包:
bin/kafka-server-start.sh config/server.properties
▪️创建Topic:执行命令kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test。通过该命令,创建了一个名为test的Topic,该Topic包含一个分区和一个Replica。在创建完成后,可以使用kafka-topics --list --zookeeper localhost:2181命令来查看当前的Topic。
创建一个topic,进入kafka/bin;
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
./kafka-topics.sh --list --zookeeper localhost:2181
另外,如果不使用kafka-topics命令来手工创建,直接使用下面的内容进行消息创建时也会自动创建Topic。
▪️创建消息生产者:执行命令kafka-console-producer --broker-list localhost:9092 --topic test。kafka-console-producer命令可以启动Kafka基于命令行的消息生产客户端,启动后可以直接在控制台中输入消息来发送,由于此时并没有消费者,所以这些输入的消息都会被阻塞在名为test的Topics中,直到消费者将其消费掉。
创建一个消息生产者,进入kafka/bin:
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
▪️创建消费者:执行命令kafka-console-consumer --zookeeper localhost:2181 --topic test --from-beginning。kafka-console-consumer命令启动的是Kafka基于命令行的消息消费客户端,启动之后,马上可以在控制台中看到输出了之前我们在消息生产客户端中发送的消息。我们可以再次打开之前的消息生产客户端来发送消息,并观察消费者这边对消息的输出来体验Kafka对消息的基础处理。
创建一个消息消费者,进入kafka/bin:
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
在介绍Kafka之前,我们已经通过引入spring-cloud-starter-bus-ampq模块,完成了使用RabbitMQ来实现消息总线。若我们要使用Kafka来实现消息总线时,只需把spring-cloud-starter-bus-amqp替换成spring-cloud-starter-bus-kafka模块,在pom.xml的dependency节点中进行修改,具体如下:
如果在启动Kafka时均采用了默认配置,那么我们不需要其他配置就能在本地实现从RabbitMQ到Kafka的切换,可以尝试把刚刚搭建的Zookeeper、Kafka启动起来,并将修改为spring-cloud-starter-bus-kafka模块的config-server和config-client启动起来。
从控制台的输出内容我们可以看到,config-server连接到了Kafka中,并使用了名为springCloudBus的Topic。
此时,我们可以使用kafka-topics --list --zookeeper localhost:2181命令来查看当前的Kafka中的Topic。若已成功启动了config-server并配置正确,可以在Kafka中看到一句多了一个名为springCloudBus的Topic。
./kafka-topics.sh --list --zookeeper localhost:2181
我们再启动配置了spring-cloud-starter-bus-kafka模块的config-client,可以看到控制台中输出了如下内容:
可以看到,config-client启动时输出了类似的内容,它们都订阅了名为springCloudBus的Topic。从这里我们也可以知道,在消息总线上的节点,从结构上来说,不论是config-server还是config-client,它们都是对等的。
在启动了config-server和config-client之后,为了更明显地观察消息总线刷新配置的效果,我们可以在本地启动多个不同端口的config-client。此时,我们的config-server以及多个config-client都已经连接到了由Kafka实现的消息总线上。我们可以先访问各个config-client上的/from请求,查看它获取到的配置内容。然后,修改Git中对应的参数内容,再访问各个config-client上的/from请求,可以看到配置内容并没有改变。最后,我们向config-server发送POST请求:/bus/refresh,此时再去访问各个config-client上的/from请求,就能获得最新的配置信息,各客户端上配置都已经加载为最新的Git配置内容。
从config-client的控制台中,我们可以看到如下内容:
操作补充:
RefreshListener监听类记录了收到远程刷新请求,并刷新了from属性的日志,在下一节中,我们将根据消息内容与日志输出信息作为线索来探索Spring Cloud Bus的工作机制。
在上面的例子中,由于Kafka、Zookeeper均运行于本地,在自动化配置的支持下,我们没有在测试程序中通过配置信息来指定Kafka、Zookeeper的配置信息,就完成了本地消息总线的试验。但是在实际应用中,Kafka和Zookeeper一般都会独立部署,所以在应用中需要为Kafka和Zookeeper配置一些连接信息等。Kafka的整合与RabbitMQ不同,在Spring Boot 1.3.7中并没有直接提供Starter模块,而是采用了Spring Cloud Stream的Kafka模块,所以对于Kafka的配置均采用了spring.cloud.stream.kafka前缀,具体的配置内容我们可以参考第十章的“绑定器配置”一节中关于Kafka配置的内容。一些属性如下:
▪️spring.cloud.stream.kafka.binder.brokers:localhost,Kafka的服务端列表。
▪️spring.cloud.stream.kafka.binder.defaultBrokerPort:9092,Kafka服务器端的默认端口,当brokers属性中没有配置端口信息,就会使用默认端口。
▪️spring.cloud.stream.kafka.binder.zkNodes:localhost,Kafka服务端连接的Zookeeper节点列表。
▪️spring.cloud.stream.kafka.binder.defaultzkport:2181,当zkNodes属性中没有配置端口信息时,就会使用这个默认端口。
在整合Kafka实现了消息总线之后,我们不妨继续使用Kafka提供的控制台消费者来看看,当执行/bus/refresh时,消息消费者都获得了什么。通过前文我们从控制台中获得的信息可以知道,Spring Cloud Bus使用了名为 springCloudBus的Topic,所以我们可以使用命令kafka-console-consumer --zookeeper localhost:2181 --topic springCloudBus,启动对springCloudBus的消费者控制台来进行观察。
启动消费者控制台之后,我们向config-server发送POST请求:/bus/refresh,此时在控制台中可以看到类似如下的内容:
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic springCloudBus
下面,我们来详细理解消息中的信息内容。
▪️type:消息的事件类型。在上面的李忠忠,包含了RefreshRemoteApplicationEvent和AckRemoteApplicationEvent。其中RefreshRemoteApplicationEvent事件就是我们用来刷新配置的事件,而AckRemoteApplicationEvent是响应消息已经正确接收的告知消息事件。
▪️timestamp:消息的时间戳。
▪️originService:消息的来源服务实例。
▪️destinationService:消息的目标服务实例。上面示例中的*:**代表了总线上的所有服务实例。如果需要通过destination参数来定位具体要刷新的应用实例即可,比如发起/bus/refresh?destinationService=didispace请求,就可以得到如下的刷新事件消息,其中destinationService为didispace:**,表示总线上所有didispace服务的实例。
▪️id:消息的唯一标识。
上面的消息内容是RefreshRemoteApplicationEvent和AckRemoteApplicationEvent类型共有的,下面几个属性是AckRemoteApplicationEvent所特有的,分别表示如下含义:
▪️ackId:Ack消息对应的消息来源。我们可以看到第一条AckRemoteApplicationEvent的ackId对应了RefreshRemoteApplicationEvent的id,说明这条Ack是告知该RefreshRemoteApplicationEvent事件的消息已经被收到。
▪️ackDestinationService:Ack消息的目标服务实例。可以看到这里使用的是*:**,所以消息总线上所有的实例都会收到该Ack消息。
▪️event:Ack消息的来源事件。可以看到上例中的两个Ack均来源于刷新配置的RefreshRemoteApplicationEvent事件,我们在测试的时候由于启动了两个config-client,所以有两个实例接收到了配置刷新事件,同时它们都会返回一个Ack消息。由于ackDestinationService为*:**,所以两个config-client都会收到对RefreshRemoteApplicationEvent事件的Ack消息。
通过上面的分析,我们已经得到了两个非常重要的线索RefeshRemoteApplicationEvent和AckRemoteApplicationEvent。我们不妨顺着这两个事件类来详细看看Spring Cloud Bus的源码,以帮助我们理解它的运行机制。
顺着RefeshRemoteApplicationEvent和AckRemoteApplicationEvent,我们可以整理出如下的事件关系图。
可以看到,其中RefreshRemoteApplicationEvent和AckRemoteApplicationEvent这些我们已经解除过的事件都继承了RemoteApplicationEvent抽象类,而RemoteApplicationEvent继承自Spring Framework的ApplicationEvent,可以断定,Spring Cloud Bus也采用了Spring的事件驱动模型。
如果读者对Spring的事件驱动模型已经非常了解,那么可以跳过这一小节,直接看后面的分析。如果你还不清楚它的原理,建议通过本小节的内容来理解其基本原理,以帮组阅读和理解后续的源码分析内容。
Spring的事件驱动模型中包含了三个基本概念:事件、事件监听者和事件发布者,如下图所示。
▪️事件:Spring中定义了事件的抽象类ApplicationEvent,它继承自JDK的EventObject类。从图中我们可以看到,事件包含了两个成员变量:timestamp,该字段用于存储事件发生的时间戳,以及父类中source,该字段表示源事件对象。当我们需要自定义事件的时候,只需要继承ApplicatinEvent,比如RemoteApplicationEvent、RefershRemoteApplicationEvent等,可以在自定义的Event中增加一些事件的属性来给事件监听者处理。
▪️事件监听者:Spring中定义了事件监听者的接口ApplicationListener,它继承自JDK的EventListener接口,同时ApplicationListener接口限定了ApplicationEvent子类作为该接口中的onApplicationEvent(E event);函数的参数。所以,每一个ApplicationListener都是针对某个ApplicationEvent子类的监听和处理者。
那么,事件与监听者是如何关联起来的呢?我们看下图:
▪️事件发布者:Spring中定义了ApplicationEventPublisher和ApplicationEventMulticater两个接口用来发布事件。其中ApplicationEventPublisher接口定义了发布事件。其中ApplicationEventPublisher接口定义了发布事件的函数publishEvent(ApplicationEvent event)和publishEvent(Object event);而ApplicationEventMulticaster接口中定义了对ApplicationListener的维护操作(比如新增、移除等)以及将ApplicationEvent多路广播给可用ApplicationListener的操作。
ApplicationEventPublisher的publishEvent实现在AbstractApplicationContext中,具体如下:
可以看到,它最终会调用ApplicationEventMulticaster的multicastEvent来具体实现发布事件给监听者的操作。而ApplicationEventMulticaster在Spring的默认实现位于SimpleApplicationEventMulticaster中,具体如下:
SimpleApplicationEventMulticaster通过遍历维护的ApplicationListener集合找到对应ApplicationEvent的监听器,然后调用监听器的onApplicationEvent函数来对具体事件做出处理操作。
在对Spring的事件模型有了一定的理解之后,下面我们来详细介绍Spring Cloud Bus中的事件定义。首先,从RemoteApplicationEvent抽象类开始:
先来看看RemoteApplicationEvent类上修饰的注解。
▪️@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type"):Jackson对多态类型的处理注解,当进行序列号时,会使用子类的名字作为type属性的值,比如之前示例中的"type" : "RefreshRemoteApplicationEvent"。
▪️JsonIngoreProperties("source"):序列化的时候忽略source属性,source是ApplicationEvent的父类EventObject的属性,用来定义事件的发生源。
再来看看它的属性:originService、destinationService、id,这些内容都可以在RemoteApplicationEvent的子类事件消息中找到,比如:
下面,我们再来分别看看RemoteApplicationEvent的几个具体实现的具体类。
▪️RefreshRemoteApplicationEvent事件类:该事件用于远程刷新应用的配置信息。它的实现非常简单,只是继承了RemoteApplicationEvent,并没有增加其他内容。从之前的示例中我们也能看到,消息中的内容与RemoteApplicationEvent中包含的属性完全一致。
▪️AckRemoteApplicationEvent事件类:该事件用于告知某个事件消息已经呗接收,通过该消息我们可以监控各个事件消息的响应。从其成员属性中,我们可以找到之前示例中所总结的,比RefreshRemoteApplicationEvent事件的消息多出的几个属性:ackId、ackDestinationService以及event。
▪️EnvironmentChangeRemoteApplicationEvent事件类:该事件用于动态更新消息总线上每个节点的Spring环境属性。可以看到,该类中定义了一个Map类型的成员变量,而接收消息的节点就是根据该Map对象中的属性来覆盖本地的Spring环境属性。
▪️SentApplicationEvent事件类:细心的读者可能已经发现,该类的结构和内容与RemoteApplicationEvent非常相似,不同的是:该类不是抽象类,并且多一个成员属性Class type。SentApplicationEvent事件比较特殊,它主要用于发送信号来表示一个远程的事件已经在系统中被发送到某些地方了,从它的继承关系中,我们可以知道它本身并不是一个远程的事件(不是继承自RemoteApplicationEvent),所以它不会被发送到消息总线上去,而是在本地产生(通常是由于响应了某个远程的事件)。由于该事件的id属性能够匹配消费者AckRemoteApplication消息中的ackId,所以应用程序可以通过监听这个事件来监控远程事件消息的消费事件。
在了解了Spring Cloud Bus中的事件类之后,我们来看看另外一个重要元素:事件监听器。通过整理源码,可以得到下面的类图关系。
其中,RefreshLinstener和EnvironmentChangeListener都继承了Spring事件模型中的监听器接口ApplicationListener。我们先来看看RefreshListener:
从范型中我们可以看到该监听器就是针对我们之前所介绍的RefreshRemoteApplicationEvent事件的,其中onApplicationEvent函数中调用了ContextRefresher中的refresh()函数进行配置属性的刷新。
再来看看EnvironmentChangeListener监听器。
它是针对EnvironmentChangeRemoteApplication事件的监听类,在处理类中,可以看到它从EnvironmentChangeRemoteApplicationEvent中获取了之前提到的事件中定义的Map对象,然后通过遍历来更新EnvironmentManager中的属性内容。
除了上面介绍的RefreshListener和EnvironmentChangeListener监听器外,还有一个与它们都有点不同的TraceListener监听器。
从之前整理的类图和源码中,我们都可以看到该监听器并没有实现ApplicationListener接口,但可以看到这里使用了@EventListener注解。该注解是从Spring 4.2开始提供的新功能,通过它可以自动地将函数注册为一个ApplicationListener的实现。所以在该类中,实际上等价于实现了两个监听器,一个监听AckRemoteApplicationEvent事件,一个监听SentApplicationEvent事件。
在这两个监听处理函数中调用了类似的方法:this.repository.add(getReceivedTrace(event));其中TraceRepositiry是对Trace跟踪信息的操作接口,而它的默认实现是spring-boot-actuatr模块的InMemoryTraceRepository,具体如下
可以看到,默认的Trace跟踪信息存储并没有用到特别的数据库或消息系统,而是采用了内存存储的方式。如上代码所示,通过LinkedList集合和capacity属性的定义,在add(Map map)函数中进行循环存储,所以默认的Trace跟踪实现只能存储和查询最近的100条跟踪信息。
那么跟踪事件都记录了那些内容呢?我们继续看TraceListener中getSentTrace和getReceivedTrace的具体实现:
可以看到,这两个函数会收集关于发送和接收到的Ack事件信息,并且两个函数获得的内容就是事件定义相关的一些属性,看到这里搭建是否感觉似曾相识?是的,这些信息与之前我们通过Kafka的控制台工具获取的消息内容非常类似。既然Spring Cloud Bus已经只需在配置文件中将下面的属性设置为true即可:
通过请求配置主机的/trace接口,比如http://localhost:8882/trace,可以得到如下信息,
注意:要先调用http://localhost:8888/bus/refresh;
与我们分析的内容一样,该请求返回了最近的Send和Ack消息内容。
如果希望针对AckRemoteApplicationEvent或是SentApplicationEvent做一些特殊处理,我们也可以通过@EventListener注解在应用程序中编写自己的处理逻辑,或者重写TraceRepository来改造跟踪的存储等。
原则上每一个消息总线上的应用都可以用来跟踪Ack消息,但是大多数情况下我们把这个任务交给更核心的服务(比如特定的监控服务),这样在该服务中我们就能在Ack消息中实现更复杂的逻辑进行预警和善后工作。
通过上面的分析,我们已经了解了Spring Cloud Bus中事件以及监听器的定义,下面来看看这些事件是如何发布给监听器进行处理的。
在org.springframework.cloud.bus包下,我们可以找到关于Spring Cloud Bus启动时加载的一些基础类和接口,包括自动化配置类BusAutoConfiguration、属性定义类BusProperties等。我们可以从Spring Cloud Bus的自动化配置类中看看它在启动的时候都加载了什么内容:
我们先来看看在该自动化配置类中,都定义了哪些成员。
▪️MessageChannel cloudBusOutboundChannel:该接口定义了发送消息的抽象方法。
▪️ServiceMatcher serviceMatcher:该对象中提供了下面两个重要函数,用来判断事件的来源服务是否为自己,以及判断目标是否为自己,以此作为依据是否要响应消息进行事件的处理。
▪️ChannelBindingServiceProperties bindings:定义了消息服务的绑定属性。
▪️BusProperties bus:该对象定义了Spring Cloud Bus的属性,具体如下所示。
从中可以看到,Spring Cloud Bus的属性前缀使用了spring.cloud.bus。destination和enabled属性分别定义了默认的队列(Queue)或主题(Topic)是否连接到消息总线,所以我们可以通过spring.cloud.bus.destination来修改消息总线使用的队列或主题名称,以及使用spring.cloud.bus.enabled属性来设置应用是否要连接到消息总线上。
另外,在该配置类中为Env、Refresh、Ack、Trace4种已经实现的事件分别创建了配置对象,这些配置都是BusProperties的内部类。从下面的源码中,我们可以看到对于这4种事件,Env、Refresh、Ack均是默认开启的,只有Trace事件需要通过修改配置来开启,就如之前我们介绍的“事件跟踪”的时候配置spring.cloud.bus.trace.enabled=true属性那样。
▪️ApplicationEventPublisher:Spring事件模型中用来发布事件的接口,也就是我们之前介绍的事件以及监听的桥梁。
除了定义的这些成员变量之外,还能看到这里定义了两个监听方法acceptLocal和acceptRemote。
其中,acceptLocal方法如下所示,它通过@EventListener(classes =RemoteApplicationEvent.class)注解修饰。之前已经介绍过该注解,可以将该函数理解为RemoteApplicationEvent事件的监听器,但是在其实现中并非所有的RemoteApplicationEvent事件都会处理。根据if中的条件,可以看到在该监听处理后续的处理,而后续的处理就是通过消息管道将该事件发送出去。所以,该监听器的功能就是监听本地事件来进行消息的发送。
再来看看acceptRemote方法。该方法中使用了@StreamListener注解修饰,该注解的作用是该函数注册为消息代理为消息代理上数据流的事件监听器,注解中的属性值SpringCloudBusClient.INPUT指定了监听的通道名。同时,回头看该函数所在类的定义,使用了@EnableBinding注解,该注解用来实现与消息代理的连接,注解中的属性值SpringCloudBusClient.class声明了输入和输出通道的定义(这部分内容源自Spring Cloud Stream,在下一章中,我们会对这些内容做详细介绍,这里我们只需理解它用来绑定消息代理的输入与输出,以实现向消息总线上发送和接收消息即可)。
通过上面的分析,我们已经可以知道Spring Cloud Bus通过acceptRemote方法来监听消息代理的输入通道,并根据事件类型和配置内容来确定是否要发布事件给我们之前分析的几个事件监听器来对事件做具体的处理;而acceptLocal方法用来监听本地的事件,针对事件来源是自己,并且事件类型不是AckRemoteApplicationEvent的内容通过消息代理的输入通道发送到总线上去。
在介绍了Spring Cloud Bus中实现的事件模型之后,我们已经知道每个节点是如何响应消息总线上的事件了。那么这些发送到消息总线上用来触发各个节点的事件处理的动作是如何实现的呢?回想一下之前在实现配置属性刷新时,我们在修改了Git仓库上的配置信息之后,往总线上的某个节点发送一个请求/bus/refresh来触发总线上的所有节点进行配置刷新;我们在连接到消息总线的应用启动时,也能在控制台中看到类似下面的输出:
从上面的日志信息中可以看到,在org.springframework.cloud.bus.endpoint包下的RefreshBusEndpoint和EnvironmentBusEndpoint分别创建了两个控制端点:/bus/refresh和/bus/env。通过整理org.springframework.cloud.bus.endpoint包下的内容,我们可以得到如下类图:
从图中可以发现,Spring Cloud Bus中的Endpoint也是通过spring-boot-actuator模块来实现的。下面,简单介绍一下spring-boot-actuator模块中的几个重要元素。
▪️Endpoint:该接口中定义了监控端点需要暴露的一些有用信息,比如,id、是否开启标识、是否开启敏感信息标识等。
▪️AbstractEndpoint:该抽象类是对Endpoint的基础实现,在该抽象类中引入了Environment接口对象,从而对接口暴露信息的控制可以通过配置文件的方式来控制。
▪️MvcEndpoint接口:该接口定义了Endpoint接口在MVC层的策略。在这里可以通过使用Spring MVC的@RequestMapping注解来定义端点暴露的接口地址。
下面我们来看看Spring Cloud Bus是如何扩展Endpoint的。
▪️BusEndpoint:该类继承自AbstractEndpoint。从类上的注解@ConfigurationProperties配置可以知道,Spring Cloud Bus实现的端点配置属性需要以endpoints.bus开头,通过该类的构造函数(配合AbstractEndpoint中的构造函数),我们可以知道默认id为bus,并且端点默认敏感标识为true:
▪️AbstractBuspoint类:是实现Spring Cloud Bus中端点的重要基类,它实现了MvcEndpoint接口来暴露MVC层的接口,同时关联了BusEndpoint对象。通过下面的源码,我们看到,getPath、isSensitive和getEndpointType都是委托给BusEndpoint来获取的,从而实现通过Environment配置接口。
默认实现的几个端点都继承自AbstractBusEndpoint类来实现MVC层接口的暴露和配置,下面我们来看看具体的两个实现端点。
▪️实现配置刷新的端点RefreshBusEndpoint类。通过下面的源码,我们可以看到,在该类中定义了refresh的POST请求,由于在BusEndpoint默认构造去时id为bus,而AbstractBusEndpoint中refresh请求的完整路径为/bus/refresh。同时,该请求通过@RequestParam注解还定义了一个可选的参数destination,正如在之前的示例中介绍的,该参数用于指定刷新的服务实例。在请求处理部分直接调用了父类中的publish函数将RefreshRemoteApplicationEvent事件发布出来,实现在总线上发布消息的功能。
▪️EnvironmentBusEndpoint的实现与RefreshBusEndpoint类似,通过暴露/bus/env的POST请求接口,并提供了Map类型的params参数设定需要更新的配置信息,以及同refresh接口一样的destination参数指定需要更新的服务实例,来触发环境参数更新的消息总线控制。
由于目前版本的Spring Cloud Bus只实现了RabbitMQ和Kafka的封装,虽然大部分情况下,这两个产品的特性已经涵盖我们大部分的业务场景,但是由于一些特殊需求或是遗留系统等其他因素,有些团队不得不使用其他的消息代理,这个时候我们就需要扩展消息代理的支持。实际上,通过之前对源码的分析,我们可以看到,Spring Cloud Bus在绑定具体消息代理的输入与输出通道时,均使用了抽象接口的方式,所以真正的实现来自于spring-cloud-starter-bus-amqp和spring-cloud-starter-bus-kafka的依赖。
我们可以查看spring-cloud-starter-bus-amqp和spring-cloud-starter-bus-kafka的依赖,可以看到它们分别依赖了spring-cloud-starter-stream-rabbit和spring-cloud-starter-stream-kafka,真正实现与这些消息代理进行交互操作的是Spring Cloud Stream。所以,当我们要在其他消息代理上使用Spring Cloud Bus消息总线时,只需要去实现一套指定代理的绑定器即可。
如果需要給我修改意见的发送邮箱:erghjmncq6643981@163.com
本博客的代码示例已上传GitHub:分布式配置中心
资料参考:《Spring Cloud 微服务实战》
转发博客,请注明,谢谢。