Redis 使用list队列高速消费数据

测试目的

本次测试目的是 消费 Redis List类型 里的数据 以各种方式来快速消费,得到最佳消费方式。消费框架为 spring boot,消费工具库为 lettuce,结合redisredisTemplate 的 api 来载入和消费数据,消费数据量分别为 1.5w、2w、10w。消费数据会提前加载到 Redis list 中,消费api 为 redisredisTemplate.opsForList().rightPop(key, Duration.ofSeconds(3)),该api 被封装为 redisSdk.LRightPopBlock(key,second)。

使用 ForkJoinPool 的方式测试

ForkJoinPool 的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。线程的数量是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。

测试机配置

CPU 类型 基准速度 系统类型 内存 内核 逻辑CPU
Intel(R) Core(TM) i7-8565U CPU @ 1.80GHz 1.99 GHz 64位 8G 4 8

单线程测试

单线程不存在 ForkJoinPool 的方式,所以这里没用到。

    @Override
    public void run(String... args) throws Exception {
        long sumTime = 0;
        while(true){
            try{
                long startTime = System.currentTimeMillis();
                String str = redisSdk.LRightPopBlock("a",3);
                if(!StrUtil.isEmpty(str)) {
                    long endTime = System.currentTimeMillis();
                    sumTime += endTime - startTime;
                    log.info("处理数据 {}, 累计时间 {}",str,sumTime);
                }
            }catch(Exception e){
                log.error(e.getMessage());
            }
        }
    }
测试结果

处理 15000 条数据,所需耗时 444s

ForkJoinPool 多线程消费代码

    public void test2(){
        int fcore = 2;
        ForkJoinPool forkJoinPool = new ForkJoinPool(fcore);
        AtomicInteger count = new AtomicInteger(0);
        AtomicInteger breaks = new AtomicInteger(0);

        long startTime = System.currentTimeMillis();
        IntStream.range(0,fcore).forEach(j ->{
            forkJoinPool.execute(() -> {
                while (true) {
                    try {
                        String str = redisSdk.LRightPopBlock("a", 3);
                        if (!StrUtil.isEmpty(str)) {
                            log.info("累计处理 {} 条数据", count.incrementAndGet());
                        } else {
                            breaks.incrementAndGet();
                            break;
                        }
                    } catch (Exception e) {
                        log.error(e.getMessage());
                    }
                }
            });
        });

        while(true) {
            try {
                if(count.get() >= 15000) {
                    break;
                }
                Thread.sleep(500);
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        long endTime = System.currentTimeMillis();
        log.info("总耗时 {}",endTime - startTime - 1000);
    }
测试结果
并行线程数 耗时 消费数据 备注
2 210s 15000
4 108s 15000
6 79s 15000
8 81s 15000
16 31.234s 15000
32 36.271s 15000

ThreadPoolExecutor 测试

ThreadPoolExecutor 是一般线程池,ThreadPoolExecutor 我这里使用的是定长方式,和 Executors.newFixedThreadPool() 是一样的。代码如下:

    public void test5(){
        int fxcore = 4;

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(fxcore, fxcore, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        AtomicInteger count = new AtomicInteger(0);
        AtomicInteger breaks = new AtomicInteger(0);

        long startTime = System.currentTimeMillis();
        IntStream.range(0,fxcore).forEach(j ->{
            threadPoolExecutor.execute(() -> {
                while (true) {
                    try {
                        String str = redisSdk.LRightPopBlock("a", 3);
                        if (!StrUtil.isEmpty(str)) {
                            log.info("累计处理 {} 条数据", count.incrementAndGet());
                        } else {
                            breaks.incrementAndGet();
                            break;
                        }
                    } catch (Exception e) {
                        log.error(e.getMessage());
                    }
                }
            });
        });
        while(true) {
            try {
                if(count.get() >= 15000) {
                    break;
                }
                Thread.sleep(500);
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        long endTime = System.currentTimeMillis();
        log.info("总耗时 {}",endTime - startTime - 1000);
    }
测试结果
并行线程数 耗时 消费数据 备注
4 111.124s 15000
8 55.325s 15000
16 29.04s 15000
32 14.57s 15000
总结

ThreadPoolExecutor 开的线程越多,处理速度越快

ForkJoinPool + ThreadPoolExecutor 测试

ForkJoinPool + ThreadPoolExecutor 的测试方案是我自己想的一套,如果说 ForkJoinPool 可以充分让多核 cpu 处理任务,那让每个cpu在建立自己的多线程处理会不会更快呢?测试代码如下:

    public void test2(){
        int fcore = 4;
        int fxcore = 16;

        ForkJoinPool forkJoinPool = new ForkJoinPool(fcore);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(fxcore, fxcore, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        AtomicInteger count = new AtomicInteger(0);
        AtomicInteger breaks = new AtomicInteger(0);

        long startTime = System.currentTimeMillis();
        IntStream.range(0,fcore).forEach(j ->{
            forkJoinPool.execute(() -> {
                for(int i=0;i< fxcore/fcore ;i++) {
                    threadPoolExecutor.execute(() -> {
                        while (true) {
                            try {
                                String str = redisSdk.LRightPopBlock("a", 3);
                                if (!StrUtil.isEmpty(str)) {
                                    log.info("累计处理 {} 条数据", count.incrementAndGet());
                                } else {
                                    breaks.incrementAndGet();
                                    break;
                                }
                            } catch (Exception e) {
                                log.error(e.getMessage());
                            }
                        }
                    });
                }
            });
        });
        while(true) {
            try {
                if(count.get() >= 15000) {
                    break;
                }
                Thread.sleep(500);
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        long endTime = System.currentTimeMillis();
        log.info("总耗时 {}",endTime - startTime - 1000);
    }
测试结果
ForkJoinPool线程数 ThreadPool线程数 消费数据 耗时 备注
4 16 15000 28.55s
6 16 15000 37.55s 并行越多反而越慢
4 32 15000 15.53s
6 32 15000 16.15s 并行越多反而越慢,但线程多反而快
总结

看来并没有什么用,还是用 ThreadPool 速度最快。

Pipelined

查了很多方式,查到redis Pipelined也可以 使用rpop,代码如下:

    public void test7(int i){
        long startTime = System.currentTimeMillis();
        List<String> list = redisSdk.getTemplate().executePipelined(new RedisCallback() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                for(int i=0;i<20000;i++) {
                    connection.rPop("a".getBytes());
                }
                return null;
            }
        });
        long endTime = System.currentTimeMillis();
        log.info("共取到 {} 耗时 {}",list.size(),endTime-startTime);
    }
测试结果

这种方式不像上面单条消费,这里可以自己存到list,然后在写消费程序。注意这里如果redis list数据没有那么多,可能取到的是 null 值,该null值为对象,不为字符串。这里的测试每次都会打开一个新的连接

数据量 时间
1W 980ms
2W 1790ms
3W 2880ms
多线程测试数据并发
    private List<String> list1 = new ArrayList<>();

    private List<String> list2 = new ArrayList<>();

    private List<String> list3 = new ArrayList<>();


    public static void main(String[] args) {
        SpringApplication.run(RealTimeLibraryApplication.class, args);
    }


    @Override
    public void run(String... args) throws Exception {
        long[] a = LongStream.range(0, 10000).toArray();
        String[] aArr = Stream.of(Arrays.toString(a)).collect(Collectors.joining("", "[", "]")).split(",");
        redisSdk.LLeftSet("a", aArr);
        System.out.println("初始化完毕");
        
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        for(int i=0;i<3;i++) {
            int finalI = i;
            threadPoolExecutor.execute(() -> test7(finalI));
        }
        while(true) {
            if(list1.size() > 0 && list2.size() > 0 && list3.size() >0){
                Thread.sleep(3000);
                break;
            }
            Thread.sleep(1000);
        }
        // disjoint true 就是没有交集 false 就是有交集
        log.info("list1 与 list2 交集存在为 {}", Collections.disjoint(list1,list2)?"否":"是");
        log.info("list1 与 list2 交集存在为 {}",Collections.disjoint(list1,list3)?"否":"是");
        log.info("list1 与 list2 交集存在为 {}",Collections.disjoint(list2,list3)?"否":"是");
    }

    public void test7(int i){
        long startTime = System.currentTimeMillis();
        List<String> list = redisSdk.getTemplate().executePipelined(new RedisCallback() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                for(int i=0;i<20000;i++) {
                    connection.rPop("a".getBytes());
                }
                return null;
            }
        });
        long endTime = System.currentTimeMillis();
        log.info("共取到 {} 耗时 {}",list.size(),endTime-startTime);
        while(list.contains(null)){
            list.remove(null);
        }
        list.forEach(System.out::println);
        if(i ==0){
            list1.addAll(list);
        }
        if(i ==1){
            list2.addAll(list);
        }
        if(i ==2){
            list3.addAll(list);
        }
    }

测试结果发现数据差不多是均匀的分布在每个list中(每个list在3000+多),且数据没有重复,所以上集群是没有问题的。

业务数据模拟测试代码
    @Override
    public void run(String... args) throws Exception {
        List<String> list1 = new ArrayList<>();
        for(int i=0;i<10000;i++){
            list1.add("{\"sysId\":41040020001,\"mpntId\":1000070001,\"attrId\":1,\"dataItemId\":1000001,\"timestamp\":1603789510000,\"value\":1,\"key\":\"41040020001:1000070001:1:1000001\"}");
        }
        List<String> list2 = new ArrayList<>();
        for(int i=0;i<10000;i++){
            list2.add("{\"sysId\":41040020001,\"mpntId\":1000070001,\"attrId\":1,\"dataItemId\":1000002,\"timestamp\":1603789510000,\"value\":1,\"key\":\"41040020001:1000070001:1:1000002\"}");
        }
        List<String> list3 = new ArrayList<>();
        for(int i=0;i<10000;i++){
            list3.add("{\"sysId\":41040020001,\"mpntId\":1000070001,\"attrId\":1,\"dataItemId\":1000012,\"timestamp\":1603789510000,\"value\":1,\"key\":\"41040020001:1000070001:1:1000012\"}");
        }
        List<String> list4 = new ArrayList<>();
        for(int i=0;i<10000;i++){
            list4.add("{\"sysId\":41040020001,\"mpntId\":1000070001,\"attrId\":1,\"dataItemId\":1000013,\"timestamp\":1603789510000,\"value\":1,\"key\":\"41040020001:1000070001:1:1000013\"}");
        }
        List<String> list5 = new ArrayList<>();
        for(int i=0;i<10000;i++){
            list5.add("{\"sysId\":41040020001,\"mpntId\":1000070001,\"attrId\":1,\"dataItemId\":1000014,\"timestamp\":1603789510000,\"value\":1,\"key\":\"41040020001:1000070001:1:1000014\"}");
        }

        redisSdk.sAddAll("power:station:keys", Arrays.asList("41040020001"));


        redisSdk.lLeftAddAll("41040020001",list1);
        redisSdk.lLeftAddAll("41040020001",list2);
        redisSdk.lLeftAddAll("41040020001",list3);
        redisSdk.lLeftAddAll("41040020001",list4);
        redisSdk.lLeftAddAll("41040020001",list5);



        System.out.println("初始化完毕");

        mainService.start();

    }
测试结果打印

在测试代码里面针对不同的 dataItemId 各创建1W条数据。

获取数量 获取时间 整个流程所耗时间
3W 996 ms 1030 ms
3W 1084 1303 ms
3W 1006 ms 1032 ms
3W 982 ms 1027 ms

最终结论

以上的 Pipelined 测试为表现最好的一个,能批量处理数据,减少网络IO,在实际业务应用中测试的表现也优于其他方式,故针对实时库的消费方式定为此方式。

lettuce jedis 性能比较

使用的都是spring redis提供的开发API,在数据包大小差不多一样的情况下,没用到链接池。

lettuce(ms) jedis(ms)
606 231
490 202
436 183
382 173
381 167
373 158
... 150
... 149
... 148
... ...

jedis 稳定在 148-149ms
lettuce 稳定在 373ms

补:这里引发的一个问题就是,我到底使用多少个线程会比较好呢?如果线程执行的是计算型任务可以核数 * 2,因为数值计算快,给太多线程会频繁切换线程。如果是io型(任务型,业务型),线程可以给几百上千都没问题,但是太多的线程会占用内存,所以根据内存分配。把处理器想象成一个队列(不恰当想象),那么多线程只是等待被执行而已。切记小心,防止oom。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。