在实际的业务领域中,常常会异步处理一些任务,这些任务的执行时间可能会比我们的预期更长一点,这些任务有点需要及时处理,有的可能需要延时执行。通常的做法是将这些任务放入某一种队列里面,把工作交给对应的任务处理器去处理。这个队列就是任务队列,如先进先出 (FIFO) 队列、后进先出 (LIFO) 队列,优先级 (Priority) 队列以及延时(Delay)队列.
先进先出 (FIFO) 队列
FIFO 队列具有语义清晰,严格维持任务发送和接收的顺序处理,易于实现等特点。Redis 的列表是简单的字符串列表,按照插入顺序排序。允许用户通过 LPUSH 和 RPUSH 以及 RPOP 和 LPOP 在列表的两端推入和插入元素,此外还提供了 BRPOP 和BLPOP 连个阻塞命令,非常适合用于 FIFO 队列。
定义任务实体
创建 taskModel.js 文件,该文件定义了任务实体。一个任务通常要包含,任务必要的标识、任务的类型、任务的行为等等。
/**
* 任务实体
* New node file
*/
function Task(){
var id
var type;
var action;
this.setId = function(theyId){
id = theyId;
};
this.setType = function(theyType){
type = theyType;
};
this.setAction = function(theyAction){
action = theyAction;
};
this.getTaskInfo = function(){
return '我的任务 ID 是: '+ this.id +", 任务类型 : "+ this.type+", 我的需要完成任务行为是:"+this.action;
};
}
module.exports = Task;
创建生产者和消费者
创建 main.js 负责模拟生产者和消费者,为了方便演示,这里分别使用 addTaskToQueue () 模拟生产者,getTaskFromQueue 模拟消费者。
/**
* New node file
*/
var Task = require('./taskModel');
const schedule = require('node-schedule');
const app = require('express')();
const PORT = 6379,
HOST = '127.0.0.1',
OPTS = {}; // 配置项,比如说设置密码 {auth_pass:'123456'},
//生产者客户端
const producterClient = require('redis').createClient(PORT, HOST, OPTS);
//消费者客户端
const consumerClient = producterClient.duplicate();
const QUEUE_NAME = 'queue:issue';
//添加任务到队列
function addTaskToQueue(){
// 使用 redis 的 incr 生成 id
producterClient.incr('id', function(err, id) {
if(err){
console.log(err);
}else{
var task = new Task();
task.setId('Task:::'+id);
task.setType('MESSAGE')
task.setAction('打印任务创建的时间>>>创建时间:::'+ new Date());
producterClient.rpush([QUEUE_NAME, task.getTaskInfo()],function(err,reply) {
if(err){
console.log(err);
}else{
console.log('添加任务:'+task.getTaskInfo());
}
});
}
});
}
//从任务队列中获取任务
function getTaskFromQueue(){
//使用 blpop 阻塞模式,直到有数据返回。blpop 命令每次只会从列表中
//弹出一个任务,所以这保证了任务不会被重复执行。
producterClient.blpop(QUEUE_NAME, 4,function(err,reply) {
if(err){
console.log(err);
}else{
if(reply){
console.log('>>>>>>>>消费任务: '+ reply[1]);
}
}
});
}
// 生产者定时任务
let producterJob = schedule.scheduleJob('*/4 * * * * *', () => {
addTaskToQueue();
});
// 消费者定时任务
let consumerJob = schedule.scheduleJob('*/2 * * * * *', () => {
getTaskFromQueue();
});
运行结果:
总结
这里使用 List 的特性简单的模拟了实现了 FIFO 队列。实际应用中任务的定义远远没有这么简单,除了将任务信息推送到队列外,还需要持久化任务信息。其次,还需要考虑到客户端将取得任务后未及时处理宕机导致任务处理失败。再者,客户端获取任务的指令执行成功但是客户连接超时断开连接导致任务丢失等情况。