AsyncQueue.js 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. /*
  2. MIT License http://www.opensource.org/licenses/mit-license.php
  3. Author Tobias Koppers @sokra
  4. */
  5. "use strict";
  6. const { SyncHook, AsyncSeriesHook } = require("tapable");
  7. const { makeWebpackError } = require("../HookWebpackError");
  8. const WebpackError = require("../WebpackError");
  9. const ArrayQueue = require("./ArrayQueue");
  // Lifecycle states of an AsyncQueueEntry, stored in `entry.state`.
  // An entry starts as QUEUED, becomes PROCESSING when it is dequeued and
  // DONE once its result has been recorded.
  const QUEUED_STATE = 0;
  const PROCESSING_STATE = 1;
  const DONE_STATE = 2;
  // Module-wide counter that is incremented while result callbacks are being
  // invoked and decremented afterwards. When it already exceeds 3, callbacks
  // are deferred with process.nextTick instead of being called synchronously.
  let inHandleResult = 0;
  14. /**
  15. * @template T
  16. * @callback Callback
  17. * @param {(WebpackError | null)=} err
  18. * @param {T=} result
  19. */
  /**
   * Bookkeeping record for one item handled by an AsyncQueue: the item itself,
   * its lifecycle state, the callbacks waiting for it and, once it has
   * finished, its outcome.
   * @template T
   * @template K
   * @template R
   */
  class AsyncQueueEntry {
    /**
     * Creates an entry in the queued state with no outcome recorded yet.
     * @param {T} item the item
     * @param {Callback<R>} callback the callback
     */
    constructor(item, callback) {
      // The work item this entry stands for.
      this.item = item;

      // Every entry begins its life as "queued".
      /** @type {typeof QUEUED_STATE | typeof PROCESSING_STATE | typeof DONE_STATE} */
      this.state = QUEUED_STATE;

      // Callback supplied by whoever created the entry.
      this.callback = callback;

      // Further callbacks attached later; the array is created lazily.
      /** @type {Callback<R>[] | undefined} */
      this.callbacks = undefined;

      // Outcome fields, filled in when the entry is finished.
      /** @type {R | undefined} */
      this.result = undefined;
      /** @type {WebpackError | undefined} */
      this.error = undefined;
    }
  }
  /**
   * Deduplicating work queue with bounded parallelism.
   *
   * Items are identified by a key (see `getKey`). Adding an item whose key is
   * already known attaches the callback to the existing entry instead of
   * processing the item a second time. Queues created with a `parent` register
   * themselves on the root queue and share its active-task counter and
   * parallelism; they are only served once the root's own queue is empty.
   * @template T
   * @template K
   * @template R
   */
  class AsyncQueue {
    /**
     * @param {object} options options object
     * @param {string=} options.name name of the queue
     * @param {number=} options.parallelism how many items should be processed at once (falsy values fall back to 1)
     * @param {AsyncQueue<any, any, any>=} options.parent parent queue, which will have priority over this queue and with shared parallelism
     * @param {function(T): K=} options.getKey extract key from item (defaults to the item itself)
     * @param {function(T, Callback<R>): void} options.processor async function to process items
     */
    constructor({ name, parallelism, parent, processor, getKey }) {
      this._name = name;
      this._parallelism = parallelism || 1;
      this._processor = processor;
      this._getKey =
        getKey || /** @type {(T) => K} */ (item => /** @type {any} */ (item));
      /** @type {Map<K, AsyncQueueEntry<T, K, R>>} */
      this._entries = new Map();
      /** @type {ArrayQueue<AsyncQueueEntry<T, K, R>>} */
      this._queued = new ArrayQueue();
      /** @type {AsyncQueue<any, any, any>[] | undefined} */
      this._children = undefined;
      this._activeTasks = 0;
      this._willEnsureProcessing = false;
      this._needProcessing = false;
      this._stopped = false;
      // All scheduling state (_activeTasks, _parallelism, _needProcessing,
      // _willEnsureProcessing) is read from and written to the root queue.
      this._root = parent ? parent._root : this;
      if (parent) {
        // Children are registered flat on the root, even for nested parents.
        if (this._root._children === undefined) {
          this._root._children = [this];
        } else {
          this._root._children.push(this);
        }
      }

      this.hooks = {
        /** @type {AsyncSeriesHook<[T]>} */
        beforeAdd: new AsyncSeriesHook(["item"]),
        /** @type {SyncHook<[T]>} */
        added: new SyncHook(["item"]),
        /** @type {AsyncSeriesHook<[T]>} */
        beforeStart: new AsyncSeriesHook(["item"]),
        /** @type {SyncHook<[T]>} */
        started: new SyncHook(["item"]),
        /** @type {SyncHook<[T, Error, R]>} */
        result: new SyncHook(["item", "error", "result"])
      };

      // Bound once so it can be handed to setImmediate directly.
      this._ensureProcessing = this._ensureProcessing.bind(this);
    }

    /**
     * Adds an item to the queue, or attaches `callback` to the existing entry
     * when an item with the same key is already known.
     * @param {T} item an item
     * @param {Callback<R>} callback callback function
     * @returns {void}
     */
    add(item, callback) {
      if (this._stopped) return callback(new WebpackError("Queue was stopped"));
      this.hooks.beforeAdd.callAsync(item, err => {
        if (err) {
          callback(
            makeWebpackError(err, `AsyncQueue(${this._name}).hooks.beforeAdd`)
          );
          return;
        }
        const key = this._getKey(item);
        const entry = this._entries.get(key);
        if (entry !== undefined) {
          if (entry.state === DONE_STATE) {
            // Already finished: replay the stored outcome. Once the shared
            // counter exceeds 3 the call is deferred to the next tick instead
            // of being made synchronously.
            if (inHandleResult++ > 3) {
              process.nextTick(() => callback(entry.error, entry.result));
            } else {
              callback(entry.error, entry.result);
            }
            inHandleResult--;
          } else if (entry.callbacks === undefined) {
            entry.callbacks = [callback];
          } else {
            entry.callbacks.push(callback);
          }
          return;
        }
        const newEntry = new AsyncQueueEntry(item, callback);
        if (this._stopped) {
          // The queue was stopped while the beforeAdd hook was running.
          this.hooks.added.call(item);
          // _handleResult decrements the counter, so compensate up front.
          this._root._activeTasks++;
          process.nextTick(() =>
            this._handleResult(newEntry, new WebpackError("Queue was stopped"))
          );
        } else {
          this._entries.set(key, newEntry);
          this._queued.enqueue(newEntry);
          const root = this._root;
          root._needProcessing = true;
          if (root._willEnsureProcessing === false) {
            root._willEnsureProcessing = true;
            setImmediate(root._ensureProcessing);
          }
          this.hooks.added.call(item);
        }
      });
    }

    /**
     * Forgets the entry of an item so that a later `add` processes it again.
     * An entry that is still queued is also removed from the pending queue.
     * Invalidating an item that is not known to the queue is a no-op.
     * @param {T} item an item
     * @returns {void}
     */
    invalidate(item) {
      const key = this._getKey(item);
      const entry = this._entries.get(key);
      // Nothing recorded for this key (never added or already invalidated).
      if (entry === undefined) return;
      this._entries.delete(key);
      if (entry.state === QUEUED_STATE) {
        this._queued.delete(entry);
      }
    }

    /**
     * Waits for an already started item
     * @param {T} item an item
     * @param {Callback<R>} callback callback function
     * @returns {void}
     */
    waitFor(item, callback) {
      const key = this._getKey(item);
      const entry = this._entries.get(key);
      if (entry === undefined) {
        return callback(
          new WebpackError(
            "waitFor can only be called for an already started item"
          )
        );
      }
      if (entry.state === DONE_STATE) {
        // Finished entries always answer asynchronously.
        process.nextTick(() => callback(entry.error, entry.result));
      } else if (entry.callbacks === undefined) {
        entry.callbacks = [callback];
      } else {
        entry.callbacks.push(callback);
      }
    }

    /**
     * Stops the queue: later `add` calls fail immediately and every entry that
     * is still queued is finished with a "Queue was stopped" error.
     * @returns {void}
     */
    stop() {
      this._stopped = true;
      const queue = this._queued;
      this._queued = new ArrayQueue();
      const root = this._root;
      for (const entry of queue) {
        this._entries.delete(this._getKey(entry.item));
        // _handleResult decrements the counter, so compensate up front.
        root._activeTasks++;
        this._handleResult(entry, new WebpackError("Queue was stopped"));
      }
    }

    /**
     * Raises the root queue's parallelism by one and schedules processing if
     * work is pending and no processing pass is scheduled yet.
     * @returns {void}
     */
    increaseParallelism() {
      const root = this._root;
      root._parallelism++;
      /* istanbul ignore next */
      if (root._willEnsureProcessing === false && root._needProcessing) {
        root._willEnsureProcessing = true;
        setImmediate(root._ensureProcessing);
      }
    }

    /**
     * Lowers the root queue's parallelism by one.
     * @returns {void}
     */
    decreaseParallelism() {
      const root = this._root;
      root._parallelism--;
    }

    /**
     * @param {T} item an item
     * @returns {boolean} true, if the item is currently being processed
     */
    isProcessing(item) {
      const key = this._getKey(item);
      const entry = this._entries.get(key);
      return entry !== undefined && entry.state === PROCESSING_STATE;
    }

    /**
     * @param {T} item an item
     * @returns {boolean} true, if the item is currently queued
     */
    isQueued(item) {
      const key = this._getKey(item);
      const entry = this._entries.get(key);
      return entry !== undefined && entry.state === QUEUED_STATE;
    }

    /**
     * @param {T} item an item
     * @returns {boolean} true, if the item has finished processing
     */
    isDone(item) {
      const key = this._getKey(item);
      const entry = this._entries.get(key);
      return entry !== undefined && entry.state === DONE_STATE;
    }

    /**
     * Starts queued entries until the parallelism limit is reached. This
     * queue's own entries are served first; child queues are served, in
     * registration order, only once this queue is empty. Scheduled on the root
     * queue via setImmediate.
     * @returns {void}
     */
    _ensureProcessing() {
      while (this._activeTasks < this._parallelism) {
        const entry = this._queued.dequeue();
        if (entry === undefined) break;
        this._activeTasks++;
        entry.state = PROCESSING_STATE;
        this._startProcessing(entry);
      }
      this._willEnsureProcessing = false;
      // Own work remains: leave _needProcessing set so that a finishing task
      // schedules another pass.
      if (this._queued.length > 0) return;
      if (this._children !== undefined) {
        for (const child of this._children) {
          while (this._activeTasks < this._parallelism) {
            const entry = child._queued.dequeue();
            if (entry === undefined) break;
            this._activeTasks++;
            entry.state = PROCESSING_STATE;
            child._startProcessing(entry);
          }
          if (child._queued.length > 0) return;
        }
      }
      // Everything was started. Only clear the flag when no new pass was
      // scheduled while the entries above were being started.
      if (!this._willEnsureProcessing) this._needProcessing = false;
    }

    /**
     * Runs the beforeStart hook and then hands the entry's item to the
     * processor.
     * @param {AsyncQueueEntry<T, K, R>} entry the entry
     * @returns {void}
     */
    _startProcessing(entry) {
      this.hooks.beforeStart.callAsync(entry.item, err => {
        if (err) {
          this._handleResult(
            entry,
            makeWebpackError(err, `AsyncQueue(${this._name}).hooks.beforeStart`)
          );
          return;
        }
        let inCallback = false;
        try {
          this._processor(entry.item, (e, r) => {
            inCallback = true;
            this._handleResult(entry, e, r);
          });
        } catch (err) {
          // Once the processor has invoked its callback the result is already
          // being handled, so the error is rethrown instead of being reported
          // as a second result for the same entry.
          if (inCallback) throw err;
          this._handleResult(entry, err, null);
        }
        // Called after the processor has been invoked.
        this.hooks.started.call(entry.item);
      });
    }

    /**
     * Records the outcome on the entry, releases its task slot on the root
     * queue and notifies every callback waiting for the entry.
     * @param {AsyncQueueEntry<T, K, R>} entry the entry
     * @param {WebpackError=} err error, if any
     * @param {R=} result result, if any
     * @returns {void}
     */
    _handleResult(entry, err, result) {
      this.hooks.result.callAsync(entry.item, err, result, hookError => {
        // An error raised by the result hook replaces the processing error.
        const error = hookError
          ? makeWebpackError(hookError, `AsyncQueue(${this._name}).hooks.result`)
          : err;

        const callback = entry.callback;
        const callbacks = entry.callbacks;
        entry.state = DONE_STATE;
        entry.callback = undefined;
        entry.callbacks = undefined;
        entry.result = result;
        entry.error = error;

        const root = this._root;
        root._activeTasks--;
        if (root._willEnsureProcessing === false && root._needProcessing) {
          root._willEnsureProcessing = true;
          setImmediate(root._ensureProcessing);
        }

        // Notifies the original callback first, then the ones attached later.
        const notify = () => {
          callback(error, result);
          if (callbacks !== undefined) {
            for (const cb of callbacks) {
              cb(error, result);
            }
          }
        };

        // Once the shared counter exceeds 3 the notification is deferred to
        // the next tick instead of being made synchronously.
        if (inHandleResult++ > 3) {
          process.nextTick(notify);
        } else {
          notify();
        }
        inHandleResult--;
      });
    }

    /**
     * Resets this queue instance: drops all known entries and queued items and
     * resets the task counter and the scheduling/stopped flags. Only fields of
     * this instance are touched, not those of the root queue.
     * @returns {void}
     */
    clear() {
      this._entries.clear();
      this._queued.clear();
      this._activeTasks = 0;
      this._willEnsureProcessing = false;
      this._needProcessing = false;
      this._stopped = false;
    }
  }
  // The AsyncQueue class is the module's only export.
  module.exports = AsyncQueue;