在项目开发中,会经常遇到需要异步请求多个接口,获取到数据结果,但是一个个串行调太慢,思索着可以并发同时几个一起调,做个并发池,提高请求效率。
这个场景很经典,在日常开发中也很实用。3年前,面试字节的时候,也被问到类似的现场编程题。
2020年写了此文章,时隔3年,已经是个老鸟了,把之前的豆腐渣工程翻新一下。
功能说明
支持多个异步任务同时执行,等待全部执行完成后,直接返回执行结果
- 支持限制并发池的大小
- 支持全部任务执行成功后,进行回调处理
- 支持执行过程中继续追加任务
- 返回结果支持【按顺序取用】或者【按特定对象取用】
实现图解
方式一【推荐】
先取用任务进行执行,直到工作区占满了,当某个任务执行结束后,继续取用任务执行,直到任务区满了,则暂停取用。一直等待某个任务结束,重新取用任务执行。
按此循环操作,直到工作区正在执行的任务数为0,表示全部执行完毕。然后返回全部执行结果,执行回调函数。
期间,支持不断地往任务池追加任务。
interface ITask {
fn: () => Promise<any>;
key?: string | number;
}
interface IResults {
[key: string | number]: any;
}
class TaskQueue {
maxNum: number;
running: number;
queue: Array<ITask>;
queueIndex: number;
results: IResults;
callback: null | Function;
isPushOver: boolean;
constructor(maxNum: number) {
this.maxNum = maxNum; // 并发池数量大小
this.running = 0; // 当前正在执行的池数
this.queue = []; // 任务队列
this.queueIndex = 0; // 当前进入执行的任务索引顺序
this.results = {}; // 存储任务执行结果
this.callback = null; // 回调函数
this.isPushOver = false; // 任务是否追加完毕
}
// 追加任务,并执行
pushTasks(tasks: Array<ITask>) {
this.queue.push(...tasks);
this.next();
}
// 通知任务追加完毕
pushOver() {
this.isPushOver = true;
this.runOver();
}
// 任务全部执行完毕
runOver() {
if (
typeof this.callback === "function" &&
this.running == 0 &&
this.isPushOver
) {
this.callback.call(null, this.results);
}
}
next() {
while (this.running < this.maxNum && this.queue.length) {
const task = this.queue.shift();
// 标识当前任务索引,方便从 results 取用值
const key = task?.key || this.queueIndex;
this.queueIndex++;
// 任务池被占用数量+1
this.running++;
// 任务临界判断合法性
if (!task) {
this.results[key] = null;
this.running--;
continue;
}
// 执行任务
task
.fn()
.then((res: any) => {
this.results[key] = res;
})
.catch((err: any) => {
this.results[key] = err;
})
.finally(() => {
this.running--;
this.next();
});
}
this.runOver();
}
}
/**
* 测试用例
*/
function run() {
// 创建实例
const queue = new TaskQueue(5);
queue.callback = (result: any) => {
console.log("asyncPool_1 ~ result:", result);
console.log(result[1]); // 支持按顺序取用结果
console.log(result.four); // 支持针对特殊任务取用结果
};
function buildTask(result: any, delay = 0) {
return () =>
new Promise((resolve) =>
setTimeout(() => {
console.log("正在执行任务", result);
resolve(result);
}, delay)
);
}
const tasks = [
{ fn: buildTask(1, 100) },
{ fn: buildTask(2, 200) },
{ fn: buildTask(3, 300) },
{ fn: buildTask(4, 100), key: "four" }, // key
];
queue.pushTasks(tasks);
setTimeout(() => {
console.log("再追加一个任务");
queue.pushTasks([{ fn: buildTask(5, 100), key: 5 }]);
}, 500);
setTimeout(() => {
console.log("通知追加结束");
queue.pushOver();
}, 700);
}
run();
- 支持中途追加任务
- 支持全部执行完成后,返回执行结果,并执行回调函数
- 支持对执行结果按顺序取用 或 按需取用
方式二
假设支持最大并发执行的任务数是5个,先创建5个执行工作线,每个工作线开始取任务并执行。当某个工作线的任务执行完毕,则再次从任务池中取任务。直到某个工作线取不到任务时,表示全部执行完毕,进行执行回调方法。
interface ITask {
fn: () => Promise<any>;
key?: string | number;
}
interface IResults {
[key: string | number]: any;
}
function asyncPool_2(tasks: Array<ITask>, max: number, callback: Function) {
let result: IResults = {};
let taskIndex = 0;
Promise.all(
Array.from({ length: max }).map(() => {
return new Promise((resolve) => {
function runTask() {
if (tasks.length <= 0) {
resolve(null);
return;
}
const task = tasks.shift();
const key = task?.key || taskIndex;
if (!task?.fn) {
result[key] = null;
taskIndex++;
runTask();
return;
}
task.fn().then((res) => {
result[key] = res;
runTask();
});
taskIndex++;
}
runTask();
});
})
).then(() => callback(result));
}
/**
* 测试用例
*/
function run() {
function buildTask(result: any, delay = 0) {
return () =>
new Promise((resolve) =>
setTimeout(() => {
console.log("正在执行任务", result);
resolve(result);
}, delay)
);
}
const tasks = [
{ fn: buildTask(1, 100) },
{ fn: buildTask(2, 200) },
{ fn: buildTask(3, 300) },
{ fn: buildTask(4, 100), key: "four" }, // key
];
asyncPool_2(tasks, 5, (result: any) => {
console.log("asyncPool_2 ~ result:", result);
console.log(result[1]); // 支持按顺序取用结果
console.log(result.four); // 支持针对特殊任务取用结果
});
}
run();
代码逻辑简洁一些,适用于一些相对简单的应用场景,需要一开始确定好执行哪些任务。
- 支持传入任务数组,虽能中途追加任务,但是可能存在时机问题
- 支持全部执行完成后,返回结果,并执行回调函数
- 支持对执行结果按顺序取用 或 按需取用
方式三
npm中有挺多第三方包,比如 async-pool、es-promise-pool、p-limit等,但是实际使用起来还挺麻烦,挑了使用比较多的async-pool
进行重写。
其中,具体实现原理可以查看Promise.all并发限制文章(这边文章提供的代码是存在问题的,但是原理讲得挺清楚的)。
基于这篇文章提供的思路,对代码进行改写,具体如下
/**
* promise并发限制调用
* @param {object[]} data - 调用的数据列表
* @param {number} maxLimit - 并发调用限制个数
* @param {function} iteratee - 处理单个节点的方法
* @returns {promise}
*/
export function promiseLimitPool({ data = [], maxLimit = 3, iteratee = () => {} } = {}, callback=()=>{}) {
const executing = [];
const enqueue = (index = 0) => {
// 边界处理
if (index === data.length) {
return Promise.all(executing);
}
// 每次调用enqueue, 初始化一个promise
const item = data[index];
function itemPromise(index) {
const promise = new Promise(async (resolve) => {
// 处理单个节点
await iteratee({ index, item: cloneDeep(item), data: cloneDeep(data) });
resolve(index);
}).then(() => {
// 执行结束,从executing删除自身
const delIndex = executing.indexOf(promise);
delIndex > -1 && executing.splice(delIndex, 1);
});
return promise;
}
// 插入executing数字,表示正在执行的promise
executing.push(itemPromise(index));
// 使用Promise.rece,每当executing数组中promise数量低于maxLimit,就实例化新的promise并执行
let race = Promise.resolve();
if (executing.length >= maxLimit) {
race = Promise.race(executing);
}
// 递归,直到遍历完
return race.then(() => enqueue(index + 1));
};
return enqueue();
}
// 示例
promiseLimitPool({
data: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
maxLimit: 2,
iteratee: async ({ item }) => {
console.log('onClick -> item', item);
await Axios({
method: 'get',
url: `API接口地址`,
params: { page: 0, size: 9 },
});
},
});
缺点:没有提供全部成功后的回调函数(当然,这个也支持扩展);代码逻辑不是很简约,代码有点绕...