背景
很多人在面试的时候可能都碰到过这样的一个面试题:设计一个秒杀系统,30分钟没付款就自动关闭交易,这里我们主要来看下在实际的项目中如何结合业务需求来实现类似"xxx分钟后自动完成xxx"这种属于延迟任务的功能。
业务场景
下面来看看具体的业务场景,我们在系统中有很多的需求如活动报名,活动签到,活动取消,活动审核等等都需要发送短信或者消息,其中有些比较特殊的如活动开始或者结束前xxx小时发送消息、活动到期后自动发送通知等等都属于延迟性触发的任务,那么针对于类似这样的任务,一般我们都是怎么处理的呢?
设计思路
由于系统是目前仍属于单机应用,所以在实现上暂时不考虑分布式,为了简单快速采用了JDK自带的本地延迟队列DelayQueue结合redis作为数据灾备的方案.DelayQueue是JUC框架中提供的一个具备延迟机制的队列.
DelayQueue具有有如下特点:
- 队列中存储的元素必须实现Delayed接口,且元素具有时效性.
- Delayed接口提供了getDelay方法来返回对象的延迟时间.
- Delayed接口提供了compareTo方法用于队列内部元素的比较排序.
- 内部使用了优先级队列PriorityQueue来实现每次从队首中取出来的都是最先要过期的元素.
- 实现了BlockingQueue接口,是一个无界阻塞队列,且元素不允许为null.
- 提供了如阻塞方法take()返回队首元素,put()方法添加元素,remove()方法元素出队等等...
那么大致实现的思路如下:
在创建活动时会将活动id与计算好的时间差值存储到redis缓存中,服务器后台开启守护线程实时监控本地队列中到期的任务,并触发相应的推送操作,同时为了防止服务器意外重启等情况,在系统初始化时会将缓存数据load到本地队列中,这样可以避免由于数据丢失导致消息与短信数据没有推送到,下面来看看实现步骤:
1.先创建任务对象dto,内部定义了任务标识ID与延迟时间戳并实现Delayed接口和Runnable接口,我们来看看其中一个dto的实现,其他的类似,代码如下:
public class NotifyDto implements Delayed, Runnable, Serializable {
private static final long serialVersionUID = 1L;
private final static Logger logger = LoggerFactor.getLogger(NotifyDto.class);
private String redisMsg;//缓存数据即任务标识ID
private long expireTime;//延迟时间
//带参构造函数
public NotifyDto(String redisMsg, long delayTime) {
this.redisMsg = redisMsg;
this.expireTime = TimeUnit.NANOSECONDS.convert(delayTime,
TimeUnit.MILLISECONDS) + System.nanoTime();
}
public String getRedisMsg() {
return redisMsg;
}
public void setRedisMsg(String redisMsg) {
this.redisMsg = redisMsg;
}
public long getExpireTime() {
return expireTime;
}
public void setExpireTime(long expireTime) {
this.expireTime = expireTime;
}
/**
* 用于延迟队列内部比较排序 当前对象的延迟时间 - 比较对象的延迟时间
* @see java.lang.Comparable#compareTo(java.lang.Object)
**/
@Override
public int compareTo(Delayed o) {
NotifyDto task = (NotifyDto) o;
long result = this.getDelay(TimeUnit.NANOSECONDS)
- task.getDelay(TimeUnit.NANOSECONDS);
if (result < 0) {
return -1;
} else if (result > 0) {
return 1;
} else {
return 0;
}
}
/**
* 返回对象延迟时间
* @see java.util.concurrent.Delayed#getDelay(java.util.concurrent.TimeUnit)
**/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expireTime - System.nanoTime(),
TimeUnit.NANOSECONDS);
}
/**
* 任务回调处理
* @see java.lang.Runnable#run()
**/
@Override
public void run() {
logger.debug("当前任务队列:msgQueue,延迟时间:{},活动ID:{}", this.expireTime
+ "=========================================", this.redisMsg);
this.msgPus h(this.redisMsg);
}
private void msgPush(String redisMsg) {
Map<String, Object> msgMap = new HashMap<>();
Integer activeId = NumberHelpUtils.toInt(redisMsg);
//省略发送消息动作
//清空缓存记录
JedisUtils.zRemove("active:review:notify", activeId + "");
}
@Override
public String toString() {
return "NotifyDto [redisMsg=" + redisMsg + ", expireTime=" + expireTime
+ "]";
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result
+ ((redisMsg == null) ? 0 : redisMsg.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
NotifyDto other = (NotifyDto) obj;
if (redisMsg == null) {
if (other.redisMsg != null)
return false;
} else if (!redisMsg.equals(other.redisMsg))
return false;
return true;
}
}
2.创建一个任务调度服务service监控所有队列中的过期任务对象,其内部包含各种任务队列初始化,出队与入队等方法,大致代码如下.
public class MsgQueueService {
private final static Logger logger = LoggerFactory
.getLogger(MsgQueueService.class);
private MsgQueueService() {
}
private static class MsgHolder {
private static MsgQueueService msgQueueService = new MsgQueueService();
}
public static MsgQueueService getInstance() {
return MsgHolder.msgQueueService;
}
/**
* 执行任务线程池
*/
private final static Executor es = Executors.newFixedThreadPool(5);
/**
* 创建3个守护线程
*/
private Thread expireActiveThread;
private Thread startByActiveThread;
private Thread beforeEndActiveThread;
/**
* 创建延迟任务队列
*/
private DelayQueue<NotifyDto> msgQueue = new DelayQueue<>();
private DelayQueue<StartByNotifyDto> msgQueue2 = new DelayQueue<>();
private DelayQueue<BeforEndNotifyDto> msgQueue3 = new DelayQueue<>();
/**
*
* 系统启动时初始化
*/
public void init() {
//初始化数据
initRedisMsg();
//监听活动结束后任务
expireActiveThread = new Thread(() -> execute());
expireActiveThread.setDaemon(true);
expireActiveThread.setName("ExpireActive Daemon Thread");
expireActiveThread.start();
//监听活动开始前2小时任务
startByActiveThread = new Thread(() -> execute2());
startByActiveThread.setDaemon(true);
startByActiveThread.setName("StartByActive Daemon Thread");
startByActiveThread.start();
//监听活动结束前2小时任务
beforeEndActiveThread = new Thread(() -> execute3());
beforeEndActiveThread.setDaemon(true);
beforeEndActiveThread.setName("BeforeEndActive Daemon Thread");
beforeEndActiveThread.start();
}
/**
*
* 从Redis中初始化任务到队列中
*/
public void initRedisMsg() {
Set<String> keySet = JedisUtils.zRange("active:review:notify", 0, -1);
if (CollectionHelpUtils.isNotEmpty(keySet)) {
keySet.stream().forEach(
o -> {
long expireTime = JedisUtils.zScore(
"active:review:notify", o).longValue();
NotifyDto redisTask = new NotifyDto(o, expireTime
- System.currentTimeMillis());
this.push(redisTask);
});
}
Set<String> applyStartSet = JedisUtils.zRange(
"active:applyStart:notify", 0, -1);
if (CollectionHelpUtils.isNotEmpty(applyStartSet)) {
applyStartSet.stream().forEach(
o -> {
long expireTime = JedisUtils.zScore(
"active:applyStart:notify", o).longValue();
StartByNotifyDto redisTask = new StartByNotifyDto(o,
expireTime - System.currentTimeMillis());
this.push(redisTask);
});
}
Set<String> applyEndSet = JedisUtils.zRange("active:applyEnd:notify",
0, -1);
if (CollectionHelpUtils.isNotEmpty(applyEndSet)) {
applyEndSet.stream().forEach(
o -> {
long expireTime = JedisUtils.zScore(
"active:applyEnd:notify", o).longValue();
BeforEndNotifyDto redisTask = new BeforEndNotifyDto(o,
expireTime - System.currentTimeMillis());
this.push(redisTask);
});
}
}
/**
* 监听队列,如果没有过期对象则阻塞
* @param es
*/
private void execute() {
while (true) {
try {
NotifyDto task = msgQueue.take();
if (task != null) {
logger.debug("当前任务队列:{},执行时间:{}", "msgQueue",
DateTimeUtils.getTime());
//此处真正执行了任务对象中的run方法,触发了业务推送动作
es.execute(task);
}
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}
private void execute2() {
while (true) {
try {
StartByNotifyDto task2 = msgQueue2.take();
if (task2 != null) {
logger.debug("当前任务队列:{},执行时间:{}", "msgQueue2",
DateTimeUtils.getTime());
//此处真正执行了任务对象中的run方法,触发了业务推送动作
es.execute(task2);
}
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}
private void execute3() {
while (true) {
try {
BeforEndNotifyDto task3 = msgQueue3.take();
if (task3 != null) {
logger.debug("当前任务队列:{},执行时间:{}", "msgQueue3",
DateTimeUtils.getTime());
//此处真正执行了任务对象中的run方法,触发了业务推送动作
es.execute(task3);
}
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}
/**
*
* 任务入队
* @param time
* @param task
*/
public void push(NotifyDto task) {
msgQueue.put(task);
}
public void push(StartByNotifyDto task) {
msgQueue2.put(task);
}
public void push(BeforEndNotifyDto task) {
msgQueue3.put(task);
}
/**
*
* 任务出队
* @param task
*/
public void remove(NotifyDto task) {
msgQueue.remove(task);
}
public void remove(StartByNotifyDto task) {
msgQueue2.remove(task);
}
public void remove(BeforEndNotifyDto task) {
msgQueue3.remove(task);
}
}
注意: MsgQueueService的实现使用了单例模式,并且其中的init方法在系统初始化时被调度执行.
3.创建一个系统初始化服务类,默认实现Spring框架提供的接口InitializingBean,重写其接口方法afterPropertiesSet以便在系统启动后自动执行初始化逻辑.
@Override
public void afterPropertiesSet() throws Exception {
logger.debug("系统初始化...");
MsgQueueService.getInstance().init();
}
4.最后一步就是具体的业务逻辑处理了,我们在业务service中的调用方法代码如下:
/**
*
* 把活动任务推送到延迟队列中
* @param activeId
*/
private void Msg2Queue(Active active, boolean isAdd) {
long delayTime = 0;
if (isAdd) {
//计算出当前任务延迟时间
delayTime = active.getActiveEndTime().getTime()
- active.getCreateTime().getTime();
//推送到redis中
JedisUtils.zAdd("active:review:notify", active.getActiveEndTime()
.getTime(), active.getId() + "");
//活动到期未点评的任务入队
NotifyDto task = new NotifyDto(active.getId() + "", delayTime);
MsgQueueService.getInstance().push(task);
//判断活动报名截止状态
if (active.getApplyAudit() == 1) {
if (active.getApplyAbort() == 1) {
//活动开始截止报名
delayTime = active.getActiveStartTime().getTime() - 2 * 60
* 60 * 1000 - System.currentTimeMillis();
//推送到redis中
JedisUtils.zAdd("active:applyStart:notify", active
.getActiveStartTime().getTime()
- 2
* 60
* 60
* 1000, active.getId() + "");
StartByNotifyDto task2 = new StartByNotifyDto(
active.getId() + "", delayTime);
MsgQueueService.getInstance().push(task2);
} else {
//活动结束前可报名
delayTime = active.getActiveEndTime().getTime() - 2 * 60
* 60 * 1000 - System.currentTimeMillis();
//推送到redis中
JedisUtils.zAdd("active:applyEnd:notify", active
.getActiveEndTime().getTime() - 2 * 60 * 60 * 1000,
active.getId() + "");
BeforEndNotifyDto task3 = new BeforEndNotifyDto(
active.getId() + "", delayTime);
MsgQueueService.getInstance().push(task3);
}
}
} else {
//动态取消任务
Double score = JedisUtils.zScore("active:review:notify",
active.getId() + "");
if (score != null) {
long expireTime = score.longValue();
MsgQueueService.getInstance().remove(
new NotifyDto(active.getId() + "", expireTime));
//清空redis中记录
JedisUtils.zRemove("active:review:notify", active.getId() + "");
//重新计算出当前任务延迟时间
delayTime = active.getActiveEndTime().getTime()
- System.currentTimeMillis();
//推送到redis中
JedisUtils.zAdd("active:review:notify", active
.getActiveEndTime().getTime(), active.getId() + "");
//延迟任务入队
NotifyDto task = new NotifyDto(active.getId() + "", delayTime);
MsgQueueService.getInstance().push(task);
}
}
}
注意:由于实际业务中任务可以被修改或取消,所以定义任务dto时需重写其hashcode方法与equals方法,来防止队列中的对象出现冲突,由于dto中的redisMsg字段对应了mysql中的表自增主键,所以我们使用了这个字段来重写这两个方法.
总结
此方案的优点:
- 代码实现相对比较简单,利用JDK自带的容器类来解决延迟处理问题,无需自己造轮子.
- 效率高,任务触发时间延迟低.
- 由于线程安全还可以实现多生产者与消费者.
- 结合redis做数据灾备,避免由于服务重启或其他异常退出导致的数据丢失问题.
此方案的缺点:
- DelayQueue属于单机队列,若在分布式集群环境下,要自己做横向扩展以实现高可用,难度较高.
- 服务器一旦宕机,数据将丢失,需结合其他底层存储做持久化,增加了编码的复杂性.
- 数据存储在单机内存中,受物理条件限制,数据量大时容易OOM.
其他的解决方案:
- JDK自带的线程池ScheduledExecutorService.
- 简单的定时任务轮询,扫描数据表,数据量大时会有性能瓶颈.
- 可以定时任务结合redis,任务和到期时间都保存在redis中,启动定时任务扫描redis,到期的key删除,并且异步更新数据库.
- google guava的缓存也可实现类似的功能.
- 消息队列如ActiveMQ或者RobbitMQ都提供了死信队列可实现延迟功能,设定任务的到期时间,到期之后自动进入死信队列,后台开启守护线程监控死信队列.
- 时间轮算法,Netty提供了一个HashedWheelTimer来实现.
- 利用redis的zset可实现延迟队列或者redis的Keyspace Notifications(键空间机制)实现key失效后提供回调函数.
......