目的
一次添加几个异步处理,每次最多只能并行执行多少个。
思路
- 并发有一定的数量控制,所以后加的需要有一个队列存储剩余的异步处理。队列里面存储的是一个promise的resolve,通过await来阻断后面的异步执行。
- 怎么在控制每次添加一个返回的都是对应的promise。解决办法是通过async await来处理。async返回的是一个promise,当执行完异步后,执行
Promise.resolve()来返回结果。 - 每次执行完一个异步处理,都需要判断是否超过并发数量,从队列里面执行一个resolve,这样这个异步才可以接续执行,走try{}catch()。
code
class RequestDecorator {
constructor(num) {
this.limit = num || 2;
this.requestQueue = [];
this.currentConcurrent = 0;
}
async add(promiseFunc) {
if(this.currentConcurrent >= this.limit) {
await this.startBlocking();
}
try {
this.currentConcurrent++;
const result = await promiseFunc();
Promise.resolve(result)
} catch (error) {
Promise.reject(error)
} finally {
this.currentConcurrent --;
this.next();
}
}
startBlocking() {
let _resolve;
let promise2 = new Promise((resolve, reject) => _resolve = resolve);
this.requestQueue.push(_resolve);
return promise2;
}
next() {
if (this.requestQueue.length <= 0) return;
const _resolve = this.requestQueue.shift();
_resolve();
}
}
const timer = function (time) {
return () => new Promise(resolve => {
setTimeout(() => {
resolve()
}, time);
})
}
const decorator = new RequestDecorator(2);
function add(time, value) {
decorator.add(timer(time)).then(() => {
console.log(value);
})
}
add(1000, 1)
add(300, 2)
add(400, 3)
add(500, 4)
// 2,3,1,4
补充(2021-6-22)
function asyncPool(poolLimit, array, iteratorFn) {
let i = 0;
const ret = [];
const executing = [];
const enqueue = function() {
console.log('i:', i)
if (i === array.length) {
return Promise.resolve();
}
const item = array[i++];
const p = Promise.resolve().then(() => iteratorFn(item, array));
ret.push(p);
let r = Promise.resolve();
if (poolLimit <= array.length) {
const e = p.then(() => { console.log('从队列中删除此任务'); return executing.splice(executing.indexOf(e), 1)} );
if (executing.length >= poolLimit) {
r = Promise.race(executing);
}
}
console.log('ret', ret)
return r.then(() => { console.log('准备开始下一个'); return enqueue()});
};
return enqueue().then(() => { console.log('wait all'); return Promise.all(ret) });
}
const timeout = i => new Promise(resolve => { console.log('异步任务开始', i); setTimeout(() => {console.log('执行完任务', i); resolve(i) }, i); });
asyncPool(2, [1000, 5000, 3000, 2000], timeout).then(results => {
console.log('results', results)
});