processAsyncTree.js 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. /*
  2. MIT License http://www.opensource.org/licenses/mit-license.php
  3. Author Tobias Koppers @sokra
  4. */
  5. "use strict";
  6. /**
  7. * @template T
  8. * @template {Error} E
  9. * @param {Iterable<T>} items initial items
  10. * @param {number} concurrency number of items running in parallel
  11. * @param {function(T, function(T): void, function(E=): void): void} processor worker which pushes more items
  12. * @param {function(E=): void} callback all items processed
  13. * @returns {void}
  14. */
  15. const processAsyncTree = (items, concurrency, processor, callback) => {
  16. const queue = Array.from(items);
  17. if (queue.length === 0) return callback();
  18. let processing = 0;
  19. let finished = false;
  20. let processScheduled = true;
  21. /**
  22. * @param {T} item item
  23. */
  24. const push = item => {
  25. queue.push(item);
  26. if (!processScheduled && processing < concurrency) {
  27. processScheduled = true;
  28. process.nextTick(processQueue);
  29. }
  30. };
  31. /**
  32. * @param {E | null | undefined} err error
  33. */
  34. const processorCallback = err => {
  35. processing--;
  36. if (err && !finished) {
  37. finished = true;
  38. callback(err);
  39. return;
  40. }
  41. if (!processScheduled) {
  42. processScheduled = true;
  43. process.nextTick(processQueue);
  44. }
  45. };
  46. const processQueue = () => {
  47. if (finished) return;
  48. while (processing < concurrency && queue.length > 0) {
  49. processing++;
  50. const item = /** @type {T} */ (queue.pop());
  51. processor(item, push, processorCallback);
  52. }
  53. processScheduled = false;
  54. if (queue.length === 0 && processing === 0 && !finished) {
  55. finished = true;
  56. callback();
  57. }
  58. };
  59. processQueue();
  60. };
  61. module.exports = processAsyncTree;