Reset for TransformStream (and WritableStream?) #1026

Open
ricea opened this issue Jan 30, 2020 · 10 comments
Labels
addition/proposal New features or enhancements

@ricea
Collaborator

ricea commented Jan 30, 2020

On seek, WebCodecs would like to be able to throw away all existing data in the decoder so that no frames from before the seek point are displayed.

It's easy enough to reset the decoder itself, but there still may be data queued in the writable side of the transform which we need to get rid of. There doesn't seem to be a clean way to do this at the moment.

I tentatively propose a reset() method for TransformStream which throws away any queued chunks. This might delegate to abstract operations on ReadableStream and WritableStream which do the actual queue reset.

It would probably be useful to have an optional reset() method on the underlying transformer. This would have unusual semantics that it could be called while a transform is in progress.

reset() might also be useful for WritableStream although I don't have a specific use case. It might even be useful for ReadableStream but I'm even further from having a use case for that.
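As a rough illustration of the proposed semantics, the sketch below approximates a reset() in userland today: every chunk is tagged with a generation number when it is written, reset() bumps the generation, and stale chunks still sitting in the writable queue are dropped when they reach transform(). The helper createResettableTransform, its tag() wrapper, and the optional transformer.reset() hook are all hypothetical and not part of the Streams API. The sketch cannot remove chunks already enqueued on the readable side, and a transform() that is already running can still enqueue stale output.

```javascript
// Hypothetical helper, NOT part of the Streams API: approximates reset() by
// tagging chunks with a generation number and dropping stale ones.
function createResettableTransform(transformer = {}) {
  let generation = 0;

  const stream = new TransformStream({
    start(controller) {
      return transformer.start?.(controller);
    },
    transform({ gen, chunk }, controller) {
      // Chunks tagged before the latest reset() are silently discarded.
      if (gen !== generation) return;
      if (transformer.transform) return transformer.transform(chunk, controller);
      controller.enqueue(chunk); // identity transform by default
    },
    flush(controller) {
      return transformer.flush?.(controller);
    },
  });

  return {
    readable: stream.readable,
    writable: stream.writable,
    // Wrap a chunk with the current generation before writing it.
    tag(chunk) {
      return { gen: generation, chunk };
    },
    reset() {
      generation += 1; // invalidate everything written so far
      return transformer.reset?.(); // optional, hypothetical transformer hook
    },
  };
}
```

For example, writing tag("a") and tag("b"), calling reset(), and then writing tag("c") leaves only "c" to be transformed, because "a" and "b" were still queued when the generation changed.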

@sandersdan

I wanted to add that, like with flush (#960), reset is used as a barrier in WebCodecs and it is useful to notify on the output side when the reset has completed. It can be useful to attach metadata to a reset, but we can also queue a followup (passthrough) message for both of these purposes.
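The "followup (passthrough) message" option can be sketched without any new API: the reset travels in-band as a sentinel chunk, the transformer resets its internal state when it sees it, and a marker carrying the caller's metadata is enqueued so the output side can tell exactly where the barrier completed. The RESET symbol, the `{ type, metadata }` chunk shapes, and createBarrierTransform are assumptions for illustration only. Note that this orders the reset after earlier chunks; it does not discard them.

```javascript
// Hypothetical in-band reset marker; not part of any real API.
const RESET = Symbol("reset");

// Wraps a transformer so that a { type: RESET, metadata } chunk acts as a barrier.
function createBarrierTransform(transformer) {
  return new TransformStream({
    async transform(chunk, controller) {
      if (chunk !== null && typeof chunk === "object" && chunk.type === RESET) {
        // Everything written before the marker has already gone through transform().
        await transformer.reset?.();
        // Passthrough followup: tells the output side the reset has completed.
        controller.enqueue({ type: "reset-done", metadata: chunk.metadata });
        return;
      }
      await transformer.transform(chunk, controller);
    },
  });
}
```

With a doubling transformer, writing 1, then `{ type: RESET, metadata: { seekTo: 10 } }`, then 2 yields 2, a `reset-done` marker carrying `{ seekTo: 10 }`, and then 4.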

@MattiasBuelens
Collaborator

Until now, what we've been doing is to abort the pipe and then re-create it with a new source:

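```javascript
let controller = new AbortController(); // `let`, since setSource() replaces it
const sink = new WritableStream({ /* ... */ });
let currentPipe = Promise.resolve();

async function setSource(source) {
  controller.abort(); // stop the previous pipe; its queued chunks are discarded
  controller = new AbortController();
  await currentPipe; // wait for the sink to become unlocked
  currentPipe = source
    .pipeThrough(transform()) // transform() returns a fresh TransformStream (defined elsewhere)
    .pipeTo(sink, { signal: controller.signal, preventAbort: true }) // keep the sink usable
    .catch(() => {}); // an aborted pipe rejects with an AbortError; ignore it
}
```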

This will properly discard any queued chunks. However, we need to re-create all transform streams between the source and sink, which is quite annoying.

A reset() method could help to avoid re-creating those transform streams. However, as you already pointed out, the semantics can get tricky.


> This would have unusual semantics that it could be called while a transform is in progress.

If a transform() is already in progress, then we need to wait for it to complete before we can clear the queues. Otherwise, the pending transform() may asynchronously enqueue chunks after the reset() call, which were derived from chunks from before the reset() call.

Perhaps it could help to make pull(), transform() and write() take an AbortSignal, as I suggested previously in #1014? With that, the steps for TransformStream.reset() could look like:

  1. If a transformer.transform() is in progress, abort its AbortSignal.
  2. Wait for the in-progress transform() promise to complete.
  3. Call transformer.reset().
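A userland approximation of these three steps might look like the sketch below. It assumes the transformer accepts the AbortSignal idea from #1014 as a hypothetical third argument to transform(), which does not exist in the current API; createAbortableTransform and transformer.reset() are likewise hypothetical. It covers only steps 1 to 3: clearing the queues themselves would still need support from the stream.

```javascript
// Hypothetical helper. The third `signal` argument to transform() mirrors the
// AbortSignal idea from #1014; it is NOT part of the current Streams API.
function createAbortableTransform(transformer) {
  let abortController = new AbortController();
  let inFlight = Promise.resolve(); // settles when the latest transform() finishes

  const stream = new TransformStream({
    transform(chunk, controller) {
      const result = Promise.resolve(
        transformer.transform(chunk, controller, abortController.signal)
      );
      inFlight = result.catch(() => {}); // reset() only waits; errors still reach the stream
      return result;
    },
  });

  async function reset() {
    abortController.abort(); // 1. abort the in-progress transform()
    await inFlight; // 2. wait for it to complete
    abortController = new AbortController(); // fresh signal for later chunks
    await transformer.reset?.(); // 3. call transformer.reset()
  }

  return { stream, reset };
}
```

The transformer is expected to check `signal.aborted` and return without enqueuing, rather than throw; a rejected transform() would error the whole stream.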

Still, all of this relies on the transformer actually implementing the reset() method. What if it doesn't? We have no way of knowing what sort of internal state the transformer is keeping on any chunks it has previously (partially) transformed. As such, we don't know whether the transformer will be ready to receive a completely new chunk (which isn't a continuation from the previous old chunk) in its next transform() call.

Does that mean that you can only reset a transform stream that actually implements reset()? But how would that work in a long pipe chain, where some transforms do implement it and others don't? You might have already reset a bunch of transform streams before you encounter one that doesn't support it. Now, half of the pipe has had its queue cleared and is expecting new chunks, while the other half is still processing old chunks and expecting a continuation of those old chunks. This would seriously mess up the whole chain!

@sandersdan

For WebCodecs, we do not need an abort signal. There is a lot of internal state, but chunks are not processed one-by-one. I would imagine that a Transformer that does not implement reset would instead flush, whatever that means.

@MattiasBuelens
Collaborator

> I would imagine that a Transformer that does not implement reset would instead flush, whatever that means.

With the current semantics, we call flush() when the writable end is closed. This "completes" the transform stream: after that call, we'll never call transform() or flush() again (we'll even clear those algorithms so that we can never call them again).

A developer can thus use flush() to clean up resources. For example, imagine a transform stream that offloads processing to a worker:

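```javascript
new TransformStream({
  start(controller) {
    // Offload the actual processing to a worker.
    this.worker = new Worker("my-worker.js");
    this.worker.onmessage = (event) => {
      controller.enqueue(event.data); // forward results to the readable end
    };
  },
  transform(chunk) {
    this.worker.postMessage(chunk);
  },
  flush() {
    // Safe only because transform() is never called again after flush().
    this.worker.terminate();
  }
});
```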

This transform stream uses flush() to destroy the worker created in start(). This works, because transform() can never be called again after flush().

With the proposed semantics of reset() "falling back" to flush(), this pattern would no longer work.


Now, flush() might not be the right method to clean up such resources. flush() is only called when the writable end is closed, not when the writable end is aborted or the readable end is cancelled. In such cases, the transform stream does not call any method on its transformer. Perhaps we need a new transformer method to fill this gap?
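The gap is easy to observe. In the sketch below (flushCalls is just an illustrative helper), closing the writable end runs flush(), while aborting it does not.

```javascript
// Illustrative helper: records whether flush() runs when the writable end
// is closed versus aborted.
async function flushCalls(endWith) {
  const calls = [];
  const ts = new TransformStream({
    flush() {
      calls.push("flush");
    },
  });
  const writer = ts.writable.getWriter();
  if (endWith === "close") {
    await writer.close(); // runs flush(), then closes the readable end
  } else {
    await writer.abort(new Error("aborted")); // errors both ends; flush() never runs
  }
  return calls;
}
```

`flushCalls("close")` resolves to `["flush"]`, while `flushCalls("abort")` resolves to an empty array, so a worker created in start() would be left running in the abort case.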

@domenic
Member

domenic commented Feb 1, 2020

> Perhaps we need a new transformer method to fill this gap?

This seems to tie in to previous discussions about a "finally"; see e.g. #636.

@sandersdan

> With the current semantics, we call flush() when the writable end is closed.

I mean flush as in #960.

@MattiasBuelens
Collaborator

MattiasBuelens commented Feb 4, 2020

> I mean flush as in #960.

Ah, that does make more sense. 😅

I'm still a bit hesitant though. The way I understand the proposed sync() method: it's completely optional whether or not the transformer provides a sync() method, or whether the transform stream actually calls the transformer's sync() method at all. In the Zlib example, even if you never call sync(), you'll still get a valid compressed stream.

However, with reset(), those calls are not optional. The transform stream will call reset() when there's a "discontinuity" in the source (e.g. a video seek), and the transformer needs to be made aware of that. Otherwise, if the transformer does not implement reset(), the transformer will still think that the next chunk passed to transform() will be a "continuation" of the previous chunk (e.g. the video frames are part of the same GOP).

I'm also not sure whether "call sync() and clear the queues" is always a valid fall-back scenario in case no reset() method is provided. For example:

  • For a video decoder stream, sync() could mean: finish decoding any pending frames and enqueue them. (For example, you might have been holding onto the last few frames whose PTS > DTS, since perhaps a future frame will need to be presented out-of-order: DTS1 < DTS2 < PTS2 < PTS1.) Nothing about sync()'s current definition requires the video decoder to start a new GOP on the next transform() call. Never mind, bad example. If sync() is supposed to be optional, then it shouldn't end the GOP or flush any frames that might still have their order changed...
  • For a TextDecoderStream, sync() would be empty: while the decoder might still be holding onto one or more bytes from the end of the previous chunk, it should not yet try to decode them inside sync(). Instead, a TextDecoderStream would need to explicitly implement reset(), so that it can first "process end-of-stream" on its decoder to get rid of those buffered bytes (possibly enqueuing a replacement character), and then reset its decoder in preparation for the next transform() call.
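The TextDecoderStream case can be sketched with a plain TextDecoder inside a TransformStream. Calling decode() with no arguments performs the end-of-stream processing described above, turning a dangling partial sequence into U+FFFD, after which a fresh decoder handles the next chunk. The reset() hook is hypothetical: TransformStream has no such method, so the caller invokes transformer.reset() directly.

```javascript
// Sketch of a TextDecoderStream-like transform with a HYPOTHETICAL reset() hook.
// TransformStream has no reset(); the caller invokes transformer.reset() directly.
function createResettableTextDecoder() {
  let decoder = new TextDecoder(); // UTF-8, non-fatal: bad input becomes U+FFFD
  let streamController;

  const transformer = {
    start(controller) {
      streamController = controller; // kept so reset() can enqueue outside transform()
    },
    transform(chunk, controller) {
      // stream: true keeps an incomplete trailing sequence buffered for the next chunk.
      const text = decoder.decode(chunk, { stream: true });
      if (text) controller.enqueue(text);
    },
    flush(controller) {
      const text = decoder.decode(); // end-of-stream processing
      if (text) controller.enqueue(text);
    },
    reset() {
      // "Process end-of-stream" to get rid of buffered bytes (may yield U+FFFD)...
      const text = decoder.decode();
      if (text) streamController.enqueue(text);
      // ...then start from a clean decoder for post-reset chunks.
      decoder = new TextDecoder();
    },
  };

  return { stream: new TransformStream(transformer), transformer };
}
```

For example, writing the first two bytes of "€" (0xE2 0x82), calling transformer.reset(), and then writing 0x41 produces "\uFFFD" followed by "A".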

...Or I might be understanding sync() incorrectly. I'll re-read it tomorrow. 😛

@ricea
Collaborator Author

ricea commented Feb 4, 2020

One way forward would be if leaving reset() undefined on the transformer would just disable the reset functionality altogether. If a transformer really wanted a reset that just cleared the queues and nothing else, they could define an empty method.

This seems to be sufficiently difficult and controversial that we won't get to it soon.

ricea added the addition/proposal (New features or enhancements) label Feb 4, 2020
@MattiasBuelens
Collaborator

> One way forward would be if leaving reset() undefined on the transformer would just disable the reset functionality altogether.

It's not sufficient for one transform stream to support reset(). All transform streams in the pipe chain need to support it, otherwise we still have the problem from my previous comment where only a part of the chain gets reset.
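One way to combine both rules (the opt-in, where an undefined reset() disables the feature and an empty one means "clear the queues only", plus the requirement that every stage supports it) is an all-or-nothing check before touching any stage. resetChain and the idea of collecting each stage's transformer object are hypothetical; the sketch only avoids a half-reset chain and does not clear any queues itself.

```javascript
// Hypothetical helper: `stages` are the transformer objects of every
// TransformStream in a pipe chain. reset() is not a real transformer hook.
function resetChain(stages) {
  // Check the whole chain first, so we never reset only part of it.
  const missing = stages.filter((stage) => typeof stage.reset !== "function");
  if (missing.length > 0) {
    throw new TypeError(
      `${missing.length} of ${stages.length} stages do not implement reset(); nothing was reset`
    );
  }
  // An empty reset() is a valid opt-in meaning "clear the queues and nothing else".
  for (const stage of stages) stage.reset();
}
```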

That reminds me: pipeTo() would need to propagate resets from the readable end to the writable end, so we'd need to extend the reader and writer APIs as well. Sure, we could add writer.reset(), but how will that affect reader.read()? Not so trivial. 😅

> This seems to be sufficiently difficult and controversial that we won't get to it soon.

Agreed, let's not rush this. 👍

For now, developers can abort and re-create their pipe chain if they need to "reset" it. It's not pretty, but it works.

@sandersdan

We currently expect that users of WebCodecs will be writing new streams code. Given that, one simple solution that is acceptable to us would be for the default implementation of reset() to abort the stream.
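That default could be sketched as follows: call the transformer's reset() if it exists, otherwise abort the writable end so the caller tears down and re-creates the pipe. resetOrAbort is a hypothetical helper, and the transformer-level reset() it checks for is not a real Streams API hook.

```javascript
// Hypothetical helper: the transformer-level reset() hook does not exist in the
// Streams API; this only illustrates "default reset() aborts the stream".
async function resetOrAbort(transformer, writer, reason) {
  if (typeof transformer.reset === "function") {
    await transformer.reset(); // the transformer opted in: keep the stream alive
    return "reset";
  }
  // No reset(): fall back to aborting, so the caller re-creates the pipe.
  await writer.abort(reason);
  return "aborted";
}
```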

4 participants