公司使用Disconf来管理分布式配置,Disconf使用zookeeper来提供配置存储,同时预留了通知接口以接收配置变更。
1.配置Disconf更新通知入口
Disconf预留有IDisconfUpdatePipeline
接口,实现该接口可以在Disconf控制台保存的配置文件发生变更时实时得到消息, 代码如下:
@Slf4j
@Service
@Scope("singleton")
public class CustomDisconfReloadUpdate implements IDisconfUpdatePipeline {
/**
* Store changed configuration file's path
*/
private static final ThreadLocal<String> newConfigFilePath = new ThreadLocal<>();
@Autowired
ApplicationEventPublisher publisher;
@Autowired
RouteLocator routeLocator;
/**
* 发布zuul refresh事件
*/
public void refreshRoute() {
RoutesRefreshedEvent routesRefreshedEvent = new RoutesRefreshedEvent(routeLocator);
publisher.publishEvent(routesRefreshedEvent);
}
@Override
public void reloadDisconfFile(String key, String filePath) throws Exception {
log.info("------Got reload event from disconf------");
log.info("Change key: {}, filePath: {}", key, filePath);
newConfigFilePath.set(filePath);
refreshRoute();
}
@Override
public void reloadDisconfItem(String key, Object content) throws Exception {
log.info("------Got reload event from disconf with item------");
log.info("Change key: {}, content: {}", key, content);
}
/**
* Checkout configFilePath and remove the ThreadLocal value content
* @return
*/
public static String getConfigFilePath() {
String path = newConfigFilePath.get();
newConfigFilePath.remove();
return path;
}
}
- 1.使用
ThreadLocal
来保存变更了的配置文件在本地的路径以供线程后续执行时读取使用 - 2.使用
RoutesRefreshedEvent
事件机制来刷新Zuul的路由表
2.自定义RouteLocator实现路由表刷新功能
Zuul默认使用的是SimpleRouteLocator
作为路由发现入口,然而并不支持动态刷新;Zuul预留了RefreshableRouteLocator
接口以支持动态路由表刷新,以下是自定义类实现路由刷新功能:
@Slf4j
public class CustomZuulRouteLocator extends SimpleRouteLocator implements RefreshableRouteLocator {
private ZuulProperties properties;
private SpringClientFactory springClientFactory;
public CustomZuulRouteLocator(String servletPath, ZuulProperties properties, SpringClientFactory springClientFactory) {
super(servletPath, properties);
this.properties = properties;
this.springClientFactory = springClientFactory;
}
@Override
public void refresh() {
doRefresh();
}
@Override
protected Map<String, ZuulProperties.ZuulRoute> locateRoutes() {
LinkedHashMap<String, ZuulProperties.ZuulRoute> routesMap = new LinkedHashMap<>();
// This will load the old routes which exist in cache
routesMap.putAll(super.locateRoutes());
try {
// If server can load routes from file which means the file changed, using the new config routes
Map<String, ZuulProperties.ZuulRoute> newRouteMap = loadRoutesFromDisconf();
if(newRouteMap.size() > 0) {
log.info("New config services list: {}", Arrays.toString(newRouteMap.keySet().toArray()));
routesMap.clear();
routesMap.putAll(newRouteMap);
}
} catch (Exception e) {
// For every exception, do not break the gateway working
log.error(e.getMessage(), e);
}
LinkedHashMap<String, ZuulProperties.ZuulRoute> values = new LinkedHashMap<>();
for (Map.Entry<String, ZuulProperties.ZuulRoute> entry : routesMap.entrySet()) {
String path = entry.getKey();
// Prepend with slash if not already present.
if (!path.startsWith("/")) {
path = "/" + path;
}
if (StringUtils.hasText(this.properties.getPrefix())) {
path = this.properties.getPrefix() + path;
if (!path.startsWith("/")) {
path = "/" + path;
}
}
values.put(path, entry.getValue());
}
return values;
}
/**
* Read the new config file and reload new route and service config
* @return
*/
private Map<String, ZuulProperties.ZuulRoute> loadRoutesFromDisconf() {
log.info("----load configuration----");
Map<String, ZuulProperties.ZuulRoute> latestRoutes = new LinkedHashMap<>(16);
String configFilePath = CustomDisconfReloadUpdate.getConfigFilePath();
if(configFilePath != null) {
String newConfigContent = readFileContent(configFilePath);
ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory());
try {
Map<String, Object> configResult = objectMapper.readValue(newConfigContent, Map.class);
// Store all serviceId with ribbon configuration
List<String> allServiceIds = new ArrayList<>();
// This Node used to deal with Zuul route configuration
if(configResult.containsKey("zuul")) {
Map<String, Object> zuulConfig = (Map<String, Object>) configResult.get("zuul");
if(zuulConfig.containsKey("routes")) {
Map<String, Object> routes = (Map<String, Object>) zuulConfig.get("routes");
if(routes != null) {
for(Map.Entry<String, Object> tempRoute : routes.entrySet()) {
String id = tempRoute.getKey();
Map<String, Object> routeDetail = (Map<String, Object>) tempRoute.getValue();
ZuulProperties.ZuulRoute zuulRoute = generateZuulRoute(id, routeDetail);
// Got list with config serviceId
if(zuulRoute.getServiceId() != null) {
allServiceIds.add(zuulRoute.getServiceId());
}
latestRoutes.put(zuulRoute.getPath(), zuulRoute);
}
}
}
}
// deal with all serviceIds and read properties from yaml configuraiton
if(allServiceIds.size() > 0) {
allServiceIds.forEach(temp -> {
// Exist this serviceId
if(configResult.containsKey(temp)) {
Map<String, Object> serviceRibbonConfig = (Map<String, Object>) configResult.get(temp);
if(serviceRibbonConfig.containsKey("ribbon")) {
Map<String, Object> ribbonConfig = (Map<String, Object>) serviceRibbonConfig.get("ribbon");
if(ribbonConfig.containsKey("listOfServers")) {
String listOfServers = (String) ribbonConfig.get("listOfServers");
String ruleClass = (String) ribbonConfig.get("NFLoadBalancerRuleClassName");
String[] serverList = listOfServers.split(",");
dealLoadBalanceConfig(temp, ruleClass, Arrays.asList(serverList));
}
}
}
});
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
return latestRoutes;
}
/**
* Change loadbalancer's serverList configuration
*/
private void dealLoadBalanceConfig(String serviceId, String newRuleClassName, List<String> servers) {
DynamicServerListLoadBalancer loadBalancer = (DynamicServerListLoadBalancer) springClientFactory.getLoadBalancer(serviceId);
if(loadBalancer != null) {
// 1.Reset listOfServer content if listOfServers changed
List<String> newServerList = new ArrayList<>();
servers.forEach(temp -> {
// remove the empty characters
temp = temp.trim();
newServerList.add(temp);
});
List<Server> oldServerList = loadBalancer.getServerListImpl().getUpdatedListOfServers();
// Judge the listOfServers changed or not
boolean serversChanged = false;
if(oldServerList != null) {
if(oldServerList.size() != newServerList.size()) {
serversChanged = true;
} else {
for(Server temp : oldServerList) {
if(!newServerList.contains(temp.getId())) {
serversChanged = true;
break;
}
}
}
} else {
serversChanged = true;
}
// listOfServers has changed
if(serversChanged) {
log.info("ServiceId: {} has changed listOfServers, new: {}", serviceId, Arrays.toString(servers.toArray()));
loadBalancer.setServerListImpl(new ServerList<Server>() {
@Override
public List<Server> getInitialListOfServers() {
return null;
}
@Override
public List<Server> getUpdatedListOfServers() {
List<Server> newServerConfigList = new ArrayList<>();
newServerList.forEach(temp -> {
Server server = new Server(temp);
newServerConfigList.add(server);
});
// Using the new config listOfServers
return newServerConfigList;
}
});
}
// Reset loadBalancer rule
if(loadBalancer.getRule() != null) {
String existRuleClassName = loadBalancer.getRule().getClass().getName();
if(!newRuleClassName.equals(existRuleClassName)) {
log.info("ServiceId: {}, Old rule class: {}, New rule class: {}", serviceId, existRuleClassName, newRuleClassName);
initializeLoadBalancerWithNewRule(newRuleClassName, loadBalancer);
}
} else {
initializeLoadBalancerWithNewRule(newRuleClassName, loadBalancer);
log.info("ServiceId: {}, Old rule class: Null, Need rule class: {}", serviceId, newRuleClassName);
}
}
}
/**
* Change loadBalancer's rule
* @param ruleClassName
* @param loadBalancer
*/
private void initializeLoadBalancerWithNewRule(String ruleClassName, DynamicServerListLoadBalancer loadBalancer) {
try {
// Create specify class instance
Class clazz = Class.forName(ruleClassName);
Constructor constructor = clazz.getConstructor();
IRule rule = (IRule) constructor.newInstance();
// Bind loadBalancer with this Rule
rule.setLoadBalancer(loadBalancer);
loadBalancer.setRule(rule);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
/**
* Generate zuul route object from configuration
* @param id
* @param routeDetail
* @return
*/
private ZuulProperties.ZuulRoute generateZuulRoute(String id, Map<String, Object> routeDetail) {
ZuulProperties.ZuulRoute route = new ZuulProperties.ZuulRoute();
route.setId(id);
if(routeDetail.containsKey("path")) {
route.setPath((String) routeDetail.get("path"));
}
if(routeDetail.containsKey("serviceId")) {
route.setServiceId((String) routeDetail.get("serviceId"));
}
if(routeDetail.containsKey("url")) {
route.setUrl((String) routeDetail.get("url"));
}
if(routeDetail.containsKey("stripPrefix")) {
route.setStripPrefix((Boolean) routeDetail.get("stripPrefix"));
}
return route;
}
/**
* Read config file from local file system
* @param configFile
* @return
*/
private String readFileContent(String configFile) {
try {
FileInputStream inputStream = new FileInputStream(new File(configFile));
String content = IOUtils.toString(inputStream, "UTF-8");
return content;
} catch (IOException e) {
log.error(e.getMessage(), e);
}
return null;
}
}
- 1.在第一步中发布的refreshEvent最终会调用
refresh()
方法中的doRefresh()
方法,而doRefresh()
则在父类实现中调用locateRoutes()
来获取新的路由表 - 2.由于我司这里使用yaml配置文件,因此需要解析配置文件数据比较路由规则是否发生变更,这里使用的是
jackson-dataformat-yaml
解析 - 3.由于我司使用
Ribbon
进行客户端负载均衡,因此在路由表发生变化的情况下,负载均衡的规则也有可能变化了,这部分的动态刷新需要通过修改DynamicServerListLoadBalancer
来完成
3.替换默认的SimpleRouteLocator
在完成上述两步之后,Zuul的动态路由功能基本支持了,现在就差把系统默认的SimpleRouteLocator
给替换成我们自己的实现类,只需使用Bean注解启用自定义类即可:
@Bean
public CustomZuulRouteLocator routeLocator() {
CustomZuulRouteLocator customZuulRouteLocator = new CustomZuulRouteLocator(serverProperties.getServlet().getServletPrefix(), zuulProperties, springClientFactory);
return customZuulRouteLocator;
}