Skip to content

实现Promise并发控制

方式一

js
async function asyncPool(limit, fns) {
  const res = []
  const exec = []

  for (const item of fns) {
    const p = Promise.resolve().then(() => item())

    res.push(p)

    if (limit <= fns.length) {
      const execute = p.then(() => {
        exec.splice(exec.indexOf(execute), 1)
      })
      exec.push(p)
      if (exec.length >= limit) {
        await Promise.race(exec)
      }
    }
  }

  return Promise.all(res)
}
async function asyncPool(limit, fns) {
  const res = []
  const exec = []

  for (const item of fns) {
    const p = Promise.resolve().then(() => item())

    res.push(p)

    if (limit <= fns.length) {
      const execute = p.then(() => {
        exec.splice(exec.indexOf(execute), 1)
      })
      exec.push(p)
      if (exec.length >= limit) {
        await Promise.race(exec)
      }
    }
  }

  return Promise.all(res)
}
js
// 示例测试用例
async function runTest() {
  const taskLimit = 3; // 最大并发数
  const tasks = [
    async () => {
      await sleep(1000); // 模拟异步任务
      console.log('执行完成 1');
      return 1;
    },
    async () => {
      await sleep(1000); // 模拟异步任务
      console.log('执行完成 2');
      return 2;
    },
    async () => {
      await sleep(1000); // 模拟异步任务
      console.log('执行完成 3');
      return 3;
    },
    async () => {
      await sleep(500); // 模拟异步任务
      console.log('执行完成 4');
      return 4;
    },
    async () => {
      await sleep(2000); // 模拟异步任务
      console.log('执行完成 5');
      return 5;
    },
    async () => {
      await sleep(1000); // 模拟异步任务
      console.log('执行完成 6');
      return 6;
    },
    async () => {
      await sleep(3000); // 模拟异步任务
      console.log('执行完成 7');
      return 7;
    },
    async () => {
      await sleep(500); // 模拟异步任务
      console.log('执行完成 8');
      return 8;
    },
  ];

  console.log(`Running asyncPool with a limit of ${taskLimit}`);
  const results = await asyncPool(taskLimit, tasks);
  console.log(`Results: ${results}`);
}

// 辅助函数,模拟异步任务
function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// 运行测试用例
runTest().then(() => {
  console.log('执行完毕');
}).catch(console.error);
// 示例测试用例
async function runTest() {
  const taskLimit = 3; // 最大并发数
  const tasks = [
    async () => {
      await sleep(1000); // 模拟异步任务
      console.log('执行完成 1');
      return 1;
    },
    async () => {
      await sleep(1000); // 模拟异步任务
      console.log('执行完成 2');
      return 2;
    },
    async () => {
      await sleep(1000); // 模拟异步任务
      console.log('执行完成 3');
      return 3;
    },
    async () => {
      await sleep(500); // 模拟异步任务
      console.log('执行完成 4');
      return 4;
    },
    async () => {
      await sleep(2000); // 模拟异步任务
      console.log('执行完成 5');
      return 5;
    },
    async () => {
      await sleep(1000); // 模拟异步任务
      console.log('执行完成 6');
      return 6;
    },
    async () => {
      await sleep(3000); // 模拟异步任务
      console.log('执行完成 7');
      return 7;
    },
    async () => {
      await sleep(500); // 模拟异步任务
      console.log('执行完成 8');
      return 8;
    },
  ];

  console.log(`Running asyncPool with a limit of ${taskLimit}`);
  const results = await asyncPool(taskLimit, tasks);
  console.log(`Results: ${results}`);
}

// 辅助函数,模拟异步任务
function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// 运行测试用例
runTest().then(() => {
  console.log('执行完毕');
}).catch(console.error);

方式二

js
function asyncPool(fn, arr, limit = 10) {
  let args = [...arr]   //不修改原参数数组
  let results = []      //存放最终结果
  let runningCount = 0  //正在运行的数量
  let resultIndex = 0   //结果的下标,用于控制结果的顺序
  let resultCount = 0   //结果的数量

  return new Promise((resolve) => {
    function run() {
      while(runningCount < limit && args.length > 0) {
        runningCount++
        ((i)=> {        //闭包用于保存结果下标,便于在resolve时把结果放到合适的位置
          let v = args.shift()
          console.log('正在运行' + runningCount)
          fn(v).then(val => {
            results[i] = val
          }, () => {
            throw new Error(`An error occurred: ${v}`)
          }).finally(() => {
            runningCount--
            resultCount++
            if(resultCount === arr.length) {  //这里之所以用resultCount做判断,而不用results的长度和args的长度,是因为这两个都不准确
              resolve(results)
            } else {
              run()
            }
          })          
        })(resultIndex++)
      }
    }
    run()
  })
}
function asyncPool(fn, arr, limit = 10) {
  let args = [...arr]   //不修改原参数数组
  let results = []      //存放最终结果
  let runningCount = 0  //正在运行的数量
  let resultIndex = 0   //结果的下标,用于控制结果的顺序
  let resultCount = 0   //结果的数量

  return new Promise((resolve) => {
    function run() {
      while(runningCount < limit && args.length > 0) {
        runningCount++
        ((i)=> {        //闭包用于保存结果下标,便于在resolve时把结果放到合适的位置
          let v = args.shift()
          console.log('正在运行' + runningCount)
          fn(v).then(val => {
            results[i] = val
          }, () => {
            throw new Error(`An error occurred: ${v}`)
          }).finally(() => {
            runningCount--
            resultCount++
            if(resultCount === arr.length) {  //这里之所以用resultCount做判断,而不用results的长度和args的长度,是因为这两个都不准确
              resolve(results)
            } else {
              run()
            }
          })          
        })(resultIndex++)
      }
    }
    run()
  })
}
js
//测试
function getWeather(city) {
  console.log(`开始获取${city}的天气`)
  return fetch(`https://api2.jirengu.com/getWeather.php?city=${city}`).then(res=> res.json())
}

let citys = ['北京', '上海', '杭州', '成都', '武汉', '天津', '深圳', '广州', '合肥', '郑州']
asyncPool(getWeather, citys, 2).then(results => console.log(results))
//测试
function getWeather(city) {
  console.log(`开始获取${city}的天气`)
  return fetch(`https://api2.jirengu.com/getWeather.php?city=${city}`).then(res=> res.json())
}

let citys = ['北京', '上海', '杭州', '成都', '武汉', '天津', '深圳', '广州', '合肥', '郑州']
asyncPool(getWeather, citys, 2).then(results => console.log(results))