Redis的发布/订阅模型
Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了订阅与发布模式, 这个功能提供两种信息机制, 分别是订阅/发布到频道和订阅/发布到模式, 本文讨论订阅/发布到频道的实现
该种模型类似于RocketMQ中广播模式,消费者订阅topic
如图展示了发布消息到channel1
后,各个client都会接收到message
虽然Redis能够实现发布/订阅的功能,但是有如下缺点,所以选用前需谨慎考虑
- 1.消息无法持久化,存在丢失风险
和常规的MQ不同,redis实现的发布/订阅模型消息无法持久化,一经发布,即使没有任何订阅方处理,该条消息就会丢失
- 2.没有类似ACK的机制
即发布方不会确保订阅方成功接收
- 3.广播机制,下游消费能力取决于消费方本身
广播机制无法通过添加多个消费方增强消费能力,因为这和发布/订阅模型本身的目的是不符的.广播机制的目的是一个一个发布者被多个订阅进行不同的处理
Redis发布/订阅应用场景
由于Redis发布/订阅模型存在的缺陷,所以使用前需要考虑如下几点
- 1.对于消息处理可靠性要求不强
- 2.消费能力无需通过增加消费方进行增强
考虑如上两点后,可以想到的场景有如下 - 1.用户注册后,发送相关优惠信息
- 2.用户修改名称,由于有业务表对用户名称进行了字段冗余,通过订阅修改名称的channel,触发各个业务表的字段修改
具体使用还是需要考虑业务场景需求
SpringBoot使用Redis的发布/订阅功能
在目前SpringBoot使用Redis的操作中,官方推荐使用SpringData模块中的spring-data-redis
,所以下文会以spring-data-redis
进行
下文需要对Springboot工程有一定的基础认识
1.Maven依赖redis组件
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
</dependency>
2.redis序列化相关配置
序列化使用的是GenericJackson2JsonRedisSerializer
,使用这个类可以正确序列化Null的对象.如果使用Jackson2JsonRedisSerializer
,会将对象序列号成空数组.
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisSerializer<Object> redisSerializer, RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
template.setKeySerializer(RedisSerializer.string());
template.setDefaultSerializer(redisSerializer);
return template;
}
@Bean
public RedisSerializer<Object> redisSerializer(){
GenericJackson2JsonRedisSerializer redisSerializer = new GenericJackson2JsonRedisSerializer();
return redisSerializer;
}
}
3.配置发布方法
简单起见,在这里使用SpringSchedule,周期性发布消息
@EnableScheduling
@Component
public class RedisPublisher {
private static final Logger log = LoggerFactory.getLogger(RedisPublisher.class);
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private AtomicInteger incrInteger = new AtomicInteger();
@Scheduled(initialDelay = 500, fixedDelay = 10000)
public void publish() {
int incrementAndGet = incrInteger.incrementAndGet();
String topic = "redis/test";
String message = "current num : " + incrementAndGet;
log.info("发布消息..topic:{},内容:{}", topic, message);
redisTemplate.convertAndSend(topic, message);
}
}
4.订阅配置
在订阅程序中,有两个比较重要的类,分别是MessageListenerAdapter
和RedisMessageListenerContainer
其中MessageListenerAdapter
实现MessageListener
作用是将自定义的消费类进行适配.这个类必须有一个public
的消费方法,并且方法需要有两个参数,arg1为channel
,arg2是Message
.原因可以在MessageListenerAdapter
源码中发现
4.1MessageListenerAdapter和一个自定义消费类
在MessageListenerAdapter.onMessage
方法中,通过反射对消费类进行了方法调用,并且方法的参数和顺序进行了硬编码,所以必须在消费类中提供一个public方法
4.2RedisMessageListenerContainer
从官方文档中,可以得知RedisMessageListenerContainer
的作用是用于接收消息后进行分发,并且通过内部的线程池进行异步分发,(也可以使用自定义的线程池和相关失败策略)
4.3完整订阅配置
@Configuration
public class ConsumerConfig {
@Bean
public MessageListenerAdapter processorOne(RedisSerializer<Object> serializer, RedisConsumer redisConsumer) {
MessageListenerAdapter adapter = new MessageListenerAdapter(redisConsumer, "onMessage");
adapter.setSerializer(serializer);
return adapter;
}
/**
*
* 支持动态添加监听
*
* @param adapter
* @return
*/
@Bean
public RedisMessageListenerContainer messageListenerContainer(RedisConnectionFactory redisConnectionFactory,
MessageListenerAdapter adapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
//制定topic的序列化方式,String
container.setTopicSerializer(RedisSerializer.string());
//添加监听
container.addMessageListener(adapter, new PatternTopic("redis/**"));
return container;
}
}
启动发布者和订阅者,查看日志
启动一个发布者,两个订阅者
由于先启动的发布者,所以部分已经发布的消息,会直接被丢弃,这也是Redis发布订阅模型的一个缺点