maven
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.4</version>
</dependency>
使用demo
public static void main(String[] args) {
Config config = new Config();
config.useClusterServers()
.addNodeAddress("redis://10.23.3.24:7000")
.addNodeAddress("redis://10.23.3.24:7001")
.addNodeAddress("redis://10.23.3.24:7002")
RedissonClient redisson = Redisson.create(config);
RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("dest_queue2");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);
new Thread() {
public void run() {
while(true) {
try {
//阻塞队列有数据就返回,否则wait
String take = blockingQueue.take();
System.out.println("take:"+take);
long time = Long.valueOf(take.split("---")[0]);
//消费可能产生的延迟
System.out.println("delay:"+(System.currentTimeMillis()-time-2*60*1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
}.start();
for(int i=1;i<=100000;i++) {
// 向阻塞队列放入数据
long time = System.currentTimeMillis();
delayedQueue.offer(time+"---"+i, 2, TimeUnit.MINUTES);
}
delayedQueue.destroy();
}
这里使用了两个queue,对delayedQueue的offer操作是直接进入delayedQueue,元素到期之后进入blockingQueue,通过while循环取出到期的元素。
demo测试了元素到期之后被消费的延迟,结果比较震惊,达到了160s。分析发现,元素从delayedQueue转移到blockingQueue其实是准时的,但是同一时间到期的元素过多导致blockingQueue堆积,而且只有一个线程消费blockingQueue,所以延迟越来越高;改成两个线程消费之后,延迟就恢复到毫秒级了。
源码
RedissonDelayedQueue初始化
/**
 * Creates the delayed queue and registers its transfer task.
 *
 * Derives three Redis key names from getName() (a pub/sub channel, an element list and a
 * timeout sorted set), builds a QueueTransferTask whose pushTaskAsync() moves expired
 * elements to the destination queue via a Lua script, and schedules that task on the given
 * QueueTransferService under queueName.
 *
 * @param queueTransferService service the transfer task is scheduled on
 * @param codec                codec passed to the superclass
 * @param commandExecutor      executor used to run the Lua script and to build the topic
 * @param name                 queue name passed to the superclass
 */
protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
channelName = prefixName("redisson_delay_queue_channel", getName());
queueName = prefixName("redisson_delay_queue", getName());
timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());
// Create the transfer task
QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
// pushTaskAsync: asynchronously moves expired elements to the destination queue
@Override
protected RFuture<Long> pushTaskAsync() {
// Lua script arguments: KEYS[1] = getName(), KEYS[2] = timeoutSetName, KEYS[3] = queueName;
// ARGV[1] = current timestamp in ms, ARGV[2] = 100 (batch limit)
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
// zrangebyscore on the timeout zset: take members whose score is between 0 and the
// current timestamp (i.e. already expired), at most ARGV[2] (100) of them
"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "if #expiredValues > 0 then "
+ "for i, v in ipairs(expiredValues) do "
+ "local randomId, value = struct.unpack('dLc0', v);"
// rpush the unpacked value onto the destination queue
+ "redis.call('rpush', KEYS[1], value);"
// lrem the packed entry from the element list
+ "redis.call('lrem', KEYS[3], 1, v);"
+ "end; "
// Remove the processed entries from the timeout zset
+ "redis.call('zrem', KEYS[2], unpack(expiredValues));"
+ "end; "
// get startTime from scheduler queue head task
// Return the score of the first remaining member of the timeout zset, or nil if the
// zset is empty; pushTask() uses this value to schedule the next run
+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
+ "if v[1] ~= nil then "
+ "return v[2]; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(getName(), timeoutSetName, queueName),
System.currentTimeMillis(), 100);
}
@Override
protected RTopic getTopic() {
return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);
}
};
// Start scheduling the task
queueTransferService.schedule(queueName, task);
this.queueTransferService = queueTransferService;
}
QueueTransferService.schedule
/**
 * Keeps one {@link QueueTransferTask} per name and counts how many users share it.
 * The first registration for a name starts the task; the task is stopped and dropped
 * once its usage counter falls back to zero.
 */
public class QueueTransferService {

    private final ConcurrentMap<String, QueueTransferTask> tasks = PlatformDependent.newConcurrentHashMap();

    /**
     * Registers {@code task} under {@code name}. If a task is already registered for that
     * name, its usage counter is incremented instead and the new task is not started.
     */
    public synchronized void schedule(String name, QueueTransferTask task) {
        QueueTransferTask existing = tasks.putIfAbsent(name, task);
        if (existing != null) {
            // A task for this name is already registered: just count one more user
            existing.incUsage();
            return;
        }
        // First registration for this name: start the task
        task.start();
    }

    /**
     * Releases one usage of the task registered under {@code name}; when the counter
     * reaches zero the task is removed from the registry and stopped.
     */
    public synchronized void remove(String name) {
        QueueTransferTask registered = tasks.get(name);
        if (registered == null || registered.decUsage() != 0) {
            return;
        }
        tasks.remove(name, registered);
        registered.stop();
    }
}
QueueTransferTask.start
/**
 * Attaches two listeners to the scheduler topic: a status listener that runs
 * {@code pushTask()} on subscription, and a message listener that hands every
 * published start time to {@code scheduleTask}. Both listener ids are stored.
 */
public void start() {
    RTopic<Long> topic = getTopic();

    // Run a transfer as soon as the subscription is established
    BaseStatusListener onSubscribed = new BaseStatusListener() {
        @Override
        public void onSubscribe(String channel) {
            pushTask();
        }
    };
    statusListenerId = topic.addListener(onSubscribed);

    // Every published message carries a start time to schedule against
    MessageListener<Long> onStartTime = new MessageListener<Long>() {
        @Override
        public void onMessage(CharSequence channel, Long startTime) {
            scheduleTask(startTime);
        }
    };
    messageListenerId = topic.addListener(onStartTime);
}
/**
 * Schedules the next pushTask() run for the given start time.
 *
 * Does nothing when startTime is null. Otherwise cancels the previously recorded timeout
 * (if any), then either runs pushTask() immediately (start time is at most 10 ms away) or
 * arms a new timer that runs pushTask() when the start time is reached.
 *
 * @param startTime timestamp in milliseconds at which the transfer should run; may be null
 */
private void scheduleTask(final Long startTime) {
// Snapshot the current timeout first; it is the expected value for the CAS below
TimeoutTask oldTimeout = lastTimeout.get();
if (startTime == null) {
return;
}
if (oldTimeout != null) {
oldTimeout.getTask().cancel();
}
// delay: how long until the start time is reached, in ms
long delay = startTime - System.currentTimeMillis();
if (delay > 10) {
// More than 10 ms away: arm a timer that runs pushTask() on expiry
// (original author's note: backed by HashedWheelTimer - not visible in this excerpt)
Timeout timeout = connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
pushTask();
// Clear lastTimeout only if it still refers to this timer
TimeoutTask currentTimeout = lastTimeout.get();
if (currentTimeout.getTask() == timeout) {
lastTimeout.compareAndSet(currentTimeout, null);
}
}
}, delay, TimeUnit.MILLISECONDS);
// If lastTimeout changed since the snapshot was taken, discard the timer just created
if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
timeout.cancel();
}
} else {
// 10 ms or less away (or already due): run pushTask() right now
pushTask();
}
}
/**
 * Runs {@code pushTaskAsync()} (the transfer implemented by the concrete task) and, once it
 * completes, schedules the follow-up run: at the returned start time on success, or five
 * seconds later on failure. A {@code RedissonShutdownException} ends the cycle silently.
 */
private void pushTask() {
    pushTaskAsync().onComplete((nextStartTime, failure) -> {
        if (failure == null) {
            // A null result means no further start time was returned: nothing to schedule
            if (nextStartTime != null) {
                scheduleTask(nextStartTime);
            }
            return;
        }
        if (failure instanceof RedissonShutdownException) {
            return;
        }
        log.error(failure.getMessage(), failure);
        // Retry in 5 seconds after an execution error
        scheduleTask(System.currentTimeMillis() + 5 * 1000L);
    });
}
RedissonDelayedQueue.offer
/**
 * Synchronous variant of {@code offerAsync}: passes the future returned by
 * {@code offerAsync} to {@code get}.
 */
@Override
public void offer(V e, long delay, TimeUnit timeUnit) {
    RFuture<Void> pending = offerAsync(e, delay, timeUnit);
    get(pending);
}
/**
 * Asynchronously adds an element that becomes due after the given delay.
 *
 * Computes the expiry timestamp (now + delay) and a random id, then runs a Lua script that
 * stores the packed element in the timeout zset and the element list, and publishes the
 * expiry timestamp on the channel when the new element is at the head of the zset.
 *
 * @param e        element to enqueue
 * @param delay    delay before the element is due; must not be negative
 * @param timeUnit unit of delay
 * @return future of the script execution
 * @throws IllegalArgumentException if delay is negative
 */
@Override
public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
if (delay < 0) {
throw new IllegalArgumentException("Delay can't be negative");
}
long delayInMs = timeUnit.toMillis(delay);
// Expiry time: current time + delay
long timeout = System.currentTimeMillis() + delayInMs;
long randomId = ThreadLocalRandom.current().nextLong();
// Lua script keys: KEYS[1] = getName(), KEYS[2] = timeoutSetName, KEYS[3] = queueName, KEYS[4] = channelName
// Lua script args: ARGV[1] = timeout, ARGV[2] = randomId, ARGV[3] = encode(e)
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"
// Add the packed struct to the timeout zset with the expiry timestamp as its score
+ "redis.call('zadd', KEYS[2], ARGV[1], value);"
// Append the packed struct to the tail of the element list
+ "redis.call('rpush', KEYS[3], value);"
// if new object added to queue head when publish its startTime
// to all scheduler workers
// If the first member of the timeout zset is the struct just added, publish its
// timeout on the channel. Subscribers receive it in onMessage and call scheduleTask,
// which (re)arms the timer for the new head element.
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
+ "if v[1] == value then "
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end;",
Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName),
timeout, randomId, encode(e));
}
总结
- Redisson延迟队列使用三个结构来存储,一个是queueName的list,值是添加的元素;一个是timeoutSetName的zset,值是添加的元素,score为timeout值;还有一个是getName()的blockingQueue,值是到期的元素。
- 主要逻辑是:将元素及延时信息入队,之后定时任务将到期的元素转移到阻塞队列。
- 使用HashedWheelTimer做定时,定时到期之后从zset中取头部100个到期元素,所以定时和转移到阻塞队列是解耦的,无论是哪个task触发的pushTask,最终都是先取zset的头部先到期的元素。
- 元素数据都是存在redis服务端的,客户端只是执行HashedWheelTimer任务,所以单个客户端挂了不影响服务端数据,做到分布式的高可用。