需求简介
spring cloud 本身实现了消息总线机制,机制如图1。spring cloud 本身实现了变量修改/bus/env
和/bus/refresh
两个接口,我们需要扩展一个自己的刷新缓存的接口,来应对业务需求。业务代码是看源码修改出来的,并不代表理解都对。
实现
介绍
一般来说微服务一上需要实现接收用户信息的接口,并且将消息传递给消息总线。其他微服务实现接收消息总线分发信息的服务。为了简化业务所有的微服务都具有接收用户指令和接收消息总线指令的能力。
AbstractBusEndpoint Bus接口实现
这个接口来实现对用户的访问实现,需要继承AbstractBusEndpoint,我们提供了/bus/removecache
接口来供用户访问。
@ManagedResource
public class GuavaCacheBusEndpoint extends AbstractBusEndpoint {
public GuavaCacheBusEndpoint(ApplicationEventPublisher context, String id, BusEndpoint delegate) {
super(context, id, delegate);
}
@RequestMapping(
value = {"removecache"},
method = {RequestMethod.POST}
)
@ResponseBody
@ManagedOperation
public void removecache(@RequestParam Map<String, String> params, @RequestParam(value = "destination",required = false) String destination) {
this.publish(new GuavaCacheChangeRemoteApplicationEvent(this, this.getInstanceId(), destination, params));
}
}
RemoteApplicationEvent 被传输的消息(GuavaCacheChangeRemoteApplicationEvent)
这个类需要继承RemoteApplicationEvent
牵扯到消息总线序列化等信息,我们可以在其中增加Map来实现带参数传递
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
property = "type"
)
@JsonIgnoreProperties({"source"})
public class GuavaCacheChangeRemoteApplicationEvent extends RemoteApplicationEvent {
private final Map<String, String> values;
private GuavaCacheChangeRemoteApplicationEvent() {
this.values = null;
}
public GuavaCacheChangeRemoteApplicationEvent(Object source, String originService, String destinationService, Map<String, String> values) {
super(source, originService, destinationService);
this.values = values;
}
public Map<String, String> getValues(){
return values;
}
}
BusAutoConfiguration Bus消息自动注册
实现spring config 配置解析和加载我们的自定义配置,我们定义了spring.cloud.bus.guava.enabled
和endpoints.spring.cloud.bus.guava.enabled
来启用消息总线消息监听和用户访问消息监听。
@Configuration
public class BusGuavaAutoConfiguration extends BusAutoConfiguration {
public BusGuavaAutoConfiguration(){}
@Configuration
protected static class BusGuavaConfiguration {
protected BusGuavaConfiguration() {}
@Bean
@ConditionalOnProperty(
value = {"spring.cloud.bus.guava.enabled"},
matchIfMissing = true
)
public GuavaCacheChangeListener guavaCacheChangeListener() {
return new GuavaCacheChangeListener();
}
@Configuration
@ConditionalOnClass({Endpoint.class})
@ConditionalOnProperty(
value = {"endpoints.spring.cloud.bus.guava.enabled"},
matchIfMissing = true
)
protected static class GuavaBusEndpointConfiguration {
protected GuavaBusEndpointConfiguration() {}
@Bean
public GuavaCacheBusEndpoint environmentBusEndpoint(ApplicationContext context, BusEndpoint busEndpoint) {
return new GuavaCacheBusEndpoint(context, context.getId(), busEndpoint);
}
}
}
}
ApplicationListener实现
ApplicationListener来实现监听消息总线的监听器,消息总线返回后会执行这里。
public class GuavaCacheChangeListener implements ApplicationListener<GuavaCacheChangeRemoteApplicationEvent> {
@Override
public void onApplicationEvent(GuavaCacheChangeRemoteApplicationEvent guavaCacheChangeRemoteApplicationEvent) {
Map<String, String> values = guavaCacheChangeRemoteApplicationEvent.getValues();
//TODO
}
}
转换器配置
RemoteApplicationEvent需要被spring cloud 内置的BusJacksonMessageConverter
转换器扫描到才可以实现转换,在App启动类中增加注解@RemoteApplicationEventScan(basePackages = "你的package地址")