Workers - pipe a sequential series of Streams to Response without blocking

Is there a way to sequentially fetch several different resources, and pipe the output of each one, one after the other, to the response, while still being able to return the response object basically immediately?

Say I have 3 different resources. Ideally I want to fetch the first one, pipe its body to the Response, then, once it has completed, fetch the next one, pipe its data next, and so on, all without blocking the call to return Response(readable, options).

I have been experimenting and this is where I am at so far

addEventListener("fetch", event => {
	event.respondWith(fetchAndStream(event))
})


async function fetchAndStream(event) {
	const request = event.request
	const urls = ['url1', 'url2', 'url3']
	let current = 0

	const { readable, writable } = new TransformStream()
	const writer = writable.getWriter()

	async function getURL() {
		const urlRequest = new Request(urls[current])
		const response = await fetch(urlRequest)
		const bodyReader = response.body.getReader()
		while (true) {
			const { value, done } = await bodyReader.read()
			if (done) break
			// Await each write so backpressure from the readable side is respected
			await writer.write(value)
		}
		current++
	}

	const readPromise = new Promise(async (resolve, reject) => {
		while(current < urls.length) {
			await getURL()
		}
		writer.close()
		resolve(1)
	})

	event.waitUntil(readPromise)

	return new Response(readable)
}

It kind of works, but it appears to be nowhere near as efficient as using a TransformStream and pipeTo() to just pass the body of the fetch to the response. The operation is timing out and going well over the 50ms limit, and the download speed on the client side is orders of magnitude slower.

Is there a more efficient way to pipe multiple streams to the response?

@mrbbot to the rescue. It turns out there is a preventClose option in pipeTo()

const { readable, writable } = new TransformStream()

const readPromise = new Promise(async (resolve, reject) => {
	let currentURL = 0
	while(currentURL < numURLS) {
		const url = `${baseURL}${urls[currentURL]}`
		const request = new Request(url)
		const response = await fetch(request)
		await response.body.pipeTo(writable, { preventClose: true })
		currentURL++
	}
	await writable.close()
	resolve(0)
})

return new Response(readable, {...})

edit: turns out you don't even need event.waitUntil(). Since the response body is a ReadableStream, it will be kept open until the stream is closed.
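
To see what preventClose does in isolation, here is a minimal sketch that swaps the fetch() calls for in-memory streams so it can run outside of Workers (for example in Node 18+, where TransformStream, ReadableStream and Response are globals). The streamOf helper and the demo function are made up for illustration and are not part of the snippet above.

```javascript
// Hypothetical helper: a ReadableStream that emits the given strings as bytes.
function streamOf(...chunks) {
	const encoder = new TextEncoder()
	return new ReadableStream({
		start(controller) {
			for (const chunk of chunks) controller.enqueue(encoder.encode(chunk))
			controller.close()
		}
	})
}

async function demo() {
	const { readable, writable } = new TransformStream()

	// Start draining the readable side first; without a consumer the
	// pipes below would stall on backpressure.
	const body = new Response(readable).text()

	await streamOf("first,", "still first,").pipeTo(writable, { preventClose: true })

	// The source has ended, but the destination is still open and unlocked,
	// so another stream can be piped into it.
	const lockedBetweenPipes = writable.locked

	await streamOf("second").pipeTo(writable, { preventClose: true })

	// Nothing has closed the writable yet, so close it explicitly.
	await writable.close()

	return { text: await body, lockedBetweenPipes }
}
```

This only illustrates the standard Streams behaviour of pipeTo(); it does not reproduce anything specific to the Workers runtime.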

Does this mean we can mark your last response here as the Solution?

Not quite - there is still one dangling issue. Regardless of how I order these last 2 bits, and whether or not I await them:

await writable.close()
resolve(0)

In the Workers logs, after the transfer completes successfully (e.g. I get the complete file), the log shows this error:

"message": "The script will never generate a response.",

So yeah, I am not sure how to correctly close the stream so that it exits cleanly.

The hardcore Workers folk (I’m looking at you @walshy), and some staff hang out at:


Hi @matthewjumpsoffbuild,

This should work:

// See working code below

EDIT: whoops, it seems this also has the “never generate a response” issue.


Alright, this doesn’t cause “The script will never generate a response”!

addEventListener("fetch", event => {
    event.respondWith(handleRequest(event.request))
})

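// Fetch each part in order and pipe its body into the shared writable.
async function concatenate(writable, parts) {
    for (const [index, part] of parts.entries()) {
        const response = await fetch(part)
        if (index === parts.length - 1) {
            // Last part: let pipeTo() close the writable so the response ends.
            await response.body.pipeTo(writable)
        } else {
            // Earlier parts: keep the writable open for the next pipe.
            await response.body.pipeTo(writable, {preventClose: true})
        }
    }
}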

async function handleRequest(request) {
    const { readable, writable } = new TransformStream()
    const parts = [
        "https://icanhazip.com/",
        "https://icanhazip.com/",
        "https://icanhazip.com/",
        "https://icanhazip.com/",
        "https://icanhazip.com/",
    ]
    // Not awaited, so the Response is returned while piping continues.
    concatenate(writable, parts)
    return new Response(readable)
}

Oh! I get it now - in order to get it to properly terminate, the last call to pipeTo() should not have preventClose set to true!

I did it slightly differently, but that is indeed the solution, thanks

await response.body.pipeTo(writable, {
  preventClose: current < total-1
})
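
Putting the thread's conclusion together, here is a minimal sketch of a reusable helper. To keep it runnable outside of Workers, it takes an array of functions that each return a ReadableStream (or a promise of one) rather than URLs. The names concatenate, openers, textStream and demoConcatenate are assumptions for illustration only. In a Worker, an opener could plausibly be something like () => fetch(url).then(r => r.body).

```javascript
// Pipes each stream into `writable` in order. Only the final pipeTo() is
// allowed to close the destination; all earlier ones set preventClose.
async function concatenate(writable, openers) {
    if (openers.length === 0) {
        // Nothing to pipe, so close directly; otherwise the readable side never ends.
        await writable.close()
        return
    }
    for (const [index, open] of openers.entries()) {
        const source = await open() // e.g. the body of a fetch() response
        await source.pipeTo(writable, { preventClose: index < openers.length - 1 })
    }
}

// Hypothetical stand-in for a fetched response body.
function textStream(text) {
    return new ReadableStream({
        start(controller) {
            controller.enqueue(new TextEncoder().encode(text))
            controller.close()
        }
    })
}

async function demoConcatenate() {
    const { readable, writable } = new TransformStream()
    // Not awaited up front: the readable can be handed off while piping continues.
    const done = concatenate(writable, [
        () => textStream("part 1\n"),
        async () => textStream("part 2\n"),
        () => textStream("part 3\n"),
    ])
    const text = await new Response(readable).text()
    await done
    return text
}
```

Each source is only opened after the previous pipe has finished, so the parts arrive strictly in order and only one upstream stream is active at a time.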

Ahh, yes, that’s a better solution. Didn’t think of that one!
