Java实现本地任务聚合与分布式任务聚合

场景介绍:在高并发场景,如果调用链是A ->B,A不需要立马获取请求的最终结果(可以理解为异步),而A的请求逻辑是可以批量处理,这时候就可以使用聚合批量处理请求。使用聚合,可以极大的提高服务的处理能力,是高并发场景中效果最好的优化之一。
下面介绍两种任务聚合的实现方式,目前代码只针对了处理聚合的时间间隔,没有控制可聚合的最大数量,如有需要可以在代码上加以处理。

聚合实现的基础类

需要实现聚合,至少需要两个方法,一个为插入一个聚合任务的子任务的方法,一个为处理聚合后的任务。此外还需要控制聚合时间,比如把5秒内的请求数据聚合。
基础类如下:

public abstract class Polymerization {
    protected int polTime = 5000;
    public Polymerization(int polTime){
        this.polTime = polTime;
    }

    /**
     * 插入一个聚合任务的子任务
     * @param data 任务数据
     * @param polymerizationId 任务类型的id
     */
    public abstract void pushPolymerization(String data,String polymerizationId);

    /**
     * 处理聚合后的任务
     * @param data 任务聚合后数据列表
     * @param polymerizationId 任务类型的id
     */
    public abstract void dealData(List<String> data, String polymerizationId);
}

本地任务聚合的实现方式

实现思路

因为只是本地任务聚合,只需要考虑多线程安全问题。从效率、性能考虑,直接只使用一个ConcurrentLinkedQueue类型的成员变量,配合内部类封装时间,任务id,任务数据。逻辑代码可以无锁执行,而ConcurrentLinkedQueue本身是高性能多线程安全类。所以在性能方面是非常优秀的。实现代码如下,逻辑已注释:

public class PolymerizationLocalDemo extends Polymerization{
    private ConcurrentLinkedQueue<PolData> dataQueue = new ConcurrentLinkedQueue<>();

    public PolymerizationLocalDemo(int polTime) {
        super(polTime);
    }

    /**
     * 聚合任务封装类
     */
    @Data
    @AllArgsConstructor
    private static class PolData{
        /**
         * 任务数据
         */
        private String data;
        /**
         * 任务类型id
         */
        private String polymerizationId;
        /**
         * 任务的出生时间
         */
        private long time;
    }
    @Override
    public void pushPolymerization(String data, String polymerizationId){
        //直接插入到队列尾部
        dataQueue.add(new PolData(data,polymerizationId,System.currentTimeMillis()));
    }
    @Override
    @Async
    public void dealData(List<String> data,String polymerizationId){
        //todo 这里加处理逻辑就好
    }
    @Scheduled(fixedDelay = 1000)
    public void timeJob() {
        PolData head = dataQueue.peek();
        long now = System.currentTimeMillis();
        //读取头部数据,当头部数据距离当前大于polTime,就处理头部数据时间+polTime时间范围内的数据
        if(head == null|| head.getTime()+polTime>now){
            return;
        }
        List<PolData> data = new LinkedList<>();
        //如果当前队列不为空且头部数据时间属于聚合时间范围内,则poll出头部数据
        while (dataQueue.peek()!=null&&dataQueue.peek().getTime() < head.getTime()+polTime) {
            data.add(dataQueue.poll());
        }
        //根据任务类型id分类
        Map<String,List<PolData>> mapDate = data.stream().collect(Collectors.groupingBy(PolData::getPolymerizationId));
        //分类后调用dealData处理聚合
        for(Map.Entry<String,List<PolData>> entry:mapDate.entrySet()){
            dealData(entry.getValue().stream().map(PolData::getData).collect(Collectors.toList()), entry.getKey());
        }
    }
}

分布式任务聚合的实现方式

实现思路

因为是分布式环境,所以要聚合的话需要使用redis作为中间件来存储信息和过期信息。并用rua脚本来保证操作的原子性。需要定时任务定时插入空数据去判断是否可以聚合了。这样设计逻辑比较简单,垃圾数据量可控制。如果需要进一步优化,则需要用两个lua脚本来实现。代码如下,逻辑已注释

public class PolymerizationDemo extends Polymerization{
    /**
     * lua脚本,大致逻辑:
     * 首先拿取当前类型任务的到期时间
     * 然直接插入redis队列
     * 如果当前时间小于过期时间,返回0和队列的长度
     * 如果当前时间已经大于过期时间,则把所有数据返回,并删除队列数据,并重新设置聚合到期时间
     **/
    private static final String SCRIPT_LUA = "" +
            "  local ts = redis.call('GET', KEYS[2])" +
            "local size =redis.call('RPUSH', KEYS[1], ARGV[1])"+
            "  if tonumber( ARGV[2]) < tonumber(ts) then" +
            "    return {0,  size}" +
            "  else" +
            "    redis.call('set', KEYS[2], ARGV[3])" +
            "    local items = redis.call('LRANGE', KEYS[1], -1, size)" +
            "    redis.call('DEL', KEYS[1])" +
            "    return items" +
            "  end";
    @Autowired
    private StringRedisTemplate redis;

    private static final String EXPIRE_KEY = "expire_key_";
    private static final String DATA_KEY = "expire_key_";

    public PolymerizationDemo(int polTime) {
        super(polTime);
    }


    public List evalScript(String lua, List<String> keys, Object... values) {
        DefaultRedisScript<List> redisScript = new DefaultRedisScript<List>(lua, List.class);
        return redis.execute(redisScript,keys,values);
    }
    @Override
    public void pushPolymerization(String data, String polymerizationId){
        //当前任务类型的一次聚合的最终时间的key
        String timeExpireKey = EXPIRE_KEY+polymerizationId;
        //当前任务类型保存数据的队列的key
        String listDateKey = DATA_KEY+polymerizationId;
        long timeNow = System.currentTimeMillis();
        //如果当前聚合成功后,新的聚合到期时间
        long timePreExpire = timeNow+polTime;
        List<String> keys = Lists.newArrayList(timeExpireKey,listDateKey);
        //执行rua脚本
        List<String> result = (List<String>) evalScript(SCRIPT_LUA,keys,data,timeNow,timePreExpire);
        // 如果第一个返回0,表示还没有到聚合的时候
        if(!"0".equals(result.get(0))){
            dealData(result,polymerizationId);
        }
    }
    @Override
    @Async
    public void dealData(List<String> data,String polymerizationId){
        //移除无用的心跳数据
        while (data.remove("empty")){

        }
        //逻辑
    }
    @Scheduled(fixedDelay = 1000)
    public void timeJob(){
        List<String> idList =  Lists.newArrayList("id1","id2","id3");
        for(String id:idList){
            //尝试插入特殊标记empty的数据来尝试聚合
            pushPolymerization("empty",id);
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,919评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,567评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,316评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,294评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,318评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,245评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,120评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,964评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,376评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,592评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,764评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,460评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,070评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,697评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,846评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,819评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,665评论 2 354

推荐阅读更多精彩内容