场景介绍:在高并发场景,如果调用链是A ->B,A不需要立马获取请求的最终结果(可以理解为异步),而A的请求逻辑是可以批量处理,这时候就可以使用聚合批量处理请求。使用聚合,可以极大的提高服务的处理能力,是高并发场景中效果最好的优化之一。
下面介绍两种任务聚合的实现方式,目前代码只针对了处理聚合的时间间隔,没有控制可聚合的最大数量,如有需要可以在代码上加以处理。
聚合实现的基础类
需要实现聚合,至少需要两个方法,一个为插入一个聚合任务的子任务的方法,一个为处理聚合后的任务。此外还需要控制聚合时间,比如把5秒内的请求数据聚合。
基础类如下:
public abstract class Polymerization {
protected int polTime = 5000;
public Polymerization(int polTime){
this.polTime = polTime;
}
/**
* 插入一个聚合任务的子任务
* @param data 任务数据
* @param polymerizationId 任务类型的id
*/
public abstract void pushPolymerization(String data,String polymerizationId);
/**
* 处理聚合后的任务
* @param data 任务聚合后数据列表
* @param polymerizationId 任务类型的id
*/
public abstract void dealData(List<String> data, String polymerizationId);
}
本地任务聚合的实现方式
实现思路
因为只是本地任务聚合,只需要考虑多线程安全问题。从效率、性能考虑,直接只使用一个ConcurrentLinkedQueue类型的成员变量,配合内部类封装时间,任务id,任务数据。逻辑代码可以无锁执行,而ConcurrentLinkedQueue本身是高性能多线程安全类。所以在性能方面是非常优秀的。实现代码如下,逻辑已注释:
public class PolymerizationLocalDemo extends Polymerization{
private ConcurrentLinkedQueue<PolData> dataQueue = new ConcurrentLinkedQueue<>();
public PolymerizationLocalDemo(int polTime) {
super(polTime);
}
/**
* 聚合任务封装类
*/
@Data
@AllArgsConstructor
private static class PolData{
/**
* 任务数据
*/
private String data;
/**
* 任务类型id
*/
private String polymerizationId;
/**
* 任务的出生时间
*/
private long time;
}
@Override
public void pushPolymerization(String data, String polymerizationId){
//直接插入到队列尾部
dataQueue.add(new PolData(data,polymerizationId,System.currentTimeMillis()));
}
@Override
@Async
public void dealData(List<String> data,String polymerizationId){
//todo 这里加处理逻辑就好
}
@Scheduled(fixedDelay = 1000)
public void timeJob() {
PolData head = dataQueue.peek();
long now = System.currentTimeMillis();
//读取头部数据,当头部数据距离当前大于polTime,就处理头部数据时间+polTime时间范围内的数据
if(head == null|| head.getTime()+polTime>now){
return;
}
List<PolData> data = new LinkedList<>();
//如果当前队列不为空且头部数据时间属于聚合时间范围内,则poll出头部数据
while (dataQueue.peek()!=null&&dataQueue.peek().getTime() < head.getTime()+polTime) {
data.add(dataQueue.poll());
}
//根据任务类型id分类
Map<String,List<PolData>> mapDate = data.stream().collect(Collectors.groupingBy(PolData::getPolymerizationId));
//分类后调用dealData处理聚合
for(Map.Entry<String,List<PolData>> entry:mapDate.entrySet()){
dealData(entry.getValue().stream().map(PolData::getData).collect(Collectors.toList()), entry.getKey());
}
}
}
分布式任务聚合的实现方式
实现思路
因为是分布式环境,所以要聚合的话需要使用redis作为中间件来存储信息和过期信息。并用rua脚本来保证操作的原子性。需要定时任务定时插入空数据去判断是否可以聚合了。这样设计逻辑比较简单,垃圾数据量可控制。如果需要进一步优化,则需要用两个lua脚本来实现。代码如下,逻辑已注释
public class PolymerizationDemo extends Polymerization{
/**
* lua脚本,大致逻辑:
* 首先拿取当前类型任务的到期时间
* 然直接插入redis队列
* 如果当前时间小于过期时间,返回0和队列的长度
* 如果当前时间已经大于过期时间,则把所有数据返回,并删除队列数据,并重新设置聚合到期时间
**/
private static final String SCRIPT_LUA = "" +
" local ts = redis.call('GET', KEYS[2])" +
"local size =redis.call('RPUSH', KEYS[1], ARGV[1])"+
" if tonumber( ARGV[2]) < tonumber(ts) then" +
" return {0, size}" +
" else" +
" redis.call('set', KEYS[2], ARGV[3])" +
" local items = redis.call('LRANGE', KEYS[1], -1, size)" +
" redis.call('DEL', KEYS[1])" +
" return items" +
" end";
@Autowired
private StringRedisTemplate redis;
private static final String EXPIRE_KEY = "expire_key_";
private static final String DATA_KEY = "expire_key_";
public PolymerizationDemo(int polTime) {
super(polTime);
}
public List evalScript(String lua, List<String> keys, Object... values) {
DefaultRedisScript<List> redisScript = new DefaultRedisScript<List>(lua, List.class);
return redis.execute(redisScript,keys,values);
}
@Override
public void pushPolymerization(String data, String polymerizationId){
//当前任务类型的一次聚合的最终时间的key
String timeExpireKey = EXPIRE_KEY+polymerizationId;
//当前任务类型保存数据的队列的key
String listDateKey = DATA_KEY+polymerizationId;
long timeNow = System.currentTimeMillis();
//如果当前聚合成功后,新的聚合到期时间
long timePreExpire = timeNow+polTime;
List<String> keys = Lists.newArrayList(timeExpireKey,listDateKey);
//执行rua脚本
List<String> result = (List<String>) evalScript(SCRIPT_LUA,keys,data,timeNow,timePreExpire);
// 如果第一个返回0,表示还没有到聚合的时候
if(!"0".equals(result.get(0))){
dealData(result,polymerizationId);
}
}
@Override
@Async
public void dealData(List<String> data,String polymerizationId){
//移除无用的心跳数据
while (data.remove("empty")){
}
//逻辑
}
@Scheduled(fixedDelay = 1000)
public void timeJob(){
List<String> idList = Lists.newArrayList("id1","id2","id3");
for(String id:idList){
//尝试插入特殊标记empty的数据来尝试聚合
pushPolymerization("empty",id);
}
}
}