目标
- 使用 async(https://github.com/caolan/async) 来控制并发连接数
新建文件 app.js
,当调用 node app.js
时,输出 CNode(https://cnodejs.org/ ) 社区首页的所有帖子标题和链接以及第一条评论,以 json 的格式返回。
输出示例:
[
{
title: '微信应用号在前端开发圈火了,而Docker其实早已火遍后端',
href: 'https: //cnodejs.org/topic/57e45421015b4f570e0d02df',
comment1: 'weapp跟Docker。。。这两者有关系吗。。。。'
},
{
title: '开发过微信签名算法的大大看过来。。。',
href: 'https: //cnodejs.org/topic/57e287a43af3942a3aa3b959',
comment1: '这个参数是微信加的用于跟踪之类而且前端wx.config注册一次以后就用不到签名了还是看你的业务逻辑吧'
}
]
注意:与上篇文章(使用 eventproxy 控制并发)不同,并发连接数需要控制在 5 个。
如果你亲自运行过上篇文章的代码,就会发现一个问题,有的返回值为空或者报错。这是因为 cnodejs.org 网站有并发连接数的限制,所以当请求发送太快的时候会导致返回值为空或报错。
注意:有的的网站有可能会因为你发出的并发连接数太多而当你是在恶意请求,把你的 IP 封掉。
我们在写爬虫的时候,如果有 1000 个链接要去爬,那么不可能同时发出 1000 个并发链接,我们需要控制一下并发的数量,比如并发 10 个就好,然后慢慢抓完这 1000 个链接。
用 async 来做这件事很简单。
这次我们要介绍的是 async 的常用的控制并发连接数的接口 queue(worker, concurrency)(http://caolan.github.io/async/docs.html#.queue) 。
什么时候用 eventproxy,什么时候使用 async 呢?它们不都是用来做异步流程控制的吗?
答案:
当你需要去多个源(一般是小于 10 个)汇总数据的时候,用 eventproxy 方便;当你需要用到队列,需要控制并发数,或者你喜欢函数式编程思维时,使用 async。大部分场景是前者,所以大部分时间是用 eventproxy 的。
最终版的代码:
var superagent = require('superagent');
var cheerio = require('cheerio');
var url = require('url');
var async = require('async');
var cnodeUrl = 'https://cnodejs.org/'
superagent.get(cnodeUrl)
.end(function(err, res) {
var topicUrls = [];
var $ = cheerio.load(res.text);
$('#topic_list .topic_title').each(function(index, element) {
var $element = $(element);
var href = url.resolve(cnodeUrl, $element.attr('href'));
topicUrls.push(href);
});
var data = [];
/**
* queue(worker, concurrency)
* queue 是一个串行的消息队列,通过限制了 worker 数量,不再一次性全部执行。
* 当 worker 数量不够用时,新加入的任务将会排队等候,直到有新的 worker 可用。
*
*/
// 定义一个 queue,设 worker 数量为 5
var q = async.queue(function(task, callback) {
var topicUrl = task.topicUrl;
superagent.get(topicUrl)
.end(function(err, res) {
var $ = cheerio.load(res.text);
var result = {
title: $('.topic_full_title').text().trim(),
href: topicUrl,
comment1: $('.reply_content').eq(0).text().trim()
};
callback(data.push(result));
});
}, 5);
/**
* 监听:当所有任务都执行完以后,将调用该函数
*/
q.drain = function() {
console.log('all tasks have been processed');
console.log(data);
};
/**
* 添加任务
*/
topicUrls.forEach(function(topicUrl) {
q.push({ topicUrl: topicUrl }, function(err) {
console.log('push finished ' + topicUrl);
});
});
});