node 异步编程

高阶函数:把函数参数作为参数,或作为返回值

偏函数: 将传入参数作为判断或者其他逻辑条件

注意点

异常处理

异步I/O有两个处理阶段,中间有事件循环调度,异步方法在提交请求后就返回了,try/catch只能捕获当次事件循环内的异常,不能捕获callback的异常

try {
    req.body = JSON.parse(buf, options.reviver);
    //callback()不能放在这,若callback异常,会导致它多次执行
} catch (err) {
    err.body = buf;
    err.status = 400;
    return callback(err);
}
callback();

阻塞代码

这么写会持续占用CPU资源,破坏事件循环的调度,因为Node是单线程,会导致其余请求得不到响应

    // TODO
var start = new Date();
while (new Date() - start < 1000) {
    // TODO 
}
   // 需要阻塞的代码

异步编程解决方案

事件

Node中事件的发布通常是伴随事件循环而异步触发的

  • 事件与侦听器是多对多的,设置过多的侦听器会导致过多的CPU占用
  • 要给EventEmitter对象添加error事件和处理,若触发error事件不处理,会引起线程退出\
  • 使用once()添加的侦听器只会执行一次,在执行后就会将相关的事件移除

使用事件的途径

  • 实例化events
var events =require(‘events’);
var emitter = new events.EventEmitter();

emitter.on('do',function(value){console.log(value)});
emitter.emit('do','doing');

  • 继承events模块
var util = require('util')
var events = require('events');
function Stream() {
    events.EventEmitter.call(this);
}
util.inherits(Stream, events.EventEmitter);
Stream.prototype.test = function() {
    var self = this;
    try {
        throw 111;
    } catch (error) {
        self.emit('error')
    }
}
var stream = new Stream();
stream.on('error', function() {
    console.error('asdasd')
});
stream.test();

利用Once()解决雪崩问题
  • 在高访问量、大并发量的情况下缓存突然失效, 大量的请求同时涌入数据库中,数据库无法承受,影响网站速度
  • 在缓存中无数据时,访问量大的话,同条SQL可能会被执行多次,此时可以将所有请求放入事件队列,利用once()来绑定,SQL在执行时,新到的相同调用只需在队列中等待数据,一旦查询结束,得到的结果可以被这些调用共同使用
var proxy = new events.EventEmitter();
var status = "ready";
var select = function(callback) {
    proxy.once("selected", callback);
    if (status === "ready") {
        status = "pending";
        db.select("SQL", function(results) {
            proxy.emit("selected", results);
            status = "ready";
        });
    }
};

可能会因为侦听器过多引发警号,调用setMaxListeners(0)移除或设置大的阈值

EventProxy 处理多事件对一侦听器,适用于实例化事件
  • all() 当每个事件都被触发了才会执行,只执行一次
var proxy = new EventProxy();
proxy.all("template", "data", "resources", function(template, data, resources) { // TODO
});
  • tail() 首次也是需要每个事件都被触发,之后只要某个事件触发,就会用最新的数据执行
  • after() 事件执行多少次后执行侦听器的单一事件组合订阅
var proxy = new EventProxy();
proxy.after("data", 10, function(datas) { // TODO
});

  • not()
  • any()

EventProxy对异常处理的优化

  • fail()
ep.fail(callback);
//等价于
ep.fail(function(err) {
    callback(err);
});
//又等价于 
ep.bind('error', function(err) {
    // 卸载所有处理函数 
    ep.unbind();
    // 异常回调
    callback(err);
});

  • done()
ep.done('tpl');
//等价于
function(err, content) {
    if (err) {
        // 发生异常,交给error事件处理函数处理
        return ep.emit('error', err);
    }
    ep.emit('tpl', content);
}

done()接受函数参数

ep.done(function(content) { 
     // TODO
    // 无需考虑异常 
    ep.emit('tpl', content);
});

//等价于
function(err, content) {
    if (err) {
        //   发生异常 给error事件的处理函数处理
        return ep.emit('error', err);
    }
    (function(content) {
        // TODO
        // 无需考虑异常 
        ep.emit('tpl', content);
    }(content));
}

代码对比

exports.getContent = function(callback) {
    var ep = new EventProxy();
    ep.all('tpl', function(tpl, data) {
        // 成功回调
        callback(null, {
            template: tpl,
            data: data
        });
    });
    // 帧听error事件 
    ep.bind('error', function(err) {
        // 卸载 所有处理函数 
        ep.unbind();
        // 异常回调 
        callback(err);
    });
    fs.readFile('template.tpl', 'utf-8', function(err, content) {
        if (err) {
            //   发生异常,给error事件的处理函数处理 
            return ep.emit('error', err);
        }
        ep.emit('tpl', content);
    });
};


exports.getContent = function(callback) {
    var ep = new EventProxy();
    ep.all('tpl', function(tpl, data) { 
    // 成功回调
        callback(null, {
            template: tpl,
            data: data
        });
    }); 
    //绑定错误处理函数 
    ep.fail(callback);
    fs.readFile('template.tpl', 'utf-8', ep.done('tpl'));
};

Promise/Deferred(应用参见:es6 Promise)

  • 先执行异步调用,后传递处理方法
  • then()方法只接受function对象,继续返回promise()对象,可选progress事件传入
  • Promise是高级接口,依靠低级接口事件来实现
//then
var Promise = function() {
    EventEmitter.call(this);
};
util.inherits(Promise, EventEmitter);
Promise.prototype.then = function(fulfilledHandler, errorHandler, progressHandler) {
    if (typeof fulfilledHandler === 'function') {
        //  用once()方法  保证成功回调只执行一次  
        this.once('success', fulfilledHandler);
    }
    if (typeof errorHandler === 'function') {
        //  用once()方法  保证成功回调只执行一次   
        this.once('error', errorHandler);
    }
    if (typeof progressHandler === 'function') {
        this.on('progress', progressHandler);
    }
    return this;
};
//实现这些功能的对象被称为Deferred,即延迟对象 
var Deferred = function() {
    this.state = 'unfulfilled';
    this.promise = new Promise();
};
Deferred.prototype.resolve = function(obj) {
    this.state = 'fulfilled';
    this.promise.emit('success', obj);
};
Deferred.prototype.reject = function(err) {
    this.state = 'failed';
    this.promise.emit('error', err);
};
Deferred.prototype.progress = function(data) {
    this.promise.emit('progress', data);
};

对res改造成promise

Deferred用于内部,维护异步模型的状态,Promise作用于外部,通过then()暴露给外部以添加自定义逻辑

var promisify = function(res) {
    var deferred = new Deferred();
    var result = '';
    res.on('data', function(chunk) {
        result += chunk;
        deferred.progress(chunk);
    });
    res.on('end', function() {
        deferred.resolve(result);
    });
    res.on('error', function(err) {
        deferred.reject(err);
    });
    return deferred.promise;//更改内部状态的行为由定义者处理  
};
//then调用的是promise
promisify(res).then(function() {
    // Done
}, function(err) {
    // Error
}, function(chunk) {
    // progress
    console.log('BODY: ' + chunk);
});

Promise多异步协作,all()

Deferred.prototype.all = function(promises) {
    var count = promises.length;
    var that = this;
    var results = [];
    promises.forEach(function(promise, i) {
        promise.then(function(data) {
            count--;
            results[i] = data;
            if (count === 0) {
                that.resolve(results);
            }
        }, function(err) {
            that.reject(err);
        });
    });
    return this.promise;
}; 

//all()返回resolve()结果集

Promise链式调用

  • 前一个的调用的结果,作为下一个调用的开始,后一个then的回调函数会等待前一个promise的状态变化而运行

将API Promise化

// smooth(fs.readFile);
var smooth = function(method) {
    return function() {
        var deferred = new Deferred();
        var args = Array.prototype.slice.call(arguments, 1); //跳过第一个参数
        args.push(deferred.callback());
        method.apply(null, args);
        return deferred.promise;
    };
};

bluebrid的 promisify可以将api promise化

var Promise = require('bluebird')
fs.readFileAsync = Promise.promisify(fs.readFie, fs)

var Promise = require('bluebird')
Promise.promisifyAll(fs)

async

串行执行:series()

async.series([
    function(callback) {
        fs.readFile('file1.txt', 'utf-8', callback);
    },
    function(callback) {
        fs.readFile('file2.txt', 'utf-8', callback);
    }
], function(err, results) {
    // results => [file1.txt, file2.txt] 
});

传入的callback()不是由使用者指定,callback()执行时将结果保存然后执行下一个调用,最终的回调函数执行时,保存的结果以数组传入,一旦异常结束所有调用,将异常传递给最终函数的第一个参数

并行执行:parallel()

async.parallel([function(callback) {
        fs.readFile('file1.txt', 'utf-8', callback);
    },
    function(callback) {
        fs.readFile('file2.txt', 'utf-8', callback);
    }
], function(err, results) {
    // results => [file1.txt, file2.txt] 
});

//等价于
var counter = 2;
var results = [];
var done = function(index, value) {
    results[index] = value;
    counter--;
    if (counter === 0) {
        callback(null, results);
    }
};
// 只传递第一个异常
var hasErr = false;
var fail = function(err) {
    if (!hasErr) {
        hasErr = true;
        callback(err);
    }
};
fs.readFile('file1.txt', 'utf-8', function(err, content) {
    if (err) {
        return fail(err);
    }
    done(0, content);
});
fs.readFile('file2.txt', 'utf-8', function(err, data) {
    if (err) {
        return fail(err);
    }
    done(1, data);
});

一旦异步调用异常,,就将异常作为第一个参数传给最终回调函数,结果为数组

异步调用 依赖:当前一个的结果是后一个调用的输入 waterfall()

async.waterfall([function(callback) {
        fs.readFile('file1.txt', 'utf-8', function(err, content) {
            callback(err, content);
        });
    },
    function(arg1, callback) {
        // arg1 => file2.txt
        fs.readFile(arg1, 'utf-8', function(err, content) {
            callback(err, content);
        });
    },
    function(arg1, callback) {
        // arg1 => file3.txt
        fs.readFile(arg1, 'utf-8', function(err, content) {
            callback(err, content);
        });
    }
], function(err, result) {
    // result => file4.txt 
});

自动依赖处理 auto()

{
    readConfig: function() {}, //读取配置
    connectMongoDB: function() {},//连接mongo
    connectRedis: function() {},//连接redis
    complieAsserts: function() {},//编译静态
    uploadAsserts: function() {},//上传静态到cdn
    startup: function() {}//启动
}

var deps = {
    readConfig: function(callback) {
        // read config file
        callback();
    },
    connectMongoDB: ['readConfig', function(callback) { 
    // connect to mongodb
        callback();
    }],
    connectRedis: ['readConfig', function(callback) {
        // connect to redis callback();
        图灵社区会员 Eric Liu(guangqiang.dev @gmail.com) 专享 尊重版权
    }],
    complieAsserts: function(callback) {
        // complie asserts
        callback();
    },
    uploadAsserts: ['complieAsserts', function(callback) {
        // upload to assert 2 callback();
    }],
    startup: ['connectMongoDB', 'connectRedis', 'uploadAsserts', function(callback) {
        // startup
    }] 
};

//auto根据依赖关系自动分析
async.auto(deps);

Step

串行

Step(
    function readFile1() {
        fs.readFile('file1.txt', 'utf-8', this);
    },
    function readFile2(err, content) {
        fs.readFile('file2.txt', 'utf-8', this);
    },
    function done(err, content) {
        console.log(content);
    }
);

this是Step内部的一个next()方法,将调用结果作为下一个任务的参数并调用

并行 parallel()

Step(
    function readFile1() {
        fs.readFile('file1.txt', 'utf-8', this.parallel());
        fs.readFile('file2.txt', 'utf-8', this.parallel());
    },
    function done(err, content1, content2) {
        // content1 => file1
        // content2 => file2 
        console.log(arguments);
    });

如果异步方法传回结果为多个参数,step只取前两个。parallel()原理是每次执行时将内部计数器加1,

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351

推荐阅读更多精彩内容