前言
消息队列作为一种常用的异步通信解决方案,而redis是一款高性能的nosql产品,今天就给大家介绍一下,如何使用redis实现消息队列,并整合到springboot。
两个消息模型
-
队列模型
队列模型如图所示,它具有以下几个特点,就像我们用微信和好友(群聊除外)聊天一样,微信就是这个队列,我们可以和很多个好友聊天,但是每条消息只能发给一个好友。
- 只有一个消费者将获得消息
- 生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
- 每一个成功处理的消息都由接收者签收
-
发布/订阅模型
发布/订阅模型如图所示,不用说,和订阅公众号是一样的。
- 多个消费者可以获得消息
- 在发布者和订阅者之间存在时间依赖性。发布者需要建立一个topic,以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。
redis如何实现
- 对于队列模型,我们可以使用redis的list数据结构,通过LPUSH和RPOP来实现一个队列。
- 发布/订阅模型就更简单了,redis官方就支持,而且还可以使用PSUBSCRIBE支持模式匹配,使用如下命令,即可订阅所有f开头的订阅,具体可查看文档。
PSUBSCRIBE f*
-
keyspace notifications(键空间通知)
该功能是在redis2.8之后引入的,即客户端可以通过pub/sub机制,接收key的变更的消息。换句话说,就是redis官方提供了一些topic,帮助我们去监听redis数据库中的key,我曾经就使用其中的'keyevent@0:expired'实现了定时任务。
和spring boot整合
首先得介绍一下spring-data-redis中的两种template的默认serializer,当然spring还提供其他的序列化器,具体可查看文档,也可以自己实现RedisSerializer接口,构建自己的序列化器。
template | default serializer | serialization |
---|---|---|
RedisTemplate | JdkSerializationRedisSerializer | 序列化String类型的key和value |
StringRedisTemplate | StringRedisSerializer | 使用Java序列化 |
终于到了写代码的时候了,先从发布/订阅说起吧,因为spring官方给了示例。但是呢,示例里面的消息是String类型,对于我们的业务来说,可能更需要一个POJO,所以还需要改造一下,走起。
- 先学习下org.springframework.data.redis.listener.adapter.MessageListenerAdapter源码如下,可以看到,如果使用StringRedisTemplate的话,默认都是使用StringRedisSerializer来反序列化,而如果想主动接收消息,则需要实现MessageListener接口。
/**
* Standard Redis {@link MessageListener} entry point.
* <p>
* Delegates the message to the target listener method, with appropriate conversion of the message argument. In case
* of an exception, the {@link #handleListenerException(Throwable)} method will be invoked.
*
* @param message the incoming Redis message
* @see #handleListenerException
*/
public void onMessage(Message message, byte[] pattern) {
try {
// Check whether the delegate is a MessageListener impl itself.
// In that case, the adapter will simply act as a pass-through.
if (delegate != this) {
if (delegate instanceof MessageListener) {
((MessageListener) delegate).onMessage(message, pattern);
return;
}
}
// Regular case: find a handler method reflectively.
Object convertedMessage = extractMessage(message);
String convertedChannel = stringSerializer.deserialize(pattern);
// Invoke the handler method with appropriate arguments.
Object[] listenerArguments = new Object[] { convertedMessage, convertedChannel };
invokeListenerMethod(invoker.getMethodName(), listenerArguments);
} catch (Throwable th) {
handleListenerException(th);
}
}
/**
* Extract the message body from the given Redis message.
*
* @param message the Redis <code>Message</code>
* @return the content of the message, to be passed into the listener method as argument
*/
protected Object extractMessage(Message message) {
if (serializer != null) {
return serializer.deserialize(message.getBody());
}
return message.getBody();
}
/**
* Initialize the default implementations for the adapter's strategies.
*
* @see #setSerializer(RedisSerializer)
* @see JdkSerializationRedisSerializer
*/
protected void initDefaultStrategies() {
RedisSerializer<String> serializer = new StringRedisSerializer();
setSerializer(serializer);
setStringSerializer(serializer);
}
- 将StringRedisTemplate替换为RedisTemplate
@Bean(name = "redisTemplate")
RedisTemplate<?, ?> template(RedisConnectionFactory connectionFactory) {
RedisTemplate<?, ?> template = new RedisTemplate<>();
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new JdkSerializationRedisSerializer());
template.setConnectionFactory(connectionFactory);
return template;
}
- 改造Receiver
public class Receiver implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
RedisSerializer serializer = new JdkSerializationRedisSerializer();
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//接收的topic
String channel = stringRedisSerializer.deserialize(message.getChannel());
//消息的POJO
Object o = serializer.deserialize(message.getBody());
}
}
至此,发布/订阅的整合改造就完成了。
接下来就是消息队列了,这个就需要自己造轮子了,在spring中使用redisTemlate操作数据库,而对于不同的数据类型则需要不同的操作方式,如下表格所示,具体还是请看官方文档。
实现队列选择list数据结构,redisTemplate.opsForList()使用起来非常简单,和redis命令基本一致。
数据类型 | 操作方式 |
---|---|
string | redisTemplate.opsForValue() |
hash | redisTemplate.opsForHash() |
list | redisTemplate.opsForList() |
set | redisTemplate.opsForSet() |
- 先定义一个消息的POJO
import java.io.Serializable;
public class Message implements Serializable{
private String id;
private String content;
public Message() {
}
public Message(String id, String content) {
this.id = id;
this.content = content;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
@Override
public String toString() {
return "Message{" +
"id='" + id + '\'' +
", content='" + content + '\'' +
'}';
}
}
- 消息的生产者,这个类提供一个发送消息的方法。
@Service
public class MessageProducerService {
@Autowired
RedisTemplate<String, Message> redisTemplate;
public Long sendMessage(Message message) {
System.out.println("发送了"+ message);
return redisTemplate.opsForList().leftPush("queue", message);
}
}
- 消息的消费者,消费者需要不断轮询队列,有消息便取出来,实现方式如下:
@Service
public class MessageConsumerService extends Thread {
@Autowired
RedisTemplate<String, Message> redisTemplate;
@Override
public void run() {
while (true){
Message message = redisTemplate.opsForList().rightPop("queue", 1000L, TimeUnit.SECONDS);
System.out.println("接收到了" + message);
}
}
}
至此,消息队列的方式也整合完成了。
虽然redisTemplate是线程安全的,但是如果一个队列有多个接收者的话,可能也还需要考虑一下并发的问题。
最后有什么问题,欢迎大家给我留言,谢谢。