场景
统计一个批量接口会有多少数据,这个接口的QPS在100万级别。有几种方案:
- 每次调用都串行计算一次;
- 每次调用使用线程池并行计算。
由于并发量特别的大,第1种场景肯定不适合,这会把相应时间拉长。第二种方法每次请求过来都放到一个线程池里面请求,比第一种强很多,用这种方式基本上可以解决80%左右的需求了。那么还有能优化的地方么?答案是有的。
Cache + 线程池
一般在大的公司都有一些监控系统,可以将监控的数据上报到监控系统中。上面两个场景都是每次请求都会调用上报接口,这样特别浪费资源也可能出现性能问题。是否可以想一个办法减少上报次数呢?我们可以使用cache汇总在一起,打包通过线程池异步上报。是不是这种方式会更好一些。
实现
怎么实现呢? 首先我们需要一个cache,这次我们使用Guava Cache。
Guava Cache 是google开发开源项目Guava中带有的功能,只提供堆缓存,也就是说重启机器后就没有了,特点:小巧玲珑,性能最好。
private volatile static Cache<String, MutableInt> metricCache = null;
public static Cache<String, MutableInt> getMetricCache(){
if (metricCache == null) {
synchronized (this) {
if (metricCache == null) {
metricCache = initMetricCache();
return metricCache;
}
}
}
return metricCache;
}
private static Cache<String, MutableInt> initMetricCache(){
Cache<String, MutableInt> initMetricCache = CacheBuilder.newBuilder()
// 设置缓存个数
.maximumSize(1024)
// 设置cache中的数据在写入之后的存活时间为1秒
.expireAfterWrite(1, TimeUnit.MINUTES)
// 设置并发数为8,即同一时间最多只能有5个线程往cache执行写入操作
.concurrencyLevel(8)
// 声明一个监听器,缓存项被移除时做一些额外操作。这里使用异步线程池的形式实现,更加高效。
.removalListener(RemovalListeners.asynchronous(new RemovalListener<String, MutableInt>(){
@Override
public void onRemoval(RemovalNotification<String, MutableInt> notification) {
// 删除后的逻辑操作,这里是上报到监控系统中
metricForCount(notification.getKey(), notification.getValue().intValue());
}
},
// 自定义线程池,这里就不在把实现的代码粘进来了
taskExecutor.getTaskExecutor()))
.build();
return initMetricCache;
}
对上面的代码进行分析:
-
CacheBuilder.newBuilder()
创建一个Guava Cache,设置一些配置; - 在调用时考虑到高效性,使用了一个小技巧延迟加载,参考
getMetricCache()
实现; - 在Guava Cache中使用
removalListener
特性,结合我们的需求,当统计记录达到一定的数量后,删除掉并在监听的线程池中实现上报。
应用
看着很牛B,怎么使用呢?
public static void logMetricForCount(final String key, final int count) {
try {
MutableInt logMetric = getMetricCache().get(key, new Callable<MutableInt>() {
@Override
public MutableInt call() throws Exception {
return new MutableInt(0);
}
});
// 计数
logMetric.add(count);
if(logMetric.intValue() > 500){
// 当计数达到500个时删除此key,从而触发上面配置的removalListener
getMetricCache().invalidate(key);
}
} catch (Exception e) {
logger.warn("统计{}信息次数{}异常", key, count, e);
}
}
在实战的计数操作,apache提供了MutableInt专门用于高效计数的类。还使用到Guava Cache的特性。
MutableInt logMetric = getMetricCache().get(key, new Callable<MutableInt>() {
@Override
public MutableInt call() throws Exception {
return new MutableInt(0);
}
});
当没有get到数据时,自动初始化一个。是不是很棒!
代码是不是就到此结束了? 不是的。我们在开发代码时需要考虑高效。Guava Cache在设计时也考虑到高效性,不过如果不仔细阅读使用文档,也会给自己买坑。
Guava Cache清理什么时候发生?使用CacheBuilder构建的缓存不会"自动"执行清理和回收工作,也不会在某个缓存项过期后马上清理,也没有诸如此类的清理机制。相反,它会在写操作时顺带做少量的维护工作,或者偶尔在读操作时做——如果写操作实在太少的话。
如果你的缓存是高吞吐的,那就无需担心缓存的维护和清理等工作。如果你的 缓存只会偶尔有写操作,而你又不想清理工作阻碍了读操作,那么可以创建自己的维护线程,以固定的时间间隔调用Cache.cleanUp()。ScheduledExecutorService可以帮助你很好地实现这样的定时调度。
对于高并发量的情况下,我们还需要写一个线程去定时cleanUp。
Runnable metrciCacheCleanUpTask = new Runnable() {
@Override
public void run() {
getMetricCache().cleanUp();
} catch (Exception e) {
logger.error("定时cleanUp方法异常",e);
}
}
};
// 使用线程池每分钟执行一次
commTaskScheduler.scheduleWithFixedDelay(metrciCacheCleanUpTask, 60000);
线程池相关的实现可以参考我以前的blog,微分享-spring线程池实战
Guava Cache CacheLoader还提供了数据加载机制,有兴趣的话可以研究一下。