基于Redission使用Redis的Stream

        redisson = Redisson.create(config);

        RStream<String, String> stream = redisson.getStream("test3");
        //初始化,不知为啥,但不这样做create不到group
        stream.add("0","0");
        //创建一个group,一个group需要在stream数据添加前创建,否则这个group只能读它创建以后写入stream的数据
        stream.createGroup("testGroup31");
        //往stream添加消息
        for(Integer i=0;i<30;i++){
            stream.add(i.toString(), i.toString());
        }

        //消费消息
        for(Integer i=0;i<6;i++){
            Integer finalI = i;
            Thread t = new Thread( ()->{
                try {
                    Thread.sleep(1000);
                }catch (Exception e){

                }

                Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup31", "consumer"+ finalI.toString(),1);
                if(s!=null && s.size()>0){
                    for (Map.Entry<StreamMessageId, Map<String, String>> entry : s.entrySet()) {
                        Map<String, String> m2 = entry.getValue();
                        for(Map.Entry<String,String> entry1:m2.entrySet()){
                            System.out.println(Thread.currentThread().getName()+" : Key = " + entry1.getKey() + ", Value = " + entry1.getValue());
                        }
                        //消费了消息,要应答一下
                        stream.ack("testGroup31",entry.getKey());
                        //如果消费了消息想删除,可以删除掉
                        //stream.remove(entry.getKey());
                    }
                }

            });
            t.start();
        }
image.png
image.png

这里,有个group名字叫testGroup31,里面有消费者6个,pending代表目前有6个数据被读取了,但没有ack。last_delivered-id代表这个group目前读到哪条消息。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 关于Mongodb的全面总结 MongoDB的内部构造《MongoDB The Definitive Guide》...
    中v中阅读 32,101评论 2 89
  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,422评论 11 349
  • ORACLE自学教程 --create tabletestone ( id number, --序号usernam...
    落叶寂聊阅读 1,170评论 0 0
  • 《行旅河山》,一部散文集,出自汪志敏之手。 文章多是一些旧事的回忆,整理成册也颇有一番趣味,让人细细品味。人生其实...
    jessewu233阅读 329评论 0 0
  • 昨天开始头有些隐隐作痛,没有在意,今天下午开始持续痛到现在,再一次深刻的认识到身体是革命的本钱。总是等到生病的时候...
    泡沫XY阅读 135评论 0 0