沐鳴平台網站_如何利用 JavaScript 實現併發控制

一、前言

在開發過程中,有時會遇到需要控制任務併發執行數量的需求。

例如一個爬蟲程序,可以通過限制其併發任務數量來降低請求頻率,從而避免由於請求過於頻繁被封禁問題的發生。

接下來,本文介紹如何實現一個併發控制器。

二、示例

  const task = timeout => new Promise((resolve) => setTimeout(() => {
    resolve(timeout);
  }, timeout))

  const taskList = [1000, 3000, 200, 1300, 800, 2000];

  async function startNoConcurrentControl() {
    console.time(NO_CONCURRENT_CONTROL_LOG);
    await Promise.all(taskList.map(item => task(item)));
    console.timeEnd(NO_CONCURRENT_CONTROL_LOG);
  }

  startNoConcurrentControl();

上述示例代碼利用 Promise.all 方法模擬6個任務併發執行的場景,執行完所有任務的總耗時為 3000 毫秒。

下面會採用該示例來驗證實現方法的正確性。

三、實現

由於任務併發執行的數量是有限的,那麼就需要一種數據結構來管理不斷產生的任務。

隊列的「先進先出」特性可以保證任務併發執行的順序,在 JavaScript 中可以通過「數組來模擬隊列」

  class Queue {
    constructor() {
      this._queue = [];
    }

    push(value) {
      return this._queue.push(value);
    }

    shift() {
      return this._queue.shift();
    }

    isEmpty() {
      return this._queue.length === 0;
    }
  }

對於每一個任務,需要管理其執行函數和參數:

  class DelayedTask {
    constructor(resolve, fn, args) {
      this.resolve = resolve;
      this.fn = fn;
      this.args = args;
    }
  }

接下來實現核心的 TaskPool 類,該類主要用來控制任務的執行:

  class TaskPool {
    constructor(size) {
      this.size = size;
      this.queue = new Queue();
    }

    addTask(fn, args) {
      return new Promise((resolve) => {
        this.queue.push(new DelayedTask(resolve, fn, args));
        if (this.size) {
          this.size--;
          const { resolve: taskResole, fn, args } = this.queue.shift();
          taskResole(this.runTask(fn, args));
        }
      })
    }

    pullTask() {
      if (this.queue.isEmpty()) {
        return;
      }

      if (this.size === 0) {
        return;
      }

      this.size++;
      const { resolve, fn, args } = this.queue.shift();
      resolve(this.runTask(fn, args));
    }

    runTask(fn, args) {
      const result = Promise.resolve(fn(...args));

      result.then(() => {
        this.size--;
        this.pullTask();
      }).catch(() => {
        this.size--;
        this.pullTask();
      })

      return result;
    }
  }

TaskPool 包含三個關鍵方法:

addTask: 將新的任務放入隊列當中,並觸發任務池狀態檢測,如果當前任務池非滿載狀態,則從隊列中取出任務放入任務池中執行。

runTask: 執行當前任務,任務執行完成之後,更新任務池狀態,此時觸發主動拉取新任務的機制。

pullTask: 如果當前隊列不為空,且任務池不滿載,則主動取出隊列中的任務執行。

接下來,將前面示例的併發數控製為2個:

  const cc = new ConcurrentControl(2);

  async function startConcurrentControl() {
    console.time(CONCURRENT_CONTROL_LOG);
    await Promise.all(taskList.map(item => cc.addTask(task, [item])))
    console.timeEnd(CONCURRENT_CONTROL_LOG);
  }

  startConcurrentControl();

執行流程如下:

最終執行任務的總耗時為 5000 毫秒。

四、高階函數優化參數傳遞

  await Promise.all(taskList.map(item => cc.addTask(task, [item])))

手動傳遞每個任務的參數的方式顯得非常繁瑣,這裏可以通過「高階函數實現參數的自動透傳」

  addTask(fn) {
    return (...args) => {
      return new Promise((resolve) => {
        this.queue.push(new DelayedTask(resolve, fn, args));

        if (this.size) {
          this.size--;
          const { resolve: taskResole, fn: taskFn, args: taskArgs } = this.queue.shift();
          taskResole(this.runTask(taskFn, taskArgs));
        }
      })
    }
  }

改造之後的代碼顯得簡潔了很多:

  await Promise.all(taskList.map(cc.addTask(task)))

五、優化出隊操作

數組一般都是基於一塊「連續內存」來存儲,當調用數組的 shift 方法時,首先是刪除頭部元素(時間複雜度 O(1)),然後需要將未刪除元素左移一位(時間複雜度 O(n)),所以 shift 操作的時間複雜度為 O(n)。

由於 JavaScript 語言的特性,V8 在實現 jsArray 的時候給出了一種空間和時間權衡的解決方案,在不同的場景下,jsArray 會在 FixedArray 和 HashTable 兩種模式間切換。

在 hashTable 模式下,shift 操作省去了左移的時間複雜度,其時間複雜度可以降低為 O(1),即使如此,shift 仍然是一個耗時的操作。

在數組元素比較多且需要頻繁執行 shift 操作的場景下,可以通過 「reverse + pop」 的方式優化。

  const Benchmark = require('benchmark');
  const suite = new Benchmark.Suite;

  suite.add('shift', function() {
    let count = 10;
    const arr = generateArray(count);
    while (count--) {
      arr.shift();
    }
  })
  .add('reverse + pop', function() {
    let count = 10;
    const arr = generateArray(count);
    arr.reverse();
    while (count--) {
      arr.pop();
    }
  })
  .on('cycle', function(event) {
    console.log(String(event.target));
  })
  .on('complete', function() {
    console.log('Fastest is ' + this.filter('fastest').map('name'));
    console.log('\n')
  })
  .run({
    async: true
  })

通過 benchmark.js 跑出的基準測試數據,可以很容易地看出哪種方式的效率更高:

回顧之前 Queue 類的實現,由於只有一個數組來存儲任務,直接使用 reverse + pop 的方式,必然會影響任務執行的次序。

這裏就需要引入雙數組的設計,一個數組負責入隊操作,一個數組負責出隊操作。

  class HighPerformanceQueue {
    constructor() {
      this.q1 = []; // 用於 push 數據
      this.q2 = []; // 用於 shift 數據
    }

    push(value) {
      return this.q1.push(value);
    }

    shift() {
      let q2 = this.q2;
      if (q2.length === 0) {
        const q1 = this.q1;
        if (q1.length === 0) {
          return;
        }
        this.q1 = q2; // 感謝 @shaonialife 同學指正
        q2 = this.q2 = q1.reverse();
      }

      return q2.pop();
    }

    isEmpty() {
      if (this.q1.length === 0 && this.q2.length === 0) {
        return true;
      }
      return false;
    }
  }

最後通過基準測試來驗證優化的效果:

寫在最後

最後,「如果本文對您有幫助,歡迎關注(公眾號【漫談大前端】)、點贊、轉發 ε=ε=ε=┏(゜ロ゜;)┛。」

作者:descire

鏈接:https://juejin.cn/post/6912220538286899207

來源:掘金

站長推薦

1.雲服務推薦: 國內主流雲服務商,各類雲產品的最新活動,優惠券領取。地址:阿里雲騰訊雲華為雲

2.廣告聯盟: 整理了目前主流的廣告聯盟平台,如果你有流量,可以作為參考選擇適合你的平台點擊進入

鏈接: http://www.fly63.com/article/detial/9993