Redis自带发布订阅模式的消息队列,只需要在Springboot中集成Redis,监听topic即可。
Redis发布订阅
集成Redis
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
配置RedisTemplate
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 设置Key使用String序列化
redisTemplate.setKeySerializer(new StringRedisSerializer());
return redisTemplate;
}
}
订阅监听类
订阅者
public class SubscribeListener implements MessageListener {
/**
* 订阅接收发布者的消息
*/
@Override
public void onMessage(Message message, byte[] pattern) {
// 缓存消息是序列化的,需要反序列化。然而new String()可以反序列化,但静态方法valueOf()不可以
System.out.println(new String(pattern) + "主题发布:" + new String(message.getBody()));
}
}
发布者
@Component
public class PublishService {
@Autowired
StringRedisTemplate redisTemplate;
/**
* 发布方法
* @param channel 消息发布订阅 主题
* @param message 消息信息
*/
public void publish(String channel, Object message) {
// 该方法封装的 connection.publish(rawChannel, rawMessage);
redisTemplate.convertAndSend(channel, message);
}
}
添加监听主题
在RedisConfig中配置监听test-topic主题
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
// 添加订阅者监听类,数量不限.PatternTopic定义监听主题,这里监听test-topic主题
container.addMessageListener(new SubscribeListener(), new PatternTopic("test-topic"));
return container;
}
发布订阅测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisMqPsApplicationTests {
@Autowired
private PublishService service;
@Test
public void contextLoads() {
for (int i = 0; i < 10; i++) {
service.publish("test-topic", "hello~~~" + i);
}
}
}