Springboot2.0 集成redisTemplate实现发布/订阅功能

Redis 不仅提供一个NoSQL数据库,同时提供了一套消息系统,在开发过程中,应用场景非常多,根据不同的业务需求,可以实现相应的功能。
我的业务需求是,在分布式系统中,当其中一个节点产生一条消息时,需要同时通知其他节点做相应的处理。

相关类的解释:

RedisMessageListenerContainer

Redis订阅发布的监听容器,通过Redis的消息发布、订阅配置都在这里面实现

  • addMessageListener(MessageListenerAdapter,PatternTopic) 新增订阅频道及订阅者,订阅者必须有相关方法处理收到的消息
  • setTopicSerializer(RedisSerializer) 对频道内容进行序列化解析
MessageListenerAdapter

监听适配器

  • MessageListenerAdapter(Object , defaultListenerMethod) 创建监听适配器,绑定订阅接收器和接收消息的方法
RedisTemplate

Redis模版类

  • convertAndSend(String channel, Object message) 发布者向Redis发布消息

一、新增依赖

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

二、Redis连接信息配置

spring:
   redis:
        host: 127.0.0.1
        port: 6381
        ### Redis数据库索引(默认为0)
        database: 0
        ### 连接超时时间(毫秒)
        timeout: 60000ms
        password:
        lettuce:
            pool:
                ### 最大连接数(使用负值表示没有限制) 默认8
                max-active: 8
                ### 最小空闲连接  默认8
                min-idle: 0
                ### 连接池中的最大空闲连接 默认8
                max-idle: 8
                ### 连接池最大阻塞等待时间(使用负值表示没有限制)
                max-wait: -1ms

三、RedisConfig核心类,实现了Redis连接,订阅以及发布配置

@Configuration
public class RedisConfig {

    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {

        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // key采用String的序列化方式
        redisTemplate.setKeySerializer(stringRedisSerializer);
        // hash的key也采用String的序列化方式
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        // valuevalue采用jackson序列化方式
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        // hash的value采用jackson序列化方式
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        //开启事务支持
        redisTemplate.setEnableTransactionSupport(true);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    /**
     * Redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,
     * 该消息监听器通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     * @param connectionFactory
     * @return
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(connectionFactory);

        /**
         * 加入消息监听器(可以加入多个主题监听器,监听器也可以监听多个主题)
         **/
        // 加入WebSocket监听器
        final String TOPIC_NAME = "TEST_TOPIC"; // 订阅主题
        MessageListenerAdapter webSocketListenerAdapter = webSocketListenerAdapter();
        redisMessageListenerContainer.addMessageListener(webSocketListenerAdapter, new PatternTopic(TOPIC_NAME));

        /**
         * 设置序列化对象
         * 特别注意:1. 发布的时候需要设置序列化;订阅方也需要设置序列化
         *                   2. 设置序列化对象必须放在[加入消息监听器]这一步后面,否则会导致接收器接收不到消息
         */
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        redisMessageListenerContainer.setTopicSerializer(seria);

        return redisMessageListenerContainer;
    }

    /**
     * 绑定WebSocket消息推送接收器和接收方法
     */
    @Bean
    public MessageListenerAdapter webSocketListenerAdapter() {
        WebSocketReceiver webSocketReceiver = new WebSocketReceiver(); // 消息接收器
        final String RECEIVE_MESSAGE_METHOD = "receiveMessage"; // 消息接收器的方法名称
        return new MessageListenerAdapter(webSocketReceiver, RECEIVE_MESSAGE_METHOD);
    }
}

四、封装消息对象

@Data
public class MessageDTO implements Serializable {
    private String type;
    private String title;
    private String content;
}

五、消息接收器

public class WebSocketReceiver {

    /**
     * 接收WebSocket推送的消息并处理
     * @param message
     */
    public void receiveMessage(String message) {
        //序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(WebSocketMessageDTO.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        MessageDTO message = (MessageDTO ) seria.deserialize(message.getBytes());
        // 接收到消息对象,自己实现相关的业务处理...
    }
}

六、测试发布消息

@RunWith(SpringRunner.class)
@SpringBootTest
public class TestRedis {
    @Resource
    private RedisTemplate redisTemplate;
    @Test
    public void test() {
        final String TOPIC_NAME = "TEST_TOPIC"; // 订阅主题
        MessageDTO message = new MessageDTO();
        message.setTitle("订阅发布测试...");
        // 发布消息
        redisTemplate.convertAndSend(TOPIC_NAME, message);
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容