bagpipe 源码解析

这个也是朴大的一个解决异步并发
控制的包。
抄朴大的一段话,来描述应用场景。
在node中,我们可以十分方便地异步发起并行的调用。使用下面的代码,我们可以轻松发起100次异步调用

for(var i = 0;i < 100; i++){
  async();
}

但是如果并发量过大,我们的下层服务器将会吃不消。如果是对文件系统进行大量并发调用,操作系统的文件描述符数量会瞬间用光,抛出如下,错误

Error: EMFILE, too many open files

可以看出,同步I/O和异步I/O的显著差距:同步I/O因为每一个I/O都是彼此阻塞的,在循环体中,总是一个接着一个的调用,不会出现消耗文件描述符太多的情况,同时性能也一下, 对于异步I/O,虽然并发容易实现,但是由于太容易实现,依然需要控制。换言之,尽管是要压榨底层系统的性能,但是需要给与一定的过载保护,以防止过犹不及。
bagpipe的应用呢,就是维护一个队列来控制并发。
具体应用代码如下:

var Bagpipe = require('bagpipe');
var bagpipe = new Bagpipe(10);
for(var i = 1; i < 10; i++){
  bagpipe.push(async,  function(){
  });
}
bagpipe.on('full', function(length){
  console.log('底层系统处理不能及时完成,队列堵塞,目前队列长度' + length);
});

下面就来看一看bagpipe的源码吧。
这里朴大的bagpipe是继承了nodejs的EventEmitter

构造函数

var Bagpipe = function (limit, options) {
  events.EventEmitter.call(this);
  // 活跃的任务数(并发数)
  this.limit = limit;
  // 当前活跃的任务数量
  this.active = 0;
  // 异步处理的队列
  this.queue = [];
  // 一些配置信息
  this.options = {
    // 是否应用控制并发
    disabled: false,
    // 如果异步事件长度超过了queueLength是否还进入队列
    refuse: false,
    // 根据limit来算队列的长度,用于refuse
    ratio: 1,
    // 超时的时间,如果超过了这个时间任务失败
    timeout: null
  };
  if (typeof options === 'boolean') {
    options = {
      disabled: options
    };
  }
  options = options || {};
  for (var key in this.options) {
    if (options.hasOwnProperty(key)) {
      this.options[key] = options[key];
    }
  }
  // queue length
  this.queueLength = Math.round(this.limit * (this.options.ratio || 1));
};
util.inherits(Bagpipe, events.EventEmitter);

push 方法(添加异步任务)

Bagpipe.prototype.push = function (method) {
  // 处理异步任务method之外的其他参数
  var args = [].slice.call(arguments, 1);
  // 异步任务的回调函数
  var callback = args[args.length - 1];
  if (typeof callback !== 'function') {
    args.push(function () {});
  }
  // 如果不控制,或者limit小于1,那么就立即执行
  if (this.options.disabled || this.limit < 1) {
    method.apply(null, args);
    return this;
  }

  // 队列长度也超过限制值时
  if (this.queue.length < this.queueLength || !this.options.refuse) {
    this.queue.push({
      method: method,
      args: args
    });
  } else {
    var err = new Error('Too much async call in queue');
    err.name = 'TooMuchAsyncCallError';
    callback(err);
  }

  if (this.queue.length > 1) {
    this.emit('full', this.queue.length);
  }
  // 执行next方法,来检测是否可以执行一个异步任务
  this.next();
  return this;
};

next方法,用于检测是否可以并且有异步任务执行

Bagpipe.prototype.next = function () {
  var that = this;
  // 活跃的任务数小于限制数
  if (that.active < that.limit && that.queue.length) {
    var req = that.queue.shift();
    //执行异步任务
    that.run(req.method, req.args);
  }
};

// 异步任务执行成功结束,之后调用的内部方法
Bagpipe.prototype._next = function () {
  //活跃数减一
  this.active--;
  this.next();
};

执行异步任务 run

Bagpipe.prototype.run = function (method, args) {
  var that = this;
  // 活跃数,并行执行的任务数加一
  that.active++;
  var callback = args[args.length - 1];
  var timer = null;
  var called = false;

  // 添加超时逻辑
  args[args.length - 1] = function (err) {
    // anyway, clear the timer
    // 如果有timer,就要clear掉这个timer
    if (timer) {
      clearTimeout(timer);
      timer = null;
    }
    // 没有过期之前
    if (!called) {
      that._next();
      callback.apply(null, arguments);
    // 过期了,所以执行抛错
    } else {
      // pass the outdated error
      if (err) {
        that.emit('outdated', err);
      }
    }
  };

  // 设置一个timer,来防止超时
  var timeout = that.options.timeout;
  if (timeout) {
    timer = setTimeout(function () {
      // set called as true
      called = true;
      that._next();
      // pass the exception
      var err = new Error(timeout + 'ms timeout');
      err.name = 'BagpipeTimeoutError';
      err.data = {
        name: method.name,
        method: method.toString(),
        args: args.slice(0, -1)
      };
      callback(err);
    }, timeout);
  }
  // 执行异步任务
  method.apply(null, args);
};

其实在bagpipe中维护一个异步并发任务的队列,来使得最大的并发数也小于limit的数。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容