记一次缓存更新的需求

背景

我们有这样一个场景,需求是需要从某东查询出来的商品,在其他渠道进行上架销售,中间进行商品信息查询的时候需要过滤一下敏感词,这个敏感词呢是记录在数据库中的,商品查询又是实时的,因此需要将敏感词从数据库加载到缓存里面,调用的时候进行查询过滤。

技术架构

我们使用的是springboot+dubbo的架构。加上缓存又是一个比较常用的技术手段,就想着能将缓存这块规范化,开发周期尽可能短,能快速上线,因此,如何做缓存的设计就需要好好考虑。
因为使用的是springboot,加上我们自己平时经常做一些插件的扩展,所以决定,使用springboot集成吧,因此最后的架构是这样的。

image.png

基本步骤:

  • 编写刷新缓存类的统一接口
  • 具体刷新类实现接口,书写自己的缓存刷新逻辑
  • 容器启动,扫描注解,实例化所有的实现类
  • 配置类从Apollo拉取指定的namespace中用来刷新的key,做key与实例之间的映射
  • 监听Apollokey的变化,检查需要加载的bean,调用对应bean的refresh方法进行数据的刷新

上代码

拉取Apollo配置到本地缓存


import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;

import javax.annotation.PostConstruct;

import com.ailk.ecs.esf.base.exception.BusinessException;
import com.ailk.ecs.esf.conf.ApolloProperties;
import com.ailk.ecs.esf.conf.EsfProperties;
import com.ailk.ecs.jd.common.cache.WatchableCacheLoader;
import com.ailk.ecs.jd.common.cache.WatchableKVCacheLoader;
import com.ailk.ecs.jd.common.cache.WatchedCacheRefreshManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.spring.annotation.ApolloConfig;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

@Component
public class CacheResource {

    //K-V形式缓存
    private static final String IS_KV = "yes";
    private static Map<String, WatchableCacheLoader> cacheLoaders = Maps.newHashMap();
    private static Map<String, WatchableKVCacheLoader> kvCacheLoaders = Maps.newHashMap();
    private static List<String> paths = Lists.newArrayList();
    private static List<String> kvPaths = Lists.newArrayList();
    private static Logger log = LoggerFactory.getLogger(CacheResource.class);

    @ApolloConfig(value = "cacheRefresh")
    private Config cacheConfig;
    @Autowired
    private ApolloProperties apolloProperties;

    @Autowired
    private ApplicationContext appCtx;

    private CacheResource() {

    }

    public static Map<String, WatchableCacheLoader> getCacheLoaders() {
        return cacheLoaders;
    }

    public static Map<String, WatchableKVCacheLoader> getKVCacheLoaders() {
        return kvCacheLoaders;
    }

    public static List<String> getPaths() {
        return paths;
    }

    public static List<String> getKVPaths() {
        return kvPaths;
    }

    @SuppressWarnings("unchecked")
    private <T extends WatchableCacheLoader> T initLoader(String clzString) {
        paths.add(clzString);
        return (T) appCtx.getBean(clzString);
    }

    @SuppressWarnings("unchecked")
    private <T extends WatchableKVCacheLoader> T initKVLoader(String clzString) {
        kvPaths.add(clzString);
        return (T) appCtx.getBean(clzString);
    }

    @PostConstruct
    public void init() {
        log.info(" ApolloListner init caches");
        try {
            Set<String> cacheKeys = cacheConfig.getPropertyNames();
            for (String cacheKey : cacheKeys) {
                putCaches(cacheKey);
            }
            log.info("初始化缓存列表完成 ");
            //创建黑名单定时刷新timer
            new Timer().schedule(new TimerTask() {
                @Override
                public void run() {
                    //每隔BlackUserCache
                    WatchedCacheRefreshManager.getCaches().refresh("BlackUserCache");
                }
            }, 3600000, 3600000);
        }
        catch (Exception e) {
            log.error("初始化缓存列表失败 ", e);
            throw e;
        }
    }

    private void putCaches(String cacheKey) {
        try {
            String[] sections = cacheKey.split(",");
            if (sections.length == 2 && sections[1].equalsIgnoreCase(IS_KV)) {
                kvCacheLoaders.put(cacheKey, initKVLoader(cacheKey));//urlCache,yes
            }
            else {
                cacheLoaders.put(cacheKey, initLoader(cacheKey));//xxxCache
            }
        }
        catch (Exception e) {
            // 测试环境不抛错,因测试用例只加载部分缓存
            if (!"local".equals(EsfProperties.getProperty("udpCallBackSystem"))) {
                log.error("cacheKey={}", cacheKey, e);
                throw new BusinessException("9999", "非测试环境,缓存读取失败。请检查缓存配置,如是测试环境请配置 udpCallBackSystem 为 local",
                        e.getCause());
            }
        }
    }
}

Apollo监听


import com.ailk.ecs.esf.tracer.utils.TraceUtils;
import com.ailk.ecs.jd.common.cache.WatchedCacheRefreshManager;
import com.ctrip.framework.apollo.model.ConfigChangeEvent;
import com.ctrip.framework.apollo.spring.annotation.ApolloConfigChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.Set;

@Component
public class CacheWatcherManager {

    private static final Logger log = LoggerFactory.getLogger(CacheWatcherManager.class);

    @ApolloConfigChangeListener("cacheRefresh")
    private void someOnChange(ConfigChangeEvent changeEvent) {
        Set<String> cacheKeys = changeEvent.changedKeys();
        for (String key : cacheKeys) {
            log.info(" ApolloListner refresh host: {}, hostName: {},refreshkey: {}",
                    TraceUtils.getHostIP(), TraceUtils.getHostName(), key);
            if (CacheResource.getPaths().contains(key)) {
                log.info("refresh cache {} start", key);
                WatchedCacheRefreshManager.getCaches().refresh(key);
                log.info("refresh cache {} end", key);
            }
            else if (CacheResource.getKVPaths().contains(key)) {
                log.info("refresh kvcache {} start", key);
                WatchedCacheRefreshManager.getKVCache(key).refresh(key);
                log.info("refresh kvcache {} end", key);
            }
            else {
                log.info(" cache key {} not exist", key);
            }
        }
    }

}

定义好接口


public interface WatchableCacheLoader {
    public Object load();
}

具体需要刷新的逻辑


import com.ailk.ecs.jd.common.cache.WatchableCacheLoader;
import com.ailk.ecs.jd.common.cache.Watched;
import com.ailk.ecs.jd.common.cache.WatchedCacheRefreshManager;
import com.ailk.ecs.jd.eface.impl.union.utils.SensitiveFilterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.*;

@Component("sensitiveCache")
public class SensitiveCache implements WatchableCacheLoader {
    private static Logger log = LoggerFactory.getLogger(SensitiveCache.class);

    @Override
    public Object load() {
        log.info("开始刷新缓存:SensitiveCache");
        List<Map<String,String>> list = CacheUtils.getSensitive();
        Set sensitiveWordSet = new HashSet(list.size());
        for (Map<String,String> map : list) {
            sensitiveWordSet.add(map.get("KEYWORD"));
        }
        return SensitiveFilterUtils.init(sensitiveWordSet);
    }

    public static HashMap getSensitiveMap() {
        return (HashMap) WatchedCacheRefreshManager.getCache("sensitiveCache");
    }

}

缓存刷新管理类


import com.ailk.ecs.jd.common.cache.WatchableCacheLoader;
import com.ailk.ecs.jd.common.cache.WatchableKVCacheLoader;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Map;

@Component
public class WatchedCacheRefreshManager {

    private static final Logger log = LoggerFactory.getLogger(WatchedCacheRefreshManager.class);
    private static Map<String, WatchableCacheLoader> loaders =CacheResource.getCacheLoaders();
    private static Map<String, WatchableKVCacheLoader> kvLoaders =CacheResource.getKVCacheLoaders();
    private static LoadingCache<String, Object> caches;
    private static Map<String, LoadingCache<Object, Object>> kvCaches =Maps.newHashMap();

    @Autowired
    CacheResource cacheResource;

    public WatchedCacheRefreshManager() {
        //do noting
    }

    public static Object getCache(String cacheKey) {
        return caches.getUnchecked(cacheKey);
    }

    public static LoadingCache<Object, Object> getKVCache(String cacheKey) {
        return kvCaches.get(cacheKey);
    }

    public static LoadingCache<String, Object> getCaches() {
        return caches;
    }

    public static void clear() {
        caches = null;
        kvCaches = null;
    }

    private static void initLoadingCaches() {
        //初始化普通缓存,缓存还未加载
        caches = CacheBuilder.newBuilder()
                .recordStats()
                .removalListener(notification-> log.info("{} was removed, cause is {}", notification.getKey(), notification.getCause()))
                .build(new CacheLoader<String, Object>() {
                    @Override
                    public Object load(String key) {
                        WatchableCacheLoader loader = loaders.get(key);
                        return loader.load();
                    }
                });
        //初始化KV缓存,缓存还未加载
        for (Map.Entry<String,WatchableKVCacheLoader> entry : kvLoaders.entrySet()) {
            final WatchableKVCacheLoader loader = entry.getValue();
            kvCaches.put(entry.getKey(), CacheBuilder.newBuilder().build(new CacheLoader<Object, Object>() {
                @Override
                public Object load(Object key) {
                    return loader.load(key);
                }
            }));
        }
    }

    @PostConstruct
    public void initCaches() {
        log.info(" manager init and refresh caches");
        initLoadingCaches();

        for (String key : loaders.keySet()) {
            caches.refresh(key);
        }
    }

}

核心业务流程完毕

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容