js实现并发请求控制

在项目开发中,会经常遇到需要异步请求多个接口,获取到数据结果,但是一个个串行调太慢,思索着可以并发同时几个一起调,做个并发池,提高请求效率。
这个场景很经典,在日常开发中也很实用。3年前,面试字节的时候,也被问到类似的现场编程题。
2020年写了此文章,时隔3年,已经是个老鸟了,把之前的豆腐渣工程翻新一下。


功能说明

支持多个异步任务同时执行,等待全部执行完成后,直接返回执行结果

  • 支持限制并发池的大小
  • 支持全部任务执行成功后,进行回调处理
  • 支持执行过程中继续追加任务
  • 返回结果支持【按顺序取用】或者【按特定对象取用】

实现图解

方式一【推荐】

先取用任务进行执行,直到工作区占满了,当某个任务执行结束后,继续取用任务执行,直到任务区满了,则暂停取用。一直等待某个任务结束,重新取用任务执行。
按此循环操作,直到工作区正在执行的任务数为0,表示全部执行完毕。然后返回全部执行结果,执行回调函数。
期间,支持不断地往任务池追加任务。

interface ITask {
  fn: () => Promise<any>;
  key?: string | number;
}

interface IResults {
  [key: string | number]: any;
}
class TaskQueue {
  maxNum: number;
  running: number;
  queue: Array<ITask>;
  queueIndex: number;
  results: IResults;
  callback: null | Function;
  isPushOver: boolean;

  constructor(maxNum: number) {
    this.maxNum = maxNum; // 并发池数量大小
    this.running = 0; // 当前正在执行的池数
    this.queue = []; // 任务队列
    this.queueIndex = 0; // 当前进入执行的任务索引顺序
    this.results = {}; // 存储任务执行结果
    this.callback = null; // 回调函数
    this.isPushOver = false; // 任务是否追加完毕
  }

  // 追加任务,并执行
  pushTasks(tasks: Array<ITask>) {
    this.queue.push(...tasks);
    this.next();
  }

  // 通知任务追加完毕
  pushOver() {
    this.isPushOver = true;
    this.runOver();
  }

  // 任务全部执行完毕
  runOver() {
    if (
      typeof this.callback === "function" &&
      this.running == 0 &&
      this.isPushOver
    ) {
      this.callback.call(null, this.results);
    }
  }

  next() {
    while (this.running < this.maxNum && this.queue.length) {
      const task = this.queue.shift();
      // 标识当前任务索引,方便从 results 取用值
      const key = task?.key || this.queueIndex;
      this.queueIndex++;
      // 任务池被占用数量+1
      this.running++;
      // 任务临界判断合法性
      if (!task) {
        this.results[key] = null;
        this.running--;
        continue;
      }
      // 执行任务
      task
        .fn()
        .then((res: any) => {
          this.results[key] = res;
        })
        .catch((err: any) => {
          this.results[key] = err;
        })
        .finally(() => {
          this.running--;
          this.next();
        });
    }

    this.runOver();
  }
}

/**
 * 测试用例
 */

function run() {
  // 创建实例
  const queue = new TaskQueue(5);
  queue.callback = (result: any) => {
    console.log("asyncPool_1 ~ result:", result);
    console.log(result[1]); // 支持按顺序取用结果
    console.log(result.four); // 支持针对特殊任务取用结果
  };

  function buildTask(result: any, delay = 0) {
    return () =>
      new Promise((resolve) =>
        setTimeout(() => {
          console.log("正在执行任务", result);
          resolve(result);
        }, delay)
      );
  }

  const tasks = [
    { fn: buildTask(1, 100) },
    { fn: buildTask(2, 200) },
    { fn: buildTask(3, 300) },
    { fn: buildTask(4, 100), key: "four" }, // key
  ];
  queue.pushTasks(tasks);
  setTimeout(() => {
    console.log("再追加一个任务");
    queue.pushTasks([{ fn: buildTask(5, 100), key: 5 }]);
  }, 500);

  setTimeout(() => {
    console.log("通知追加结束");
    queue.pushOver();
  }, 700);
}

run();
测试执行结果
  1. 支持中途追加任务
  2. 支持全部执行完成后,返回执行结果,并执行回调函数
  3. 支持对执行结果按顺序取用 或 按需取用

方式二

假设支持最大并发执行的任务数是5个,先创建5个执行工作线,每个工作线开始取任务并执行。当某个工作线的任务执行完毕,则再次从任务池中取任务。直到某个工作线取不到任务时,表示全部执行完毕,进行执行回调方法。

interface ITask {
  fn: () => Promise<any>;
  key?: string | number;
}

interface IResults {
  [key: string | number]: any;
}
function asyncPool_2(tasks: Array<ITask>, max: number, callback: Function) {
  let result: IResults = {};
  let taskIndex = 0;
  Promise.all(
    Array.from({ length: max }).map(() => {
      return new Promise((resolve) => {
        function runTask() {
          if (tasks.length <= 0) {
            resolve(null);
            return;
          }
          const task = tasks.shift();
          const key = task?.key || taskIndex;
          if (!task?.fn) {
            result[key] = null;
            taskIndex++;
            runTask();
            return;
          }
          task.fn().then((res) => {
            result[key] = res;
            runTask();
          });
          taskIndex++;
        }
        runTask();
      });
    })
  ).then(() => callback(result));
}
/**
 * 测试用例
 */
function run() {
  function buildTask(result: any, delay = 0) {
    return () =>
      new Promise((resolve) =>
        setTimeout(() => {
          console.log("正在执行任务", result);
          resolve(result);
        }, delay)
      );
  }

  const tasks = [
    { fn: buildTask(1, 100) },
    { fn: buildTask(2, 200) },
    { fn: buildTask(3, 300) },
    { fn: buildTask(4, 100), key: "four" }, // key
  ];
  asyncPool_2(tasks, 5, (result: any) => {
    console.log("asyncPool_2 ~ result:", result);
    console.log(result[1]); // 支持按顺序取用结果
    console.log(result.four); // 支持针对特殊任务取用结果
  });
}

run();
测试用例执行结果

代码逻辑简洁一些,适用于一些相对简单的应用场景,需要一开始确定好执行哪些任务。

  1. 支持传入任务数组,虽能中途追加任务,但是可能存在时机问题
  2. 支持全部执行完成后,返回结果,并执行回调函数
  3. 支持对执行结果按顺序取用 或 按需取用

方式三

npm中有挺多第三方包,比如 async-pooles-promise-poolp-limit等,但是实际使用起来还挺麻烦,挑了使用比较多的async-pool进行重写。
其中,具体实现原理可以查看Promise.all并发限制文章(这边文章提供的代码是存在问题的,但是原理讲得挺清楚的)。
基于这篇文章提供的思路,对代码进行改写,具体如下

/**
 * promise并发限制调用
 * @param {object[]} data - 调用的数据列表
 * @param {number} maxLimit - 并发调用限制个数
 * @param {function} iteratee - 处理单个节点的方法
 * @returns {promise}
 */
export function promiseLimitPool({ data = [], maxLimit = 3, iteratee = () => {} } = {}, callback=()=>{}) {
  const executing = [];
  const enqueue = (index = 0) => {
    // 边界处理
    if (index === data.length) {
      return Promise.all(executing);
    }
    // 每次调用enqueue, 初始化一个promise
    const item = data[index];

    function itemPromise(index) {
      const promise = new Promise(async (resolve) => {
        // 处理单个节点
        await iteratee({ index, item: cloneDeep(item), data: cloneDeep(data) });
        resolve(index);
      }).then(() => {
        // 执行结束,从executing删除自身
        const delIndex = executing.indexOf(promise);
        delIndex > -1 && executing.splice(delIndex, 1);
      });
      return promise;
    }
    // 插入executing数字,表示正在执行的promise
    executing.push(itemPromise(index));

    // 使用Promise.rece,每当executing数组中promise数量低于maxLimit,就实例化新的promise并执行
    let race = Promise.resolve();

    if (executing.length >= maxLimit) {
      race = Promise.race(executing);
    }

    // 递归,直到遍历完
    return race.then(() => enqueue(index + 1));
  };

  return enqueue();
}

// 示例
 promiseLimitPool({
      data: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
      maxLimit: 2,
      iteratee: async ({ item }) => {
        console.log('onClick -> item', item);
        await Axios({
          method: 'get',
          url: `API接口地址`,
          params: { page: 0, size: 9 },
        });
      },
 });

缺点:没有提供全部成功后的回调函数(当然,这个也支持扩展);代码逻辑不是很简约,代码有点绕...

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容