Spring Cloud Bus + kafka实现配置中心配置动态更新
Spring Cloud Bus与Spring Cloud Config结合kafka实现消息总线的功能.
架构
上面的架构图中,服务的配置更新需要通过向具体服务中的某个实例发送请求,再触发对整个服务集群的配置更新。虽然能实现功能,但是这样的结果是,我们指定的应用实例就会不同于集群中的其他应用实例,这样会增加集群内部的复杂度,不利于将来的运维工作
将上面的架构做了一些调整,如下图所示:
- 在Config Server中也引入Spring Cloud Bus,将配置服务端也加入到消息总线中来。
-
/bus/refresh
请求不在发送到具体服务实例上,而是发送给Config Server,并通过destination
参数来指定需要更新配置的服务或实例。
通过上面的改动,我们的服务实例就不需要再承担触发配置更新的职责。同时,对于Git的触发等配置都只需要针对Config Server即可,从而简化了集群上的一些维护工作。
Kafka
基于消息发布/订阅模式实现的消息系统,其主要设计目标如下:
- 消息持久化:以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
- 高吞吐:在廉价的商用机器上也能支持单机每秒100K条以上的吞吐量
- 分布式:支持消息分区以及分布式消费,并保证分区内的消息顺序
- 跨平台:支持不同技术平台的客户端(如:Java、PHP、Python等)
- 实时性:支持实时数据处理和离线数据处理
- 伸缩性:支持水平扩展
基本概念:
- Broker:Kafka集群包含一个或多个服务器,这些服务器被称为Broker。
- Topic:逻辑上同Rabbit的Queue队列相似,每条发布到Kafka集群的消息都必须有一个Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个Broker上,但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
- Partition:Partition是物理概念上的分区,为了提供系统吞吐率,在物理上每个Topic会分成一个或多个Partition,每个Partition对应一个文件夹(存储对应分区的消息内容和索引文件)。
- Producer:消息生产者,负责生产消息并发送到Kafka Broker。
- Consumer:消息消费者,向Kafka Broker读取消息并处理的客户端。
- Consumer Group:每个Consumer属于一个特定的组(可为每个Consumer指定属于一个组,若不指定则属于默认组),组可以用来实现一条消息被组内多个成员消费等功能。
搭建Kafka本地环境
官网 http://kafka.apache.org/downloads.html
目录结构
kafka
+-bin
+-windows
+-config
+-libs
+-logs
+-site-docs
Kafka的设计中依赖了ZooKeeper,所以我们可以在bin
和config
目录中除了看到Kafka相关的内容之外,还有ZooKeeper相关的内容。其中bin
目录存放了Kafka和ZooKeeper的命令行工具,bin
根目录下是适用于Linux/Unix的shell,而bin/windows
下的则是适用于windows下的bat。我们可以根据实际的系统来设置环境变量,以方便后续的使用和操作。而在config
目录中,则是用来存放了关于Kafka与ZooKeeper的配置信息。
启动kafka
--启动zookeeper
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
-- 启动 kaffka
bin\windows\kafka-server-start.bat config/server.properties
-- 创建 Topic (可理解为创建消息队列)
bin\windows\kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
-- 生成消息
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
-- 消费消息
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
config server
pom
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-bus</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
spring-cloud-starter-bus-kafka、spring-cloud-starter-stream-kafka、spring-cloud-bus 这个几个依赖配置是实现配置中心配置动态刷新需要的,其他参考config的pom。
bootstrap.yml
server:
port: 8769
spring:
application:
name: config-server
cloud:
config:
server:
git:
uri: http://admin@192.168.1.18:10010/r/vi.git
username: vi
password: hz
default-label: master
bus:
refresh:
enabled: true
kafka:
bootstrap-servers: 192.168.1.172:9092 #配置 kafka 服务器的地址和端口
consumer:
group-id: SpringCloud-bus
eureka:
client:
serviceUrl:
defaultZone: http://localhost:7001/eureka/,http://localhost:7002/eureka/
# native:
# search-locations: classpath:/shared
healthcheck:
enabled: true # 开启健康检查(依赖spring-boot-starter-actuator)
instance:
instance-id: ${spring.cloud.client.ip-address}:${server.port}
prefer-ip-address: true #以IP地址注册到服务中心
lease-renewal-interval-in-seconds: 5 # 心跳时间,即服务续约间隔时间(缺省为30s)
lease-expiration-duration-in-seconds: 15 # 发呆时间,即服务续约到期时间(缺省为90s)
logging.level.org.springframework.boot.autoconfigure: ERROR
management:
endpoints:
web:
exposure:
include: '*' #refresh
这里比config新增的配置
bus:
refresh:
enabled: true
kafka:
bootstrap-servers: 192.168.1.172:9092 #配置 kafka 服务器的地址和端口
consumer:
group-id: SpringCloud-bus
management:
endpoints:
web:
exposure:
include: '*' #refresh
注意:
如果是properties 的话 ‘’ 不用 加 ‘ ’ 单引号*
yml--include: '*' http://localhost:8769/actuator/refresh 刷新无效
yml--include: '*' http://localhost:8769/actuator/bus-refresh 刷新所有微服务
yml--include: 'refresh' http://localhost:8769/actuator/refresh 刷新无效
yml--include: 'refresh' http://localhost:8769/actuator/bus-refresh 不能访问
config client
pom
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-bus</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
spring-cloud-starter-bus-kafka、spring-cloud-starter-stream-kafka、spring-cloud-bus 这个几个依赖配置是实现配置中心配置动态刷新需要的,其他参考config的pom。
bootstrap.yml
spring:
application:
name: config-client
cloud:
config:
name: application # 对应 {application} 部分
profile: dis-dev #指定的环境
label: master #指定分支
discovery:
enabled: true
service-id: config-server
bus:
refresh:
enabled: true
kafka:
bootstrap-servers: 192.168.1.172:9092
management:
endpoints:
web:
exposure:
include: '*'
eureka:
client:
serviceUrl:
defaultZone: http://localhost:7001/eureka/,http://localhost:7002/eureka/
instance:
instance-id: ${spring.cloud.client.ip-address}:${server.port}
prefer-ip-address: true #以IP地址注册到服务中心
# ip-address: #强制指定IP地址,默认会获取本机的IP地址
lease-renewal-interval-in-seconds: 5 # 心跳时间,即服务续约间隔时间(缺省为30s)
lease-expiration-duration-in-seconds: 15 # 发呆时间,即服务续约到期时间(缺省为90s)
logging:
config: classpath:log4j2-dev.xml
server:
port: 6020
这里比config新增的配置
bus:
refresh:
enabled: true
kafka:
bootstrap-servers: 192.168.1.172:9092
management:
endpoints:
web:
exposure:
include: '*'
注意:
- yml--include: '*' http://localhost:6020/actuator/refresh client1刷新,client2不刷新
- yml--include: '*' http://localhost:6020/actuator/bus-refresh 刷新所有微服务
- yml--include: 'refresh' http://localhost:6020/actuator/refresh client1刷新,client2不刷新
- yml--include: 'refresh' http://localhost:6020/actuator/bus-refresh 不能访问