一般我们使用redis最多的场景还是作为缓存中间件使用,redis也能做为消息队列使用,但这不是Redis的强项,不过如果需要的话还是可以使用的。
redis的发布订阅
集成到springboot中
- 引入redis starter
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
- 新建一个RedisMessageListenerConfig
创建 RedisMessageListenerConfig之间要先自定义定义一个接口RedisPubSub接口,这个接口用于处理收到的信息,如果实现发布订阅只需实现这个接口即可。
public interface RedisPubSub {
/**
* 接收消息
* @param message
*/
void receiveMessage(String message);
/**
* 发布订阅监听的topic key
* @return
*/
CacheKeyEnum getCacheKeyEnum();
}
public enum CacheKeyEnum {
/**
* 消息订阅
*/
PUBSUB_QUEUE("pubsub:queue" , 0L),
;
// 缓存键名
private String key;
/**
* 过期时间,单位秒
* 0 表示不过期
*/
private Long expireTime;
//省略getter、setter
}
RedisMessageListenerConfig.java
@Component
public class RedisMessageListenerConfig {
// 如果项目中没有RedisPubSub实现类,启动会报错,所以设置required = false
@Autowired(required = false)
private Set<RedisPubSub> redisPubSubs;
/**
* 创建连接工厂
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory , Map<? extends MessageListener, Collection<? extends Topic>> listenerAdapters){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setMessageListeners(listenerAdapters);
return container;
}
@Bean
public Map<MessageListener, Collection<Topic>> listenerAdapters(){
if (!CollectionUtils.isEmpty(redisPubSubs)) {
Map<MessageListener, Collection<Topic>> map = new HashMap<>(redisPubSubs.size());
for (RedisPubSub redisPubSub : redisPubSubs) {
final CacheKeyEnum cacheKeyEnum = redisPubSub.getCacheKeyEnum();
// redis会利用反射调用receiveMessage方法
final MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(redisPubSub, "receiveMessage");
messageListenerAdapter.afterPropertiesSet();
map.put(messageListenerAdapter , Collections.singletonList(new PatternTopic(cacheKeyEnum.getKey())));
}
return map;
}
return Collections.emptyMap();
}
}
准备redis工具类:
@Slf4j
@Component
public class RedisUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 消息发布
* @param cacheKeyEnum
* @param message
*/
public void publish(CacheKeyEnum cacheKeyEnum , Object message){
redisTemplate.convertAndSend(cacheKeyEnum.getKey() , message);
}
/**
* 反序列化redis数据
* @param value
* @param <T>
* @return
*/
public <T> T deserialize(String value){
final RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
final Object deserialize = valueSerializer.deserialize(value.getBytes());
return deserialize == null ? null : (T) deserialize;
}
}
集成测试
- 实现RedisPubSub
@Component
public class PubsubQueue implements RedisPubSub {
@Autowired
private RedisUtil redisUtil;
/**
* 接收消息
*
* @param message
*/
@Override
public void receiveMessage(String message) {
System.out.println(message);
final UserEntity deserialize = redisUtil.deserialize(message);
System.out.println("getId="+deserialize.getId());
}
/**
* 发布订阅监听的topic key
*
* @return
*/
@Override
public CacheKeyEnum getCacheKeyEnum() {
return CacheKeyEnum.PUBSUB_QUEUE;
}
}
- 单元测试
@Test
public void testRedisQueue(){
UserEntity userEntity = new UserEntity();
userEntity.setId(123456L);
redisUtil.publish(CacheKeyEnum.PUBSUB_QUEUE , userEntity);
}
控制台输出:
使用redis发布订阅的注意点:
RedisPubSub 中receiveMessage接收的参数是String类型,redis在发布订阅中接收到的对象是字节数组,控制台打印是一个json格式的,如果redis用的是默认的JdkSerializationRedisSerializer序列化类,直接想通过将String转成JSON是不行的,会报错的,而且如果用的是jdk的序列化类,要发布的对象必须实现Serializable接口,否则也会报错。
所以可以通过上面的redisUtil中的反序列化方法来进行对象的转化,这样不管是不是用的是什么序列化类都不会报错。
Redis发布订阅的缺点:
消息不持久化,一旦订阅者没收到消息或者重启,消息将丢失。
相对于专业的消息中间件来说,Redis的发布订阅相对简单,慎用即可。
能力一般,水平有限,如有错误,请多指出。
如果对你有用点个关注给个赞呗