在SpringBoot中使用RedisTemplate重新消费Redis Stream中未ACK的消息

消费组从stream中获取到消息后,会分配给自己组中其中的一个消费者进行消费,消费者消费完毕,需要给消费组返回ACK,表示这条消息已经消费完毕了。

当消费者从消费组获取到消息的时候,会先把消息添加到自己的pending消息列表,当消费者给消费组返回ACK的时候,就会把这条消息从pending队列删除。(每个消费者都有自己的pending消息队列)

消费者可能没有及时的返回ACK。例如消费者消费完毕后,宕机,没有及时返回ACK,此时就会导致这条消息占用2倍的内存(stream中保存一份, 消费者的的pending消息列表中保存一份)

关于Stream的基础姿势,可以先看看这篇贴帖子
如何在Springboot中使用Redis5的Stream

开始之前,通过Redis客户端模拟一点数据

1,新打开Redis客户端(我们称之为:生产端), 创建streamm,名称叫做:my_stream

XADD my_stream * hello world

随便添加一条消息,目的是为了初始化stream

2,创建一个消费组,名称叫做:my_group

XGROUP CREATE my_stream my_group $

3,再新启动一个Redis客户端(我们称之为:消费端1),使用消费组进行阻塞消费,指定消费者:my_consumer1

XREADGROUP GROUP my_group  my_consumer1  BLOCK 0 STREAMS my_stream >

4,再新启动一个Redis客户端(我们称之为:消费端2),使用消费组进行阻塞消费,指定消费者:my_consumer2

XREADGROUP GROUP my_group  my_consumer2  BLOCK 0 STREAMS my_stream >

5,通过生产端,推送3条消息

XADD my_stream * message1 Hello
XADD my_stream * message2 SpringBoot
XADD my_stream * message3 Community

生产端

image.png

消费端1

image.png

消费端2

image.png

可以看到,一共Push了3条消息,它们的ID分别是

  • 1605524648266-0 (message1 )
  • 1605524657157-0 (message2)
  • 1605524665215-0 (message3)

现在的状况是,消费者1,消费了2条消息(message1和message3),消费者2,消费了1条消息(message2)。都是消费成功了的,但是它们都还没有进行ACK

在客户端,消费者消费到一条消息后会立即返回,需要重新执行命令,来回到阻塞状态

ACK消息

现在我们打算,把消费者1,消费的那条message1进行ACK

XACK my_stream my_group  1605524648266-0

获取指定消费组中,待确认(ACK)的消息

查看消费组的所有待确认消息统计

127.0.0.1:6379> XPENDING my_stream  my_group
1) (integer) 2       # 消费组中,所有消费者的pending消息数量
2) "1605524657157-0" # pending消息中的,最小消息ID
3) "1605524665215-0" # pending消息中的,最大消息ID
4) 1) 1) "my_consumer1"  # 消费者1
      2) "1"             # 有1条待确认消息
   2) 1) "my_consumer2"  # 消费者2
      2) "1"             # 有2条待确认消息

查看消费者1的待确认消息详情

127.0.0.1:6379> XPENDING my_stream  my_group 0 + 10 my_consumer1
1) 1) "1605524665215-0"  # 待ACK消息ID
   2) "my_consumer1"     # 所属消费者
   3) (integer) 847437   # 消息自从被消费者获取后到现在过去的时间(毫秒) - idle time
   4) (integer) 1        # 消息被获取的次数 - delivery counter

这条命令,表示查询消费组my_group中消费者my_consumer1的opending队列,开始ID是0,结束ID是最大,最多检索10个结果。

现在的情况就是,一共3条消息,消费者1消费了2条,ack了1条。消费者2消费了1条,没有ack。消费者1和2,各自的pending队列中都有一条未ack的消息

如何实现将未被成功消费的消息获取出来重新进行消费?之前的演示,目的都是为了造一些数据,所以是用的客户端命令,从这里开始,所有的演示,都会使用spring-data-redis中的RedisTemplate

遍历消费者的pending列表,读取到未ACK的消息,直接进行ACK

import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import io.springboot.jwt.SpringBootJwtApplication;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    @Test
    public void test() {
        StreamOperations<String, String, String> streamOperations = this.stringRedisTemplate.opsForStream();
        
        // 获取my_group中的pending消息信息,本质上就是执行XPENDING指令
        PendingMessagesSummary pendingMessagesSummary = streamOperations.pending("my_stream", "my_group");
        
        // 所有pending消息的数量
        long totalPendingMessages = pendingMessagesSummary.getTotalPendingMessages();
        
        // 消费组名称
        String groupName= pendingMessagesSummary.getGroupName();
        
        // pending队列中的最小ID
        String minMessageId = pendingMessagesSummary.minMessageId();
        
        // pending队列中的最大ID
        String maxMessageId = pendingMessagesSummary.maxMessageId();
        
        LOGGER.info("消费组:{},一共有{}条pending消息,最大ID={},最小ID={}", groupName, totalPendingMessages, minMessageId, maxMessageId);
        

        // 每个消费者的pending消息数量
        Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
        
        pendingMessagesPerConsumer.entrySet().forEach(entry -> {
            
            // 消费者
            String consumer = entry.getKey();
            // 消费者的pending消息数量
            long consumerTotalPendingMessages = entry.getValue();
            
            LOGGER.info("消费者:{},一共有{}条pending消息", consumer, consumerTotalPendingMessages);
            
            if (consumerTotalPendingMessages > 0) {
                // 读取消费者pending队列的前10条记录,从ID=0的记录开始,一直到ID最大值
                PendingMessages pendingMessages = streamOperations.pending("my_stream", Consumer.from("my_group", consumer), Range.closed("0", "+"), 10);
                
                // 遍历所有Opending消息的详情
                pendingMessages.forEach(message -> {
                    // 消息的ID
                    RecordId recordId =  message.getId();
                    // 消息从消费组中获取,到此刻的时间
                    Duration elapsedTimeSinceLastDelivery = message.getElapsedTimeSinceLastDelivery();
                    // 消息被获取的次数
                    long deliveryCount = message.getTotalDeliveryCount();
                    
                    LOGGER.info("openg消息,id={}, elapsedTimeSinceLastDelivery={}, deliveryCount={}", recordId, elapsedTimeSinceLastDelivery, deliveryCount);
                    
                    /**
                     * 演示手动消费的这个判断非常的针对,目的就是要读取消费者“my_consumer1”pending消息中,ID=1605524665215-0的这条消息
                     */
                    if (consumer.equals("my_consumer1") && recordId.toString().equals("1605524665215-0")) {
                        // 通过streamOperations,直接读取这条pending消息,
                        List<MapRecord<String, String, String>> result = streamOperations.range("my_stream", Range.rightOpen("1605524665215-0", "1605524665215-0"));
                        
                        // 开始和结束都是同一个ID,所以结果只有一条
                        MapRecord<String, String, String> record = result.get(0);
                        
                        // 这里执行日志输出,模拟的就是消费逻辑
                        LOGGER.info("消费了pending消息:id={}, value={}", record.getId(), record.getValue());
                        
                        // 如果手动消费成功后,往消费组提交消息的ACK
                        Long retVal = streamOperations.acknowledge("my_group", record);
                        LOGGER.info("消息ack,一共ack了{}条", retVal);
                    }
                });
            }
        });
    }
}

这种方式就是,遍历消费组的pending消息情况,再遍历每个消费者的pending消息id列表,再根据id,直接去stream读取这条消息,进行消费Ack。

输出日志

消费组:my_group,一共有2条pending消息,最大ID=1605524657157-0,最小ID=1605524665215-0
消费者:my_consumer1,一共有1条pending消息
openg消息,id=1605524665215-0, elapsedTimeSinceLastDelivery=PT1H9M4.061S, deliveryCount=1
消费了pending消息:id=1605524665215-0, value={message3=Community}
消息ack,一共ack了1条
消费者:my_consumer2,一共有1条pending消息
openg消息,id=1605524657157-0, elapsedTimeSinceLastDelivery=PT1H9M12.172S, deliveryCount=1

最终的结果就是,消费者1的唯一一条pending消息被Ack了,这里有几个点要注意

  1. 遍历消费者pending列表时候,最小/大消息id,可以根据XPENDING指令中的结果来,我写0 - +,只是为了偷懒

  2. 遍历到消费者pending消息的时候,可以根据elapsedTimeSinceLastDelivery(idle time)和deliveryCount(delivery counter)做一些逻辑判断,elapsedTimeSinceLastDelivery越长,表示这条消息被消费了很久,都没Ack,deliveryCount表示重新投递N次后(下文会讲),都没被消费成功,可能是消费逻辑有问题,或者是Ack有问题。

再次查看XPENDING信息

127.0.0.1:6379> XPENDING my_stream  my_group 
1) (integer) 1
2) "1605524657157-0"
3) "1605524657157-0"
4) 1) 1) "my_consumer2"
      2) "1"

消费者1,唯1条待ack的消息看,已经被我们遍历出来手动消费,手动ack了,所以只剩下消费者2还有1条pending消息。。

通过XCLAIM改变消息的消费者

如果一个消费者,一直不能消费掉某条消息,或者说这个消费者因为某些消息,永远也不能上过线了,那么可以把这个消费者的pending消息,转移到其他的消费者pending列表中,重新消费

其实我们这里要做的事情,就是把“消费者2”的唯一1条pending消息“ 1605524657157-0”(message2),交给“消费者1”,重新进行消费。

Redis命令的实现

XCLAIM my_stream  my_group my_consumer1 10000 1605524657157-0

1605524657157-0这条消息,重新给my_group中的my_consumer1进行消费,前提条件是这条消息的idle time大于了10秒钟(从获取消息到现在超过10秒都没Ack)。

Java客户端的实现

import java.time.Duration;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import io.springboot.jwt.SpringBootJwtApplication;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    @Test
    public void test() {
        List<ByteRecord> retVal = this.stringRedisTemplate.execute(new RedisCallback<List<ByteRecord>>() {
            @Override
            public List<ByteRecord> doInRedis(RedisConnection connection) throws DataAccessException {
                // XCLAIM 指令的实现方法
                return connection.streamCommands().xClaim("my_stream".getBytes(), "my_group", "my_consumer1", Duration.ofSeconds(10), RecordId.of("1605524657157-0"));
            }
        });
        
        for (ByteRecord byteRecord : retVal) {
            LOGGER.info("改了消息的消费者:id={}, value={}", byteRecord.getId(), byteRecord.getValue());
        }
    }
}

日志输出

改了消息的消费者:id=1605524657157-0, value={[B@10b4f345=[B@63de4fa}

再次查看XPENDING信息

127.0.0.1:6379> XPENDING my_stream  my_group 
1) (integer) 1
2) "1605524657157-0"
3) "1605524657157-0"
4) 1) 1) "my_consumer1"
      2) "1"

可以看到,消息 “1605524657157-0”(message2),已经从“消费者2”名下,转移到了”消费者1”,接下来要做的事情,就是遍历“消费者1”的pending列表,消费掉它。

读取pending消息列表,进行消费

最开始在控制,演示了通过客户端,进行消费者阻塞消费的时候,写了一条命令

XREADGROUP GROUP my_group  my_consumer1  BLOCK 0 STREAMS my_stream >

其中最后那个>,表示ID,是一个特殊字符,如果不是,当ID不是特殊字符>时, XREADGROUP不再是从消息队列中读取消息, 而是从消费者的的pending消息列表中读取历史消息。(一般将参数设为0-0,表示读取所有的pending消息)

Redis命令

127.0.0.1:6379> XREADGROUP GROUP my_group  my_consumer1  BLOCK 0 STREAMS my_stream 0
1) 1) "my_stream"
   2) 1) 1) "1605524657157-0"
         2) 1) "message2"
            2) "SpringBoot"

读取到了,消费者1,pending消息中的唯一一条消息记录

Java实现

import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import io.springboot.jwt.SpringBootJwtApplication;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    @SuppressWarnings("unchecked")
    @Test
    public void test() {
        
        StreamOperations<String, String, String> streamOperations = this.stringRedisTemplate.opsForStream();
        
        // 从消费者的pending队列中读取消息
        List<MapRecord<String, String, String>>  retVal = streamOperations.read(Consumer.from("my_group", "my_consumer1"), StreamOffset.create("my_stream", ReadOffset.from("0")));
        
        // 遍历消息
        for (MapRecord<String, String, String> record : retVal ) {
            // 消费消息
            LOGGER.info("消息id={}, 消息value={}", record.getId(), record.getValue());
            // 手动ack消息
            streamOperations.acknowledge("my_group", record);
        }
    }
}

这种方式,就是直接从消费者的pending队列中读取数据,手动进行消费,然后Ack

日志

消息id=1605524657157-0, 消息value={message2=SpringBoot}

再次查看XPENDING信息

127.0.0.1:6379> XPENDING my_stream  my_group 
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)

没了,一条都没,全部已经Ack了。

死信

死信,就是一直没法被消费的消息,可以根据这个两个属性idle timedelivery counter 进行判断

idle time 当消息被消费者读取后,就会开始计时,如果一个pending消息的idle time很长,表示这消息,可能是在Ack时发生了异常,或者说还没来得及Ack,消费者就宕机了,导致一直没有被Ack,当消息发生了转移,它会清零,重新计时。

delivery counter,它表示转移的次数,每当一条消息的消费者发生变更的时候,它的值都会+1,如果一条pending消息的delivery counter值很大,表示它在多个消费者之间进行了多次转移都没法成功消费,可以人工的读取,消费掉。

最后

redis5的stream,可以说功能还是蛮强大(设计上狠狠借鉴了一把Kakfa)。如果应用规模并不大,需要一个MQ服务,我想Stream的你可以试试看,比起自己搭建kakfa,RocketMQ之类的,来的快当而且更好维护。


首发:https://springboot.io/t/topic/3001

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 222,183评论 6 516
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,850评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,766评论 0 361
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,854评论 1 299
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,871评论 6 398
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,457评论 1 311
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,999评论 3 422
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,914评论 0 277
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,465评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,543评论 3 342
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,675评论 1 353
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,354评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,029评论 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,514评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,616评论 1 274
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,091评论 3 378
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,685评论 2 360

推荐阅读更多精彩内容