index.js 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. import Queue from 'yocto-queue';
  2. export default function pLimit(concurrency) {
  3. if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
  4. throw new TypeError('Expected `concurrency` to be a number from 1 and up');
  5. }
  6. const queue = new Queue();
  7. let activeCount = 0;
  8. const next = () => {
  9. activeCount--;
  10. if (queue.size > 0) {
  11. queue.dequeue()();
  12. }
  13. };
  14. const run = async (fn, resolve, args) => {
  15. activeCount++;
  16. const result = (async () => fn(...args))();
  17. resolve(result);
  18. try {
  19. await result;
  20. } catch {}
  21. next();
  22. };
  23. const enqueue = (fn, resolve, args) => {
  24. queue.enqueue(run.bind(undefined, fn, resolve, args));
  25. (async () => {
  26. // This function needs to wait until the next microtask before comparing
  27. // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
  28. // when the run function is dequeued and called. The comparison in the if-statement
  29. // needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
  30. await Promise.resolve();
  31. if (activeCount < concurrency && queue.size > 0) {
  32. queue.dequeue()();
  33. }
  34. })();
  35. };
  36. const generator = (fn, ...args) => new Promise(resolve => {
  37. enqueue(fn, resolve, args);
  38. });
  39. Object.defineProperties(generator, {
  40. activeCount: {
  41. get: () => activeCount,
  42. },
  43. pendingCount: {
  44. get: () => queue.size,
  45. },
  46. clearQueue: {
  47. value: () => {
  48. queue.clear();
  49. },
  50. },
  51. });
  52. return generator;
  53. }