Support for Transformers in TransformStreams


#1

We’re trying to pipe the result of a fetch to the client using the pipeTo functionality with a TransformStream. In order to know how many bytes were sent to the client, we tried adding a transformer to the TransformStream, but it doesn’t seem to get triggered. Is this not yet supported, or am I doing it the wrong way? It works fine locally using the @mattiasbuelens/web-streams-polyfill shim.

This is more or less what we’re currently testing with:

    let bytesSent = 0;
    const { readable, writable } = new TransformStream({
        transform: (chunk, controller) => {
            bytesSent += chunk.length;
            controller.enqueue(chunk);
        }
    });
    response.body.pipeTo(writable);

#2

Hi @markus, transformers aren’t yet supported – the only way to use a TransformStream in Cloudflare Workers is as an identity TransformStream (no arguments to the constructor).
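In other words, the only form that currently works is an identity pipe along these lines (no transformer, so no chance to observe the chunks as they pass through):

let { readable, writable } = new TransformStream()
response.body.pipeTo(writable)
return new Response(readable, response)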

It should still be possible to do what you want to do explicitly. Something like this for example (disclaimer: untested code):

async function handle(event) {
  let response = await fetch(event.request)
  let { readable, writable } = new TransformStream()

  // Stream the response, count the number of bytes, and send that number
  // to our hypothetical logging service.
  let bodyPromise = countBytes(response.body, writable)
      .then(bytesSent => fetch(myLoggingServiceUrl, {
        method: "POST",
        body: `{ "url": "${event.request.url}", "bytesSent": ${bytesSent} }`,
        headers: { "Content-Type": "application/json", "Authorization": "..." }
      }))

  event.waitUntil(bodyPromise)

  return new Response(readable, response)
}

async function countBytes(readable, writable) {
  let reader = readable.getReader()
  let writer = writable.getWriter()
  let bytesSent = 0

  for (;;) {
    let { value, done } = await reader.read()
    if (done) break
    bytesSent += value.byteLength
    await writer.write(value)
  }

  await writer.close()

  return bytesSent
}

Harris


#3

Hi @harris and thanks for the quick reply!

We’re doing something similar today and it works, but the TransformStream transformer looked like a slicker solution.

I’ve got two follow-up questions (hope that’s ok):

  • Do we need to add an await writer.ready before the writer.write to make backpressure work? When running locally it seems to help reduce memory usage, but we’re not sure whether the Cloudflare environment behaves differently.
  • Is there a way to tell if the client aborts the request? In Node environments we can hook request.on('close'), but I can’t find any similar functionality with Web streams. I was hoping that writer.write would throw if the client disconnected, but that doesn’t seem to be the case.

Thanks for the help!


#4

Hi @markus, sorry for my slow reply.

No. We actually don’t even implement writer.ready currently. If there’s backpressure, the writer.write() promise will not resolve until the backpressure subsides.
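To illustrate, here’s a minimal sketch of such a pump loop; the hypothetical pump() takes a reader and writer obtained the same way as in countBytes() above, and needs no writer.ready at all:

async function pump(reader, writer) {
  for (;;) {
    let { value, done } = await reader.read()
    if (done) break
    // As described above, write() will not resolve while there is
    // backpressure, so awaiting it keeps this loop from reading ahead
    // of the consumer.
    await writer.write(value)
  }
  await writer.close()
}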

Yes, it should throw (or, more pedantically, its promise should be rejected). Normally, if the client disconnects, all handlers/tasks associated with that client’s request are canceled, but if the worker script happens to pass a promise to event.waitUntil(), the runtime will wait up to 30 seconds for that promise to settle before canceling the request’s tasks. With that grace period, it’s possible to catch the exception from writer.write() and record it.

Here’s an example, based on the script I shared above:

addEventListener("fetch", event => {
  event.respondWith(handle(event)
      // Debugging aid.
      .catch(e => new Response(e.stack)))
})

const myLoggingService = "https://my-requestbin"

async function handle(event) {
  let { request } = event

  let response = await fetch(request)
  if (response.body === null) {
    // Probably a 304.
    return response
  }

  let { readable, writable } = new TransformStream()

  // Stream the response, count the number of bytes, and send that number
  // to our hypothetical logging service.
  let bodyPromise = countBytes(response.body, writable)
      .then(bytesSent => {
        return {
          url: event.request.url,
          bytesSent: bytesSent,
        }
      }, error => {
        return {
          url: event.request.url,
          bytesSent: error.bytesSent,
          error: error.stack,
        }
      })
      .then(payload => {
        return fetch(myLoggingService, {
          method: "POST",
          body: JSON.stringify(payload),
          headers: { "Content-Type": "application/json" }
        })
      })

  // We must use event.waitUntil() here so that the runtime
  // waits for this promise (up to 30 seconds) even after the
  // client disconnects. This allows us to detect the disconnection
  // and log it.
  event.waitUntil(bodyPromise)

  return new Response(readable, response)
}

function sleep(ms) {
  return new Promise(r => setTimeout(r, ms))
}

async function countBytes(readable, writable) {
  let reader = readable.getReader()
  let writer = writable.getWriter()
  let bytesSent = 0

  try {
    for (;;) {
      let { value, done } = await reader.read()
      if (done) break
      bytesSent += value.byteLength

      // Wait a little so that I have time to CTRL-C my curl process.
      await sleep(1000)
      await writer.write(value)
    }

    await sleep(1000)
    await writer.close()
  } catch (error) {
    error.bytesSent = bytesSent
    throw error
  }

  return bytesSent
}

If I deploy that worker on a route, download a large-ish file, and cancel the download partway through, I see messages like the following show up in my logging service (I just used a requestbin running on my origin for testing purposes):

{"url":"https://my-domain/a-large-ish-file","bytesSent":44663,"error":"Error: Network connection lost."}

One caveat is that manually pumping bytes like this in JavaScript is relatively expensive CPU-wise. If you have to transfer files greater than about 5MB, you’d want to use ReadableStream.pipeTo(), but then you lose the ability to count the number of bytes.

Harris


#5

Thanks for the reply @harris!

We did try something very similar to this, but we get the feeling that the promise hangs on writer.write because of backpressure and hence never rejects. Could that be the case, and if so, is there any way around it? We do see a few network errors in our logs, but only in maybe 5% of the requests.

The typical case for us is that the browser requests a media file and cancels the request as soon as it detects that the server accepts range requests, so that it can request a range instead. Those aborted requests don’t seem to cause any network error rejections.

If it would help, I can create a small snippet to reproduce the issue.


#6

Yes, I’d appreciate that a lot. I’m curious what’s going on.


#7

Hi again @harris

It could very well be that I’m doing something stupid, but the sample below doesn’t seem to log anything when the request is canceled. It would be great if you had time to take a look at it.

The link to the media file will expire, but you can fetch a new link here: http://traffic.libsyn.com/alexosigge/aosavsnitt354.mp3

addEventListener("fetch", event => {
    event.respondWith(handle(event)
        // Debugging aid.
        .catch(e => new Response(e.stack)))
})

const myLoggingService = 'add-a-requestbin-here...'
const bigFile = 'http://hwcdn.libsyn.com/p/1/e/7/1e788e3cc3c0b8b3/aosavsnitt354.mp3?c_id=36281804&cs_id=36281804&expiration=1552572419&hwt=35581008f9ccd9c5c842e58677e47e32';

async function handle(event) {
    const headersObj = [...event.request.headers].reduce((obj, item) => {
        const header = {};
        header[item[0]] = item[1];
        return Object.assign({}, obj, header);
    }, {});

    await log({ start: new Date(), headers: headersObj });
    let response = await fetch(bigFile, { headers: headersObj });
   
    let { readable, writable } = new TransformStream()

    event.waitUntil(
        stream(response.body, writable)
            .then(async (bytesSent) => {
                await log({ done: new Date(), bytesSent });
            })
            .catch(async (err) => {
                await log({ failed: new Date(), message: err.message });
            }));

    return new Response(readable, response)
}

async function log(payload) {
    return fetch(myLoggingService, {
        method: "POST",
        body: JSON.stringify(payload),
        headers: { "Content-Type": "application/json" }
    });
}

async function stream(readable, writable) {
    const reader = readable.getReader();
    const writer = writable.getWriter();
    let bytesSent = 0;

    try {
        while (true) {
            let { value, done } = await reader.read();
            if (done) {
                break;
            }

            bytesSent += value.byteLength;
            await writer.write(value);
        }
        await writer.close();
        return bytesSent;
    } catch (error) {
        error.bytesSent = bytesSent;
        throw error;
    }
}

#8

Hi @markus,

It looks like the test file is 44MB, which will definitely cause the read/write loop to exhaust the worker’s CPU time limit. Recall the caveat at the end of my previous response: the manual read/write pump here is expensive CPU-wise and really only works reliably for responses up to around 5MB. Once the CPU time limit is exceeded, the runtime terminates that request, and the script has no opportunity to log the failure.

One way to recognize this: when you download the file through the worker without canceling, the file ends up truncated. You might also see a “script exceeded time limit” message in the devtools console when previewing this particular worker, though I only saw it myself after trying a couple of times.

To avoid exceeding the CPU limit, you’ll need to use ReadableStream.pipeTo(). Unfortunately, that doesn’t provide any way to count the bytes.
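For reference, a rough sketch of what the pipeTo() variant could look like, using the same handle() shape as the earlier examples but without the byte counting:

async function handle(event) {
  let response = await fetch(event.request)
  if (response.body === null) {
    // Probably a 304.
    return response
  }

  let { readable, writable } = new TransformStream()

  // Let the runtime pump the bytes for us. This avoids the CPU cost of the
  // manual read/write loop, but gives us no hook to count the bytes.
  event.waitUntil(response.body.pipeTo(writable))

  return new Response(readable, response)
}

(In this particular case the TransformStream isn’t adding anything, so returning new Response(response.body, response) directly would also work; the pipeTo() form is just shown for symmetry with the earlier examples.)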

Harris