Springboot集成Redis实现消息队列-发布订阅

Redis自带发布订阅模式的消息队列,只需要在Springboot中集成Redis,监听topic即可。
Redis发布订阅

集成Redis

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

配置RedisTemplate

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // 设置Key使用String序列化
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        return redisTemplate;
    }
}

订阅监听类

订阅者

public class SubscribeListener implements MessageListener {
    /**
     * 订阅接收发布者的消息
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 缓存消息是序列化的,需要反序列化。然而new String()可以反序列化,但静态方法valueOf()不可以
        System.out.println(new String(pattern) + "主题发布:" + new String(message.getBody()));
    }
}

发布者

@Component
public class PublishService {
    @Autowired
    StringRedisTemplate redisTemplate;

    /**
     * 发布方法
     * @param channel 消息发布订阅 主题
     * @param message 消息信息
     */
    public void publish(String channel, Object message) {
        // 该方法封装的 connection.publish(rawChannel, rawMessage);
        redisTemplate.convertAndSend(channel, message);
    }
}

添加监听主题

在RedisConfig中配置监听test-topic主题

 @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        // 添加订阅者监听类,数量不限.PatternTopic定义监听主题,这里监听test-topic主题
        container.addMessageListener(new SubscribeListener(), new PatternTopic("test-topic"));
        return container;
    }

发布订阅测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisMqPsApplicationTests {
    @Autowired
    private PublishService service;

    @Test
    public void contextLoads() {
        for (int i = 0; i < 10; i++) {
            service.publish("test-topic", "hello~~~" + i);
        }
    }

}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。