背景
我们有这样一个场景:需要把从某东查询出来的商品放到其他渠道上架销售。查询商品信息时需要过滤敏感词,而敏感词记录在数据库中,商品查询又是实时的,因此需要先将敏感词从数据库加载到缓存里,调用时直接从缓存中查询并过滤。
技术架构
我们使用的是 Spring Boot + Dubbo 的架构。缓存又是一种比较常用的技术手段,所以希望把缓存这一块规范化,使开发周期尽可能短、能够快速上线,因此缓存该如何设计就需要好好考虑。
因为使用的是springboot,加上我们自己平时经常做一些插件的扩展,所以决定,使用springboot集成吧,因此最后的架构是这样的。
image.png
基本步骤:
- 编写刷新缓存类的统一接口
- 具体刷新类实现接口,书写自己的缓存刷新逻辑
- 容器启动,扫描注解,实例化所有的实现类
- 配置类从Apollo拉取指定的namespace中用来刷新的key,做key与实例之间的映射
- 监听 Apollo 中 key 的变化,检查需要重新加载的 bean,调用对应 bean 的 refresh 方法刷新数据
上代码
拉取Apollo配置到本地缓存
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import javax.annotation.PostConstruct;
import com.ailk.ecs.esf.base.exception.BusinessException;
import com.ailk.ecs.esf.conf.ApolloProperties;
import com.ailk.ecs.esf.conf.EsfProperties;
import com.ailk.ecs.jd.common.cache.WatchableCacheLoader;
import com.ailk.ecs.jd.common.cache.WatchableKVCacheLoader;
import com.ailk.ecs.jd.common.cache.WatchedCacheRefreshManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.spring.annotation.ApolloConfig;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
 * Registry of cache loader beans, built from the property names of the Apollo
 * namespace {@code cacheRefresh}.
 *
 * <p>On startup every property name in the namespace is resolved to a Spring bean of the
 * same name. A name of the form {@code xxx,yes} is registered as a key-value loader; any
 * other name is registered as a plain loader. The maps and lists are static and are
 * handed out by reference, so other classes observe later registrations.
 */
@Component
public class CacheResource {

    /** Second comma-separated section of a config key that marks a key-value cache. */
    private static final String IS_KV = "yes";

    /** Name of the cache that is refreshed by the periodic timer. */
    private static final String BLACK_USER_CACHE = "BlackUserCache";

    /** Initial delay and period of the timed refresh, in milliseconds (one hour). */
    private static final long BLACK_USER_REFRESH_MILLIS = 3600000L;

    private static final Logger log = LoggerFactory.getLogger(CacheResource.class);

    private static final Map<String, WatchableCacheLoader> cacheLoaders = Maps.newHashMap();
    private static final Map<String, WatchableKVCacheLoader> kvCacheLoaders = Maps.newHashMap();
    private static final List<String> paths = Lists.newArrayList();
    private static final List<String> kvPaths = Lists.newArrayList();

    @ApolloConfig(value = "cacheRefresh")
    private Config cacheConfig;

    @Autowired
    private ApolloProperties apolloProperties;

    @Autowired
    private ApplicationContext appCtx;

    private CacheResource() {
    }

    /** @return the live map of plain cache loaders, keyed by config key */
    public static Map<String, WatchableCacheLoader> getCacheLoaders() {
        return cacheLoaders;
    }

    /** @return the live map of key-value cache loaders, keyed by config key */
    public static Map<String, WatchableKVCacheLoader> getKVCacheLoaders() {
        return kvCacheLoaders;
    }

    /** @return the live list of config keys registered as plain caches */
    public static List<String> getPaths() {
        return paths;
    }

    /** @return the live list of config keys registered as key-value caches */
    public static List<String> getKVPaths() {
        return kvPaths;
    }

    /**
     * Looks up the plain loader bean named {@code clzString} and records the key.
     * The key is recorded only after the lookup succeeded, so a failed lookup never
     * leaves a registered path without a loader behind.
     */
    private WatchableCacheLoader initLoader(String clzString) {
        WatchableCacheLoader loader = appCtx.getBean(clzString, WatchableCacheLoader.class);
        paths.add(clzString);
        return loader;
    }

    /**
     * Looks up the key-value loader bean named {@code clzString} and records the key.
     * The key is recorded only after the lookup succeeded.
     */
    private WatchableKVCacheLoader initKVLoader(String clzString) {
        WatchableKVCacheLoader loader = appCtx.getBean(clzString, WatchableKVCacheLoader.class);
        kvPaths.add(clzString);
        return loader;
    }

    /**
     * Registers a loader for every property name of the Apollo namespace and starts the
     * hourly refresh of {@code BlackUserCache}. Any failure is logged and rethrown.
     */
    @PostConstruct
    public void init() {
        log.info(" ApolloListner init caches");
        try {
            Set<String> cacheKeys = cacheConfig.getPropertyNames();
            for (String cacheKey : cacheKeys) {
                putCaches(cacheKey);
            }
            log.info("初始化缓存列表完成 ");
            // Timer that periodically refreshes the black-user cache.
            new Timer().schedule(new TimerTask() {
                @Override
                public void run() {
                    // An exception escaping run() terminates the Timer thread and cancels
                    // all future executions, so a failed refresh is logged instead.
                    try {
                        WatchedCacheRefreshManager.getCaches().refresh(BLACK_USER_CACHE);
                    }
                    catch (Exception e) {
                        log.error("timed refresh of {} failed", BLACK_USER_CACHE, e);
                    }
                }
            }, BLACK_USER_REFRESH_MILLIS, BLACK_USER_REFRESH_MILLIS);
        }
        catch (Exception e) {
            log.error("初始化缓存列表失败 ", e);
            throw e;
        }
    }

    /**
     * Registers the loader bean for one config key.
     *
     * @param cacheKey config key, e.g. {@code xxxCache} or {@code urlCache,yes}
     * @throws BusinessException if the bean cannot be resolved and the property
     *         {@code udpCallBackSystem} is not {@code local}
     */
    private void putCaches(String cacheKey) {
        try {
            String[] sections = cacheKey.split(",");
            boolean keyValueCache = sections.length == 2 && sections[1].equalsIgnoreCase(IS_KV);
            if (keyValueCache) {
                kvCacheLoaders.put(cacheKey, initKVLoader(cacheKey)); // e.g. urlCache,yes
            }
            else {
                cacheLoaders.put(cacheKey, initLoader(cacheKey)); // e.g. xxxCache
            }
        }
        catch (Exception e) {
            // The local (test) environment does not fail here because test cases load
            // only a subset of the caches.
            if (!"local".equals(EsfProperties.getProperty("udpCallBackSystem"))) {
                log.error("cacheKey={}", cacheKey, e);
                // Pass the exception itself: e.getCause() is frequently null, which
                // would discard the actual failure.
                throw new BusinessException("9999", "非测试环境,缓存读取失败。请检查缓存配置,如是测试环境请配置 udpCallBackSystem 为 local",
                        e);
            }
            log.warn("cacheKey={} skipped in local environment: {}", cacheKey, e.toString());
        }
    }
}
Apollo监听
import com.ailk.ecs.esf.tracer.utils.TraceUtils;
import com.ailk.ecs.jd.common.cache.WatchedCacheRefreshManager;
import com.ctrip.framework.apollo.model.ConfigChangeEvent;
import com.ctrip.framework.apollo.spring.annotation.ApolloConfigChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.Set;
/**
 * Listens for changes in the Apollo namespace {@code cacheRefresh} and refreshes the
 * cache registered under each changed key.
 */
@Component
public class CacheWatcherManager {

    private static final Logger log = LoggerFactory.getLogger(CacheWatcherManager.class);

    /**
     * Handles one Apollo change event: logs every changed key together with the host
     * information and then dispatches the refresh for that key.
     *
     * @param changeEvent the change event delivered by Apollo
     */
    @ApolloConfigChangeListener("cacheRefresh")
    private void someOnChange(ConfigChangeEvent changeEvent) {
        for (String changedKey : changeEvent.changedKeys()) {
            log.info(" ApolloListner refresh host: {}, hostName: {},refreshkey: {}",
                    TraceUtils.getHostIP(), TraceUtils.getHostName(), changedKey);
            refreshChangedKey(changedKey);
        }
    }

    /**
     * Refreshes the cache registered for {@code changedKey}. Plain caches are checked
     * first, then key-value caches; an unregistered key is only logged.
     */
    private void refreshChangedKey(String changedKey) {
        boolean plainCache = CacheResource.getPaths().contains(changedKey);
        if (plainCache) {
            log.info("refresh cache {} start", changedKey);
            WatchedCacheRefreshManager.getCaches().refresh(changedKey);
            log.info("refresh cache {} end", changedKey);
            return;
        }

        boolean keyValueCache = CacheResource.getKVPaths().contains(changedKey);
        if (keyValueCache) {
            log.info("refresh kvcache {} start", changedKey);
            // NOTE(review): the cache name is also passed as the entry key to refresh;
            // confirm that this is the intended way to refresh a key-value cache.
            WatchedCacheRefreshManager.getKVCache(changedKey).refresh(changedKey);
            log.info("refresh kvcache {} end", changedKey);
            return;
        }

        log.info(" cache key {} not exist", changedKey);
    }
}
定义好接口
/**
 * Loader for the value of one cache.
 */
public interface WatchableCacheLoader {

    /**
     * Produces the current value of the cache.
     *
     * @return the freshly loaded cache value
     */
    Object load();
}
具体需要刷新的逻辑
import com.ailk.ecs.jd.common.cache.WatchableCacheLoader;
import com.ailk.ecs.jd.common.cache.Watched;
import com.ailk.ecs.jd.common.cache.WatchedCacheRefreshManager;
import com.ailk.ecs.jd.eface.impl.union.utils.SensitiveFilterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.*;
/**
 * Cache loader registered as the bean {@code sensitiveCache}. It collects the
 * {@code KEYWORD} value of every row returned by {@code CacheUtils.getSensitive()} and
 * passes the resulting set to {@code SensitiveFilterUtils.init}.
 */
@Component("sensitiveCache")
public class SensitiveCache implements WatchableCacheLoader {

    private static final Logger log = LoggerFactory.getLogger(SensitiveCache.class);

    /**
     * Builds the cache value from the current sensitive-word rows.
     *
     * @return whatever {@code SensitiveFilterUtils.init} returns for the collected words
     */
    @Override
    public Object load() {
        log.info("开始刷新缓存:SensitiveCache");
        List<Map<String, String>> rows = CacheUtils.getSensitive();
        Set<String> sensitiveWordSet = new HashSet<>(rows.size());
        for (Map<String, String> row : rows) {
            String keyword = row.get("KEYWORD");
            // A row without a KEYWORD value carries no word to filter; skip it so that
            // null never ends up in the set handed to the filter initialiser.
            if (keyword != null) {
                sensitiveWordSet.add(keyword);
            }
        }
        return SensitiveFilterUtils.init(sensitiveWordSet);
    }

    /**
     * Returns the cached value stored under {@code sensitiveCache}, cast to
     * {@link HashMap}.
     *
     * @return the cached sensitive-word structure
     */
    public static HashMap getSensitiveMap() {
        return (HashMap) WatchedCacheRefreshManager.getCache("sensitiveCache");
    }
}
缓存刷新管理类
import com.ailk.ecs.jd.common.cache.WatchableCacheLoader;
import com.ailk.ecs.jd.common.cache.WatchableKVCacheLoader;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Map;
/**
 * Holds the Guava caches that are backed by the loaders registered in
 * {@code CacheResource}: one shared {@link LoadingCache} for plain caches (one entry per
 * cache key) and one {@link LoadingCache} per key-value cache.
 */
@Component
public class WatchedCacheRefreshManager {

    private static final Logger log = LoggerFactory.getLogger(WatchedCacheRefreshManager.class);

    // Both maps are the live instances owned by CacheResource, not copies.
    private static Map<String, WatchableCacheLoader> loaders = CacheResource.getCacheLoaders();
    private static Map<String, WatchableKVCacheLoader> kvLoaders = CacheResource.getKVCacheLoaders();

    private static LoadingCache<String, Object> caches;
    private static Map<String, LoadingCache<Object, Object>> kvCaches = Maps.newHashMap();

    // NOTE(review): never read in this class; presumably injected so that CacheResource
    // is initialised before initCaches() runs. Confirm before removing.
    @Autowired
    CacheResource cacheResource;

    public WatchedCacheRefreshManager() {
        // do nothing
    }

    /**
     * Returns the value of the plain cache {@code cacheKey}, loading it if absent.
     *
     * @param cacheKey key the loader was registered under
     * @return the cached value
     */
    public static Object getCache(String cacheKey) {
        return caches.getUnchecked(cacheKey);
    }

    /**
     * @param cacheKey key the key-value loader was registered under
     * @return the cache for that key, or {@code null} if none was created
     */
    public static LoadingCache<Object, Object> getKVCache(String cacheKey) {
        return kvCaches.get(cacheKey);
    }

    /** @return the shared cache that holds all plain cache values */
    public static LoadingCache<String, Object> getCaches() {
        return caches;
    }

    /** Drops the references to all caches; they are rebuilt by {@link #initCaches()}. */
    public static void clear() {
        caches = null;
        kvCaches = null;
    }

    /** Creates the (still empty) plain cache and one cache per key-value loader. */
    private static void initLoadingCaches() {
        // Plain caches: built here, values are not loaded yet.
        caches = CacheBuilder.newBuilder()
                .recordStats()
                .removalListener(notification -> log.info("{} was removed, cause is {}",
                        notification.getKey(), notification.getCause()))
                .build(new CacheLoader<String, Object>() {
                    @Override
                    public Object load(String key) {
                        WatchableCacheLoader loader = loaders.get(key);
                        if (loader == null) {
                            // Name the key instead of failing with a bare NullPointerException.
                            throw new IllegalStateException("no cache loader registered for key: " + key);
                        }
                        return loader.load();
                    }
                });

        // clear() sets kvCaches to null; recreate the map so re-initialisation works.
        if (kvCaches == null) {
            kvCaches = Maps.newHashMap();
        }
        // Key-value caches: built here, values are not loaded yet.
        for (Map.Entry<String, WatchableKVCacheLoader> entry : kvLoaders.entrySet()) {
            final WatchableKVCacheLoader loader = entry.getValue();
            kvCaches.put(entry.getKey(), CacheBuilder.newBuilder().build(new CacheLoader<Object, Object>() {
                @Override
                public Object load(Object key) {
                    return loader.load(key);
                }
            }));
        }
    }

    /** Builds all caches and triggers a load of every registered plain cache. */
    @PostConstruct
    public void initCaches() {
        log.info(" manager init and refresh caches");
        initLoadingCaches();
        for (String key : loaders.keySet()) {
            caches.refresh(key);
        }
    }
}
核心业务流程完毕