Node.js 任务队列Bull的源码浅析

原文地址: https://www.jianshu.com/p/1ed50e6d4a08

Bull是基于Redis的一个Node.js任务队列管理库,支持延迟队列,优先级任务,重复任务,以及原子操作等多种功能.

本文将从基本的使用来分析Bull的源码,对于repeat job,seperate processes等暂不展开.

Bull: Premium Queue package for handling jobs and messages in NodeJS.

相关的信息如下:

目录

基本使用

Bull的使用分为三个步骤:

  1. 创建队列
  2. 绑定任务处理函数
  3. 添加任务

如下示例:

const Bull = require('bull')
// 1. 创建队列
const myFirstQueue = new Bull('my-first-queue');
// 2. 绑定任务处理函数
myFirstQueue.process(async (job, data) => {
  return doSomething(data);
});
// 3. 添加任务
const job = await myFirstQueue.add({
  foo: 'bar'
});

创建队列

创建队列是先通过require然后再通过new来实现的,因此要先找到require的入口.打开package.json:

{
  "name": "bull",
  "version": "3.7.0",
  "description": "Job manager",
  "main": "./index.js",
  ...
}

看到入口为index.js,打开:

module.exports = require('./lib/queue');
module.exports.Job = require('./lib/job');

从而找到目标函数所在文件./lib/queue:

module.exports = Queue;

可以看到exports的是Queue,接着去分析Queue函数:

const Queue = function Queue(name, url, opts) {
  ...
  // 默认设置
  this.settings = _.defaults(opts.settings, { 
    lockDuration: 30000,
    stalledInterval: 30000,
    maxStalledCount: 1,
    guardInterval: 5000,
    retryProcessDelay: 5000,
    drainDelay: 5, // 空队列时brpoplpush的等待时间
    backoffStrategies: {}
  });
  ...
  // Bind these methods to avoid constant rebinding and/or creating closures
  // in processJobs etc.
  this.moveUnlockedJobsToWait = this.moveUnlockedJobsToWait.bind(this);
  this.processJob = this.processJob.bind(this);
  this.getJobFromId = Job.fromId.bind(null, this);
  ...
};

主要是进行参数初始化和函数的绑定.

绑定任务处理函数

该步骤是从myFirstQueue.process开始的,先看process函数:

Queue.prototype.process = function (name, concurrency, handler) {
  ...
  this.setHandler(name, handler); // 1. 绑定handler

  return this._initProcess().then(() => {
    return this.start(concurrency); // 2. 启动队列
  });
};

该函数做了两个事情:

  1. 绑定handler
  2. 启动队列

先看绑定handler:

Queue.prototype.setHandler = function (name, handler) {
  ...
  if (this.handlers[name]) {
    throw new Error('Cannot define the same handler twice ' + name);
  }
  ...
  if (typeof handler === 'string') {
    ...
  } else {
    handler = handler.bind(this);
    // 将handler和名字保存起来
    if (handler.length > 1) {
      this.handlers[name] = promisify(handler);
    } else {
      this.handlers[name] = function () {
       ...
    }
  }
};

再看队列的启动:

Queue.prototype.start = function (concurrency) {
  return this.run(concurrency).catch(err => {
    this.emit('error', err, 'error running queue');
    throw err;
  });
};

run函数:

Queue.prototype.run = function (concurrency) {
  const promises = [];

  return this.isReady()
    .then(() => {
      return this.moveUnlockedJobsToWait(); // 将unlocked的任务移动到wait队列
    })
    .then(() => {
      return utils.isRedisReady(this.bclient);
    })
    .then(() => {
      while (concurrency--) {
        promises.push(
          new Promise(resolve => {
            this.processJobs(concurrency, resolve); // 处理任务
          })
        );
      }

      this.startMoveUnlockedJobsToWait(); // unlocked job定时检查

      return Promise.all(promises);
    });
};

unlocked job(stalled job): job的运行需要锁,正常情况下job在active时会获取锁(有过期时间lockDuration,定时延长lockRenewTime),complete时释放锁,如果job在active时无锁,说明进程被阻塞或崩溃导致锁过期

processJobs:

Queue.prototype.processJobs = function (index, resolve, job) {
  const processJobs = this.processJobs.bind(this, index, resolve);
  process.nextTick(() => {
    this._processJobOnNextTick(processJobs, index, resolve, job);
  });
};

再看_processJobOnNextTick:

        // 关键代码
        const gettingNextJob = job ? Promise.resolve(job) : this.getNextJob();

        return (this.processing[index] = gettingNextJob
          .then(this.processJob)
          .then(processJobs, err => {
            ...
          }));

上述代码可以作如下描述:

  1. job为空时用getNextJob函数来获取job
  2. 执行processJob函数
  3. 执行processJobs函数

先看getNextJob:

if (this.drained) {
    //
    // Waiting for new jobs to arrive
    //
    console.log('bclient start get new job');
    return this.bclient
      .brpoplpush(this.keys.wait, this.keys.active, this.settings.drainDelay)
      .then(
        jobId => {
          if (jobId) {
            return this.moveToActive(jobId);
          }
        },
        err => {
         ...
        }
      );
  } else {
    return this.moveToActive();
  }

运用Redis的PUSH/POP机制来获取消息,超时时间为drainDelay.

接着来看processJob:

Queue.prototype.processJob = function (job) {
  ... 
  const handleCompleted = result => {
    return job.moveToCompleted(result).then(jobData => {
      ...
      return jobData ? this.nextJobFromJobData(jobData[0], jobData[1]) : null;
    });
  };

  // 延长锁的时间
  lockExtender();
  const handler = this.handlers[job.name] || this.handlers['*'];

  if (!handler) {
    ...
  } else {
    let jobPromise = handler(job);
    ...

    return jobPromise
      .then(handleCompleted) 
      .catch(handleFailed)
      .finally(() => {
        stopTimer();
      });
  }
};

可以看到任务处理成功后会调用handleCompleted,在其中调用的是job的moveToCompleted,中间还有一些调用,最终会调用lua脚本moveToFinished:

  ...
  -- Try to get next job to avoid an extra roundtrip if the queue is not closing, 
  -- and not rate limited.
  ...

该脚本到作用是将job移动到completed或failed队列,然后取下一个job.

processJob执行完后就又重复执行processJobs,这就是一个循环,这个是核心,如下图:

image.png

添加任务

直接看add函数:

Queue.prototype.add = function (name, data, opts) {
  ...
  if (opts.repeat) {
    ...
  } else {
    return Job.create(this, name, data, opts);
  }
};

调用的是Job中的create函数:

Job.create = function (queue, name, data, opts) {
  const job = new Job(queue, name, data, opts); // 1. 创建job

  return queue
    .isReady()
    .then(() => {
      return addJob(queue, job); // 2. 添加job到队列中
    })
    ...
};

继续沿着addJob,最终会调用的是lua脚本的addJob,根据job设置将job存入redis.

问题

1. 为什么会出现错误: job stalled more than allowable limit

在run函数中执行了函数this.startMoveUnlockedJobsToWait(),来看看该函数:

Queue.prototype.startMoveUnlockedJobsToWait = function () {
  clearInterval(this.moveUnlockedJobsToWaitInterval);
  if (this.settings.stalledInterval > 0 && !this.closing) {
    this.moveUnlockedJobsToWaitInterval = setInterval(
      this.moveUnlockedJobsToWait,
      this.settings.stalledInterval
    );
  }
};

该函数是用来定时执行moveUnlockedJobsToWait函数:

Queue.prototype.moveUnlockedJobsToWait = function () {
  ...
  return scripts
    .moveUnlockedJobsToWait(this)
    .then(([failed, stalled]) => {
      const handleFailedJobs = failed.map(jobId => {
        return this.getJobFromId(jobId).then(job => {
          this.emit(
            'failed',
            job,
            new Error('job stalled more than allowable limit'),
            'active'
          );
          return null;
        });
      });
      ...
    })
    ...
    ;
};

该函数会通过scripts的moveUnlockedJobsToWait函数最终调用lua脚本moveUnlockedJobsToWait:

  ...
  local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1])
  ...
        if(stalledCount > MAX_STALLED_JOB_COUNT) then
          rcall("ZADD", KEYS[4], ARGV[3], jobId)
          rcall("HSET", jobKey, "failedReason", "job stalled more than allowable limit")
          table.insert(failed, jobId)
        else
          -- Move the job back to the wait queue, to immediately be picked up by a waiting worker.
          rcall("RPUSH", dst, jobId)
          rcall('PUBLISH', KEYS[1] .. '@', jobId)
          table.insert(stalled, jobId)
        end
  ...
return {failed, stalled}
  • MAX_STALLED_JOB_COUNT: 默认为1

该脚本会将stalled的job取出并返回,从而生成如题错误.

参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,607评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,239评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,960评论 0 355
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,750评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,764评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,604评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,347评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,253评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,702评论 1 315
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,893评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,015评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,734评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,352评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,934评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,052评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,216评论 3 371
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,969评论 2 355