How to properly do max parallelism in TypeScript.

This is the solution to a question I usually ask in interviews. I decided to write it up since I usually don't like the answers candidates give me.
The Problem
Let's say you have an infinite list of URLs, and let's say you can only have a maximum number of requests in flight at any given time. Let's call that maximum number n.
How would you implement a max parallelism function in different languages?
The Naïve Solution
The solution I always get is: "Couldn't you just do Promise.all(...) with n values on the list and that'd be that?"
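async function maxParallelism(n: number, urls: string[]): Promise<Response[]> {
  const responses: Response[] = []
  // process the urls in consecutive batches of at most n
  for (let i = 0; i < urls.length; i += n) {
    const promises: Promise<Response>[] = []
    for (let j = i; j < Math.min(i + n, urls.length); j++) {
      promises.push(fetch(urls[j]))
    }
    // the whole batch has to finish before the next one starts
    const result = await Promise.all(promises)
    result.forEach(e => responses.push(e))
  }
  return responses
}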
To which I always point out that this only guarantees that you're making n requests at t0, the start of each batch. Requests take different amounts of time, and every batch waits for its slowest request, so you end up with lots of idle time where fewer than n requests are running.
So, how can we improve this? Could we have some kind of check that looks at how many requests are in flight at any moment and launches a new one whenever there's room?
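To see how much the batches cost, here is a small back-of-the-envelope sketch. It does no real requests: it just takes a list of made-up request durations and computes the total time for the batched approach versus an idealised version that refills a slot the moment it frees up. The names batchedTime and pooledTime are invented for this illustration.

```typescript
// Total time when urls are processed in batches of n:
// every batch costs as much as its slowest request.
function batchedTime(n: number, durations: number[]): number {
  let total = 0
  for (let i = 0; i < durations.length; i += n) {
    total += Math.max(...durations.slice(i, i + n))
  }
  return total
}

// Total time when a new request starts as soon as any of the n slots is free.
function pooledTime(n: number, durations: number[]): number {
  // slots[k] is the time at which slot k becomes free again
  const slots: number[] = Array.from({ length: n }, () => 0)
  for (const d of durations) {
    const earliest = slots.indexOf(Math.min(...slots))
    slots[earliest] += d
  }
  return Math.max(...slots)
}

const durations = [100, 900, 100, 100, 100, 100]
console.log("batched:", batchedTime(2, durations))
console.log("pooled:", pooledTime(2, durations))
```

With these durations and n = 2, the batched version needs 1100ms while the slot-refilling one needs 900ms: the single slow request holds up its whole batch instead of just its own slot.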
An Improvement, Still Not Perfect
Let's do that. Let's have an interval where we check every 100ms if we're doing the right amount of requests.
async function maxParallelism(n: number, urls: string[]): Promise<Response[]> {
  let counter = 0
  let i = 0
  const responses: Response[] = []
  // async, so that we can await the fetch inside
  const makeRequest = async () => {
    counter++
    responses.push(await fetch(urls[i++]))
    counter--
  }
  return new Promise((resolve, reject) => {
    const interval = setInterval(() => {
      // launch a request only if there's room and there are urls left
      if (counter < n && i < urls.length) makeRequest()
      // doing responses.length instead of i to fully wait for every response to arrive
      if (responses.length >= urls.length) {
        clearInterval(interval)
        resolve(responses)
      }
    }, 100)
  })
}
Let's break down the code step by step:
const makeRequest = async () => {
  counter++
  responses.push(await fetch(urls[i++]))
  counter--
}
The makeRequest function does the actual request and keeps the counter up to date: it increments it before the request starts and decrements it once the response arrives, so the counter always holds the number of requests in flight. After that, we have the code inside the interval:
if (counter < n && i < urls.length) makeRequest()
// doing responses.length instead of i to fully wait for every response to arrive
if (responses.length >= urls.length) {
  clearInterval(interval)
  resolve(responses)
}
This code runs every 100ms. If counter is less than n, meaning we have fewer requests in flight than we're aiming for, we make another request.
Also, as I point out in the comment, I wait until I have 100% of the responses before declaring that we're fully done. If I used i instead, I could resolve while the last couple of requests are still in flight. They'd eventually show up in the list after a couple of cycles, but the caller might read it before then, which is a race condition.
This is still not perfect, for a couple of reasons. First, I'm running the check every 100ms and, even if it's cheap, it's redundant in many cases. Second, if I make the interval bigger, I get more idle time.
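To put rough numbers on that overhead, here is a small model of the interval version in virtual time, with no real timers or network involved. It assumes what the code above does: one check per tick and at most one launch per check. The names polledTime, durations and tickMs are made up for this sketch, and the durations are invented.

```typescript
// Virtual-time model of the interval approach:
// one check per tick, at most one request launched per check.
function polledTime(n: number, durations: number[], tickMs: number): number {
  if (durations.length === 0) return tickMs // the first tick already sees "all done"
  const finishTimes: number[] = [] // finish time of every request launched so far
  let next = 0 // index of the next request to launch
  let now = 0
  while (next < durations.length) {
    now += tickMs
    const inFlight = finishTimes.filter(t => t > now).length
    if (inFlight < n) {
      finishTimes.push(now + durations[next])
      next++
    }
  }
  // completion is only noticed on a tick, so round up to the next one
  const lastFinish = Math.max(...finishTimes)
  return Math.ceil(lastFinish / tickMs) * tickMs
}

const tenShortRequests: number[] = Array.from({ length: 10 }, () => 50)
console.log("polled:", polledTime(3, tenShortRequests, 100))
```

With ten 50ms requests, n = 3 and a 100ms tick, the model gives 1100ms. Every request finishes before the next tick comes around, so no two requests ever overlap, even though three slots are available.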
Couldn't I just "await until I'm free"? How would I do that?
Well, I implemented just that.
The Solution
Here's the solution I came up with.
async function maxParallelism(n: number, urls: string[]): Promise<Response[]> {
  let counter = 0
  const responses: Response[] = []
  let res: (() => void) | undefined;
  for (let i = 0; i < urls.length; i++) {
    counter++;
    fetch(urls[i]).then(response => {
      counter--;
      responses.push(response)
      if (res != null) {
        res()
        res = undefined
      }
    })
    await new Promise<void>((resolve, reject) => {
      if (counter < n) return resolve()
      res = resolve
    })
  }
  // the loop ends once the last request is launched, so wait for the ones still in flight
  while (counter > 0) {
    await new Promise<void>(resolve => { res = resolve })
  }
  return responses
}
This is, in my opinion, the best implementation for this. Let's break it down:
First, we have a for loop that iterates over all the urls. Just before making a request, we increment the counter. Again, the counter here indicates the number of requests in flight.
for (let i = 0; i < urls.length; i++) {
  counter++;
  fetch(urls[i]).then(() => {
    // ...
  })
  // ...
}
On the first line of the then block I decrement the counter, since the response has already arrived. What comes next is the most interesting part, but we need to understand the last Promise first.
await new Promise<void>((resolve, reject) => {
  if (counter < n) return resolve()
  res = resolve
})
What's going on here?
If the counter is less than n, meaning I have fewer requests in flight than I'm allowed, I just resolve immediately. However, if that's not the case, I'm doing something really interesting:
I'm storing the resolve function in the res variable. That leaves the Promise hanging until I call res(). And where am I calling res()? Within the fetch!
fetch(urls[i]).then(response => {
  counter--;
  responses.push(response)
  if (res != null) {
    res()
    res = undefined
  }
})
Here I'm decrementing the counter, meaning that if counter == n before, that's no longer the case, so I can safely call res(). When I call res(), the Promise that was waiting resolves, the for loop moves on to the next i and makes another request without any extra waiting.
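The trick of stashing resolve in an outer variable can also be looked at in isolation. Below is a minimal sketch of it as a one-shot gate. The names createGate, wait and open are invented for this example and don't appear in the solution above.

```typescript
interface Gate {
  wait: Promise<void> // stays pending until open() is called
  open: () => void
}

function createGate(): Gate {
  let open: () => void = () => {}
  // The executor runs synchronously, so `open` already points at the
  // resolving function by the time createGate returns.
  const wait = new Promise<void>(resolve => {
    open = () => resolve()
  })
  return { wait, open }
}

// Usage: whoever awaits gate.wait is parked until someone else calls gate.open()
const gate = createGate()
gate.wait.then(() => console.log("gate opened, carrying on"))
setTimeout(() => gate.open(), 10)
```

In the solution, the for loop is the one parked on the gate, and the then callback of a finished request is the one that opens it.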
Here's a more generic implementation, not for urls but for any async functions you might want to run.
async function maxParallelism(n: number, functions: (() => Promise<void>)[]): Promise<void> {
  let counter = 0
  let res: (() => void) | undefined;
  for (let i = 0; i < functions.length; i++) {
    counter++;
    functions[i]().then(() => {
      counter--;
      if (res != null) {
        res()
        res = undefined
      }
    })
    await new Promise<void>((resolve, reject) => {
      if (counter < n) return resolve()
      res = resolve
    })
  }
  // wait for the functions that are still running before returning
  while (counter > 0) {
    await new Promise<void>(resolve => { res = resolve })
  }
}
(async () => {
  // function that waits some random time between 0 and 1 second.
  const wait = (id: number) => new Promise<void>((resolve, reject) => setTimeout(() => {
    console.log("finished id ", id)
    resolve()
  }, Math.floor(Math.random() * 1_000)))
  // building a list of 100 functions
  const functions: (() => Promise<void>)[] = Array.from(Array(100).keys()).map(e => () => wait(e))
  // calling the function with n = 3 (at most 3 requests at a time)
  await maxParallelism(3, functions)
})()
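The generic version above throws away whatever the functions return, and the url version collects responses in completion order rather than input order. If results are needed in the order of the input, one possible variation is sketched below. The name mapWithLimit and the generic parameter T are assumptions made for this sketch, and like the code above it does not handle rejections.

```typescript
async function mapWithLimit<T>(n: number, functions: (() => Promise<T>)[]): Promise<T[]> {
  const results: T[] = new Array(functions.length)
  let counter = 0
  let res: (() => void) | undefined
  for (let i = 0; i < functions.length; i++) {
    counter++
    functions[i]().then(value => {
      results[i] = value // always slot i, no matter when the function finishes
      counter--
      if (res != null) {
        res()
        res = undefined
      }
    })
    await new Promise<void>(resolve => {
      if (counter < n) return resolve()
      res = () => resolve()
    })
  }
  // wait for the tail: functions that were launched but have not finished yet
  while (counter > 0) {
    await new Promise<void>(resolve => { res = () => resolve() })
  }
  return results
}

// Usage with made-up delays: later functions may finish first,
// but the output keeps the input order.
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms))
const jobs = [30, 10, 20].map((ms, i) => async () => {
  await sleep(ms)
  return `job ${i}`
})
mapWithLimit(2, jobs).then(out => console.log(out))
```

The only real change from the version above is that each result is written to results[i] instead of being pushed, so completion order no longer matters.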
If you want to check that maxParallelism really does 3 requests at a time, you can add an interval or something similar and see whether it ever logs something other than 3:
async function maxParallelism(n: number, functions: (() => Promise<void>)[]): Promise<void> {
  let counter = 0
  let res: (() => void) | undefined;
  const interval = setInterval(() => {
    console.log("counter", counter)
  }, 10)
  for (let i = 0; i < functions.length; i++) {
    counter++;
    functions[i]().then(() => {
      counter--;
      if (res != null) {
        res()
        res = undefined
      }
    })
    await new Promise<void>((resolve, reject) => {
      if (counter < n) return resolve()
      res = resolve
    })
  }
  // stop logging before the tail, where counter naturally drops below n
  clearInterval(interval)
  // wait for the functions that are still running before returning
  while (counter > 0) {
    await new Promise<void>(resolve => { res = resolve })
  }
}
(async () => {
  // function that waits some random time between 0 and 1 second.
  const wait = (id: number) => new Promise<void>((resolve, reject) => setTimeout(() => {
    console.log("finished id ", id)
    resolve()
  }, Math.floor(Math.random() * 1_000)))
  // building a list of 100 functions
  const functions: (() => Promise<void>)[] = Array.from(Array(100).keys()).map(e => () => wait(e))
  // calling the function with n = 3 (at most 3 requests at a time)
  await maxParallelism(3, functions)
})()
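Instead of eyeballing the log, the same check can be turned into an assertion by wrapping every function so that it records how many are running at once. This is only a sketch: trackPeak and Task are names invented here, the delays are made up, and the block carries a compact copy of the limiter so that it runs on its own.

```typescript
type Task = () => Promise<void>

// Compact copy of the limiter, so that this block is self-contained.
async function maxParallelism(n: number, functions: Task[]): Promise<void> {
  let counter = 0
  let res: (() => void) | undefined
  for (const fn of functions) {
    counter++
    fn().then(() => {
      counter--
      if (res != null) {
        res()
        res = undefined
      }
    })
    await new Promise<void>(resolve => {
      if (counter < n) return resolve()
      res = () => resolve()
    })
  }
  // wait for the functions that are still running
  while (counter > 0) {
    await new Promise<void>(resolve => { res = () => resolve() })
  }
}

// Wraps every task so that the highest number of simultaneously running tasks is recorded.
function trackPeak(tasks: Task[]): { tasks: Task[]; peak: () => number } {
  let active = 0
  let peak = 0
  const wrapped = tasks.map(task => async () => {
    active++
    peak = Math.max(peak, active)
    try {
      await task()
    } finally {
      active--
    }
  })
  return { tasks: wrapped, peak: () => peak }
}

(async () => {
  const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms))
  const raw: Task[] = Array.from({ length: 12 }, (_, i) => () => sleep(5 + (i % 4) * 5))
  const tracked = trackPeak(raw)
  await maxParallelism(3, tracked.tasks)
  console.log("peak parallelism:", tracked.peak())
})()
```

If the limiter works, the recorded peak never goes above n, and with more tasks than slots it should reach n exactly.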
I hope you liked this.