Redis做轻量级消息队列的3种玩法

Redis也可以做轻量级的消息队列:基于List的队列模式、PubSub多播的发布订阅模式、以及5.0之后提供的Stream。
特别是Stream,可以消息持久化、高可用、消息可以指定offset进行反复消费、支持发布订阅按消费组多播、使用消费者的pending_ids机制保证消息传递的可靠性等等,基本已经具有了真正的消息队列中间件的功能了。而PubSub模式在一些对消息传递可靠性和可追溯性不严格的场景(比如内网非一致性的消息通知)也有一定的使用价值。

一、快速入门

1、用list类型模拟队列

典型的队列模式,rpush右边放入,lpop左边取出。举例:

127.0.0.1:7001> rpush queue A
-> Redirected to slot [13011] located at 122.51.112.187:7003
(integer) 1
122.51.112.187:7003> rpush queue B
(integer) 2
122.51.112.187:7003> rpush queue C
(integer) 3
122.51.112.187:7003> lpop queue
"A"
122.51.112.187:7003> lpop queue
"B"
122.51.112.187:7003> lpop queue
"C"
122.51.112.187:7003> lpop queue
(nil)
2、PUB|SUB

PubSub解决了list做队列1个消息只能被单个消费者消费的问题,可以1个消息被多个消费组的消费组收到,即所谓的发布订阅模式。

用redis-cli先来用一下:

127.0.0.1:7001> subscribe testTopic
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "testTopic"
3) (integer) 1

返回subscript testTopic 1意思是订阅testTopic成功,然后阻塞等待消息,再开一个终端连接进行publish:

127.0.0.1:7001> publish testTopic msg1
(integer) 1

然后订阅者侧连接收到消息:

1) "message"
2) "testTopic"
3) "msg1"

message testTopic msg1,意思是收到了message,主题是testTopic,内容是msg1

可以同时订阅多个主题,

subscribe testTopic1 testTopic2 testTopic3

也可以按照模式匹配的方式来订阅主题psubscribe,比如订阅所有以test开头的主题

psubscribe test*

PubSub的消息没有持久化,发布消息之后如果没有消费者、那么消息会直接丢弃,如果消费者刚好此时宕机、重启后也不会收到宕机时发布的消息,所以PubSub来做消息队列的场景十分有限。

Redis5.0之后提供了更强大的Redis Stream数据结构,解决了上述问题。

3、Redis Stream

发布订阅模式,一个消息发布到stream里,多个消费组订阅了这个stream的话都可以收到这个消息,且消息是持久化的。消费组在stream里用last_delivered_id来定位偏移量、也就是消费到哪个消息了。从设计上看确实跟kafka非常像,Redis作者也说借鉴了kafka。

从数据结构上来说,stream是个链表,节点是消息,消息有ID,消息内容是一系列的k-v对。stream也有消费组和消费者的概念,每个消费者组用last_delivered_id游标指向链表中的节点、来表示消费到哪个ID的消息了,类似kafka里的offset偏移量,每个消费者组里可以有多个消费者,一个消息只会被投递给消费者组里的一个消费者,可以类比RocketMQ里的集群消息。另外,每个消费者内部还有一个 pending_ids数组,它记录着这个消费者已经被客户端读取了的、但客户端没有回复ACK的消息,由此确保消息至少被消费1次。

添加消息和直接读取消息命令:

xadd testTopic * name douchuzi  #向testTopic这个stream添加"name douchuzi"这个消息,*表示消息ID由Redis自动生成

消息ID可以由Redis自动生成,生成的ID类似"1641816045243-0",表示1641816045243这个时间戳这1ms生成的第0个消息。当然消息ID也可以由客户端自己生成。

xlen testTopic #查看有多少消息
xrange testTopic - + #从头到尾返回所有消息
xread count 1 streams testTopic 0 #从ID大于0处开始读取1个消息
xread count 1 block 0 streams testTopic $ #阻塞的从ID大于$处开始读取1个消息,$表示最后一个消息的ID

消费组命令:

xgroup create testTopic group1 $ #创建消费组group1,其last_delivered_id是$

xgroup的完整玩法:

xgroup [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
#让消费者consumer1-1从group1消费者组的testTopic stream中拿最新的、并且没有被发送给其他消费者处理的entry
xreadgroup group group1 consumer1-1 count 1 block 0 streams testTopic >

发送ACK,刚才阻塞xreadgroup之后从别的客户端xadd了1条消息,然后xreadgroup阻塞结束,消息收到。这时候我们看下consumer1-1的pending_ids:

127.0.0.1:7001> xpending testTopic group1 - + 1000  consumer1-1
1) 1) "1641819182032-0"
   2) "consumer1-1"
   3) (integer) 695470
   4) (integer) 1

xpending查询消费组group1中消费者consumer1-1的1000条未收到客户端回复的消息。

接着我们xack回复一下,再看看pending_ids:

127.0.0.1:7001> xack testTopic group1 1641819182032-0
(integer) 1
127.0.0.1:7001> xpending testTopic group1 - + 1000  consumer1-1
(empty array)

可见,xack之后,consumer1-1的pending_ids为空了,Redis Stream用这个办法来确保消息一定被投递。

二、实战开发:基于SpringBoot开发Redis消息队列

下面具体实战一下,用SpringBoot来做Redis的消息队列开发,笔者使用的SpringBoot版本是2.3.7.RELEASE,其默认的客户端是lettuce 5.3.5.RELEASE,测试所用的Redis为6节点Cluster,可以参照笔者的文章Redis分布式缓存搭建 - 简书 (jianshu.com)进行搭建。

1、PubSub

发布消息,比较简单,直接用RedisTemplate:

@Service
public class RedisDao {
    
    @Autowired
    RedisTemplate<String, Object> redisTemplate;
    
    /**
     * PubSub发布消息
     * */
    public void publishMessage(ChannelTopic channelTopic, String message) {
        redisTemplate.convertAndSend(channelTopic.getTopic(), message);
    }
}

订阅消息,使用Spring提供的消息容器RedisMessageListenerContainer以及消息到达监听接口MessageListener

//配置PubSub消息容器
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(redisConnectionFactory());
    
    /**这里可以使用自定义注解来发现所有的MessageHandler,
     * 然后循环container.addMessageListener来达到自动配置消息订阅者的目的
     * 这样开发只需要编写MessageHandler的实现类就可以了
     */
    MessageHandler handler = new MessageHandlerImpl();
    container.addMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message, byte[] pattern) {
            handler.handleMsg(message);
        }
        
    }, handler.getChannelTopic());
    
    return container;
}

MessageHandler接口及其实现是MessageHandlerImpl业务层代码:

/**
 * PubSub订阅消息
 * 
 * */
public interface MessageHandler {
    
    //订阅消息到达后的逻辑处理
    public void handleMsg(Message msg);
    
    //消息的Topic
    public ChannelTopic getChannelTopic();
}

@Slf4j
public class MessageHandlerImpl implements MessageHandler{

    @Override
    public void handleMsg(Message msg) {
        
        try {   
            String msgChannel = new String(msg.getChannel(), "utf-8");
            String msgBody = new String(msg.getBody(), "utf-8");
            log.info("收到消息:");
            log.info("Message channel : {}" , msgChannel);
            log.info("Message body : {}" , msgBody);
            
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    @Override
    public ChannelTopic getChannelTopic() {
        return new ChannelTopic("testPubSub");
    }

}

另外需要注意,Redis Cluster模式下如果客户端用的是Lettuce,需要配置客户端自适应刷新,在集群主备故障切换的时候、客户端能够自动切换到故障主节点对应的从节点去。详见笔者的文章Redis分布式缓存搭建 - 简书 (jianshu.com)

好了,测试一下:

@Slf4j
@SpringBootApplication
public class RedismqApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(RedismqApplication.class, args);
        RedisDao redisDao = context.getBean(RedisDao.class);
        redisDao.publishMessage(new ChannelTopic("testPubSub"), "肥兔子爱豆畜子");
    }

}

收到消息:
Message channel : testPubSub
Message body : 肥兔子爱豆畜子

2、Redis Stream

跟PubSub类似,也是需要消息容器MessageContainer、Listener这俩东西。

/**
 * 发送消息到指定stream
 * */
public void publishStreamMessage(String stream, Object message) {
    ObjectRecord<String, String> record =                 StreamRecords.newRecord().ofObject(JSON.toJSONString(message)).withStreamKey(stream);
    RecordId recordId = stringRedisTemplate.opsForStream().add(record);
    log.info("消息已发送,消息ID:{}" , recordId.getValue());
}

消息监听,实现StreamListener接口:

import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class StreamMessageListener implements StreamListener<String, ObjectRecord<String,String>>{
    
    private StringRedisTemplate redisTemplate;

    public StreamMessageListener(StringRedisTemplate stringRedisTemplate) {
        redisTemplate = stringRedisTemplate;
    }
    
    @Override
    public void onMessage(ObjectRecord<String, String> message) {

        RecordId id = message.getId();
        String topic = message.getStream();
        String msgBody = message.getValue();
        
        log.info("收到主题{}消息ID={}, 消息内容{}", topic, id.getValue(), msgBody);
        
        String group = "some-service"; //消费组,使用服务名
        redisTemplate.opsForStream().acknowledge(topic, group, id.getValue());
    }

}

配置消息容器,将StreamListener的实现注册到消息容器StreamMessageListenerContainer

@Configuration
public class RedisStreamListenerContainerConfig {
    
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    @Bean
    public StreamMessageListenerContainer redisStreamListenerContainer() {
        StreamMessageListenerContainerOptions options = 
                StreamMessageListenerContainerOptions.builder()
                                                    .batchSize(100)
                                                    .pollTimeout(Duration.ZERO)
                                                    .targetType(String.class)
                                                    .build();
        StreamMessageListenerContainer container = StreamMessageListenerContainer.create(redisConnectionFactory, options);
        
        String GroupName = "some-service";  //消费组命名一般用服务名
        String consumerName = "127.0.0.1:8080"; //消费者命名一般用服务集群下每个节点的ip:port,可以区分是哪个节点消费
        String stream = "testTopic"; //stream名称,即topic
        
        container.receive(Consumer.from(GroupName, consumerName), 
                            StreamOffset.create(stream, ReadOffset.lastConsumed()),
                            new StreamMessageListener(stringRedisTemplate)); //将Listener添加到监听容器
        
        container.start(); //启动消息容器
        
        return container;
    }
}

测试:

消息已发送,消息ID:1641872849111-0

收到主题testTopic消息ID=1641872849111-0, 消息内容{"name":"stream-肥兔子爱豆畜子"}

总结说明:

1、为了方便消息的格式笔者统一用了String类型,用fastjson做序列化以后进行传输。

2、StreamListener.onMessage收到消息进行处理以后,手工调用ack进行回复,不然服务端给当前消费者缓存的pending_ids会越来越大、占用内存。

3、消费组笔者一般用服务名来区分,服务下挂多个节点,那么每个节点可以用ip:port作为唯一标识,所以用ip:port作为消费组下的消费组名称。

参考:《Redis深度历险:核心原理与应用实践》

       [Redis 的发布订阅功能在 SpringBoot 中的应用 - 知乎 (zhihu.com)](https://zhuanlan.zhihu.com/p/59065399)

       [Stream消息队列在SpringBoot中的实践与踩坑 | (lolico.me)](https://lolico.me/2020/06/28/Using-stream-to-implement-message-queue-in-springboot/)

      [redis — 基于Spring Boot实现Redis stream实时流事件处理_Haqiu.Hwang的博客-CSDN博客](https://blog.csdn.net/qq_38658567/article/details/109376888)
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 222,183评论 6 516
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,850评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,766评论 0 361
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,854评论 1 299
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,871评论 6 398
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,457评论 1 311
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,999评论 3 422
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,914评论 0 277
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,465评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,543评论 3 342
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,675评论 1 353
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,354评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,029评论 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,514评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,616评论 1 274
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,091评论 3 378
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,685评论 2 360

推荐阅读更多精彩内容