本文将使用redis实现异步队列以及延迟队列,虽然我们在实际开发中经常会有专业的消息队列中间件,如:rabbitmq等,但是如果系统中没有mq中间件,又懒得维护mq中间件,那么我们可以通过redis来实现
因为redis并不是专业实现队列的中间件,因此在实现方式上还是会存在一些问题,还是比不上rabbitmq之类的中间件,那么我为什么还写这篇文章,是因为通过使用redis来实现队列及延迟队列,可以让我对redis的数据结构更加熟悉,使用的更加顺手。
一、使用redis实现异步队列
redis实现队列主要是使用数据结构中的list,因为它是按照塞入顺序排序的结构,我们就可以按照左边塞入,右边取出的方式来实现先入先出的队列需求
具体实现如下:
public class RedisClient {
@Resource
private JedisPool jedisPool;
/**
* 向List头部追加记录
* @param key
* @param value
* @return 记录总数
*/
public void rpush(String key, String value) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.rpush(key,value);
} catch (Exception e) {
throw e;
} finally {
if(jedis != null){
jedis.close();
}
}
}
}
//controller写一个写入队列的方法
@PostMapping("insert")
public void setList(@RequestBody QueueTest queueTest){
System.out.println("塞入一条数据");
redisClient.rpush("queueTest", JSON.toJSONString(queueTest));
}
此时我们已经实现了从右边往队列中塞数据,那么我们接下来只需要从左边将数据取出,即可实现先入先出
我们在项目启动的时候创建一个线程来消费数据,代码如下:
@Service
public class TestRunner implements ApplicationRunner {
@Autowired
private RedisClient redisClient;
@Override
public void run(ApplicationArguments args) throws Exception {
new Thread(new Worker()).start();
}
private class Worker implements Runnable {
@Override
public void run() {
while (true){
List<String> queueTest = redisClient.blpop(0,"queueTest");
System.out.println("开始消费了");
if(queueTest != null){
System.out.println(JSON.toJSONString(queueTest));
}
}
}
}
}
这样我们就可以实现一个简单的队列,至于弹出时为什么使用blpop的方法而不是lpop方法是因为如果使用lpop方法的话会造成如果队列中没有数据,连接一直空置的情况,所以使用blpop的方法可以在没有数据的时候将连接阻塞,在有数据时再读取
。
当然使用blpop同样可能存在长时间没有数据,redis将连接断掉的情况,因此就需要我们在使用时将这种情况的异常也考虑进去,在catch中将连接重新建立之类。
测试过程如下:
上面我们实现的只是简单的异步队列demo,在项目中使用的话我们可以灵活扩展,如:我们需要根据value中的某一个值来做不同的处理,那么我们可以使用工厂模式,写一个工厂类,在消费消息时,通过工厂类来分配不同的消费方式。
缺点: redis实现消息我们消费的时候可能无法得到消息的ack,因为我们可能需要自己维护一个消息消费的表来记录消费的情况,做后续处理
二、redis实现延迟队列
我们使用redis实现延迟队列主要时事使用zset数据格式,主要是借助它的score参数,因为zset是按照score进行排序的,实现如下:
我们这里实现一个简单的五秒延迟执行:
1、首先我们插入一条数据,将score设置当前时间戳加上五秒,如下
@PostMapping("setZset")
public void setZset(@RequestBody QueueTest queueTest){
System.out.println("塞入一条数据");
redisClient.zadd("zqueueTest1",System.currentTimeMillis() + 5000,JSON.toJSONString(queueTest));
}
2、我们设置一个跑批去读取在当前时间或者当前时间之前的值,代码如下:
@EnableScheduling
@Component
public class TestTask {
@Autowired
private RedisClient redisClient;
@Scheduled(cron = "* * * * * ?")
public void popSet(){
Set<String> zqueueTest = redisClient.zrangeWithScores("zqueueTest1", 0, System.currentTimeMillis());
//只取一条数据
//Set<String> zqueueTest1 = redisClient.zrangeWithScores("zqueueTest1", 0, System.currentTimeMillis(),0,1);
for (String item : zqueueTest){
System.out.println(JSON.toJSONString(item));
redisClient.zrem("zqueueTest1",item);
}
}
}
这样我们就可以实现zset里面是按照时间的大小进行排序的,我们只需要取出当前时间以前的数据,就可以实现延迟的效果。
测试如下:
通过上图的时间可以看出来我们基本上试下了五秒的延迟
缺点:
1、因为是跑批读取所以存在一定的时间延迟
2、对于并发的支持性不强
总结: 上面就是redis实现队列的一些demo,这里写出来只是给大家提供一些思路,但是大家也可以看出来这种实现方式还是存在一定的问题,所以真正用的时候还是使用专业的mq中间件,后续也会有mq的系列文章大家也可以参考借鉴下。