springboot整合redis消息队列

前言

消息队列作为一种常用的异步通信解决方案,而redis是一款高性能的nosql产品,今天就给大家介绍一下,如何使用redis实现消息队列,并整合到springboot。

两个消息模型

  1. 队列模型
    队列模型如图所示,它具有以下几个特点,就像我们用微信和好友(群聊除外)聊天一样,微信就是这个队列,我们可以和很多个好友聊天,但是每条消息只能发给一个好友。
  • 只有一个消费者将获得消息
  • 生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
  • 每一个成功处理的消息都由接收者签收
队列模型
  1. 发布/订阅模型
    发布/订阅模型如图所示,不用说,和订阅公众号是一样的。
  • 多个消费者可以获得消息
  • 在发布者和订阅者之间存在时间依赖性。发布者需要建立一个topic,以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。
发布/订阅模型

redis如何实现

  1. 对于队列模型,我们可以使用redis的list数据结构,通过LPUSH和RPOP来实现一个队列。
  2. 发布/订阅模型就更简单了,redis官方就支持,而且还可以使用PSUBSCRIBE支持模式匹配,使用如下命令,即可订阅所有f开头的订阅,具体可查看文档
PSUBSCRIBE f*
  1. keyspace notifications(键空间通知)
    该功能是在redis2.8之后引入的,即客户端可以通过pub/sub机制,接收key的变更的消息。换句话说,就是redis官方提供了一些topic,帮助我们去监听redis数据库中的key,我曾经就使用其中的'keyevent@0:expired'实现了定时任务。

和spring boot整合

首先得介绍一下spring-data-redis中的两种template的默认serializer,当然spring还提供其他的序列化器,具体可查看文档,也可以自己实现RedisSerializer接口,构建自己的序列化器。

template default serializer serialization
RedisTemplate JdkSerializationRedisSerializer 序列化String类型的key和value
StringRedisTemplate StringRedisSerializer 使用Java序列化

终于到了写代码的时候了,先从发布/订阅说起吧,因为spring官方给了示例。但是呢,示例里面的消息是String类型,对于我们的业务来说,可能更需要一个POJO,所以还需要改造一下,走起。

  1. 先学习下org.springframework.data.redis.listener.adapter.MessageListenerAdapter源码如下,可以看到,如果使用StringRedisTemplate的话,默认都是使用StringRedisSerializer来反序列化,而如果想主动接收消息,则需要实现MessageListener接口。
    /**
     * Standard Redis {@link MessageListener} entry point.
     * <p>
     * Delegates the message to the target listener method, with appropriate conversion of the message argument. In case
     * of an exception, the {@link #handleListenerException(Throwable)} method will be invoked.
     * 
     * @param message the incoming Redis message
     * @see #handleListenerException
     */
    public void onMessage(Message message, byte[] pattern) {
        try {
            // Check whether the delegate is a MessageListener impl itself.
            // In that case, the adapter will simply act as a pass-through.
            if (delegate != this) {
                if (delegate instanceof MessageListener) {
                    ((MessageListener) delegate).onMessage(message, pattern);
                    return;
                }
            }

            // Regular case: find a handler method reflectively.
            Object convertedMessage = extractMessage(message);
            String convertedChannel = stringSerializer.deserialize(pattern);
            // Invoke the handler method with appropriate arguments.
            Object[] listenerArguments = new Object[] { convertedMessage, convertedChannel };

            invokeListenerMethod(invoker.getMethodName(), listenerArguments);
        } catch (Throwable th) {
            handleListenerException(th);
        }
    }

    /**
     * Extract the message body from the given Redis message.
     * 
     * @param message the Redis <code>Message</code>
     * @return the content of the message, to be passed into the listener method as argument
     */
    protected Object extractMessage(Message message) {
        if (serializer != null) {
            return serializer.deserialize(message.getBody());
        }
        return message.getBody();
    }

    /**
     * Initialize the default implementations for the adapter's strategies.
     * 
     * @see #setSerializer(RedisSerializer)
     * @see JdkSerializationRedisSerializer
     */
    protected void initDefaultStrategies() {
        RedisSerializer<String> serializer = new StringRedisSerializer();
        setSerializer(serializer);
        setStringSerializer(serializer);
    }
  1. 将StringRedisTemplate替换为RedisTemplate
    @Bean(name = "redisTemplate")
    RedisTemplate<?, ?> template(RedisConnectionFactory connectionFactory) {
        RedisTemplate<?, ?> template = new RedisTemplate<>();
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new JdkSerializationRedisSerializer());
        template.setConnectionFactory(connectionFactory);
        return template;
    }
  1. 改造Receiver
public class Receiver implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        RedisSerializer serializer = new JdkSerializationRedisSerializer();
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        //接收的topic
        String channel = stringRedisSerializer.deserialize(message.getChannel());
        //消息的POJO
        Object o = serializer.deserialize(message.getBody());
    }
}

至此,发布/订阅的整合改造就完成了。

接下来就是消息队列了,这个就需要自己造轮子了,在spring中使用redisTemlate操作数据库,而对于不同的数据类型则需要不同的操作方式,如下表格所示,具体还是请看官方文档
实现队列选择list数据结构,redisTemplate.opsForList()使用起来非常简单,和redis命令基本一致。

数据类型 操作方式
string redisTemplate.opsForValue()
hash redisTemplate.opsForHash()
list redisTemplate.opsForList()
set redisTemplate.opsForSet()
  1. 先定义一个消息的POJO
import java.io.Serializable;

public class Message implements Serializable{

    private String id;

    private String content;

    public Message() {
    }

    public Message(String id, String content) {
        this.id = id;
        this.content = content;
    }


    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id='" + id + '\'' +
                ", content='" + content + '\'' +
                '}';
    }
}
  1. 消息的生产者,这个类提供一个发送消息的方法。
@Service
public class MessageProducerService {

    @Autowired
    RedisTemplate<String, Message> redisTemplate;

    public Long sendMessage(Message message) {
        System.out.println("发送了"+ message);
        return redisTemplate.opsForList().leftPush("queue", message);
    }

}
  1. 消息的消费者,消费者需要不断轮询队列,有消息便取出来,实现方式如下:
@Service
public class MessageConsumerService extends Thread {

    @Autowired
    RedisTemplate<String, Message> redisTemplate;


    @Override
    public void run() {
        while (true){
            Message message = redisTemplate.opsForList().rightPop("queue", 1000L, TimeUnit.SECONDS);
            System.out.println("接收到了" + message);
        }
    }
}

至此,消息队列的方式也整合完成了。
虽然redisTemplate是线程安全的,但是如果一个队列有多个接收者的话,可能也还需要考虑一下并发的问题。
最后有什么问题,欢迎大家给我留言,谢谢。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,598评论 18 139
  • Spring Boot 参考指南 介绍 转载自:https://www.gitbook.com/book/qbgb...
    毛宇鹏阅读 46,745评论 6 342
  • 今天2月4日寒假的最后一天,开学前一天,一大早妈妈把我叫起来唠唠叨叨没完。让我刷牙,洗脸,剪指甲,洗头,整理...
    小太阳教室张耿嘉阅读 382评论 0 0
  • 忘了多拍几张了ㄟ( ▔, ▔ )ㄏ,就发个成品吧
    桅笑阅读 224评论 0 1
  • 这里简单介绍下 Linux 下 Phalcon 的安装及配置,更多内容请查看官网安装介绍 Installation...
    野尘lxw阅读 973评论 0 0