Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Piping to writable streams with HWM 0? #1158

Open
MattiasBuelens opened this issue Aug 6, 2021 · 21 comments · May be fixed by #1190
Open

Piping to writable streams with HWM 0? #1158

MattiasBuelens opened this issue Aug 6, 2021 · 21 comments · May be fixed by #1190

Comments

@MattiasBuelens
Copy link
Collaborator

Right now, piping to a writable stream with { highWaterMark: 0 } stalls indefinitely:

const rs = new ReadableStream({
  start(c) {
    c.enqueue("a");
    c.enqueue("b");
    c.enqueue("c");
    c.close();
  }
});
const ws = new WritableStream({
  write(chunk) {
    console.log("wrote:", chunk);
  }
}, { highWaterMark: 0 });
rs.pipeTo(ws); // never resolves, and no messages are logged

This makes it impossible to pipe through a TransformStream without increasing the total queue size of a pipe chain by at least one chunk:

const upperCaseTransform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  }
}, { highWaterMark: 0 }, { highWaterMark: 0 });
rs.pipeThrough(upperCaseTransform).pipeTo(ws); // stalls indefinitely

const upperCaseTransform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  }
}, { highWaterMark: 1 }, { highWaterMark: 0 }); // same as default strategies
rs.pipeThrough(upperCaseTransform); // works, but already pulls the first chunk from `rs`

This is unfortunate, since there are many use cases for synchronous TransformStreams that shouldn't need buffering (i.e. every call to transform() immediately results in at least one enqueue()):

  • A generic mapTransform(fn), similar to array.map(fn):
    function mapTransform(fn) {
      return new TransformStream({
        transform(chunk, controller) {
          controller.enqueue(fn(chunk));
        }
      });
    }
    rs.pipeThrough(mapTransform(x => x.toUpperCase()));
  • TextEncoderStream and TextDecoderStream from Encoding.

Prior discussions on this topic noted that this is not possible. writer.desiredSize is always <= 0, so writer.ready is always pending:

  • Change default readableStrategy HWM to 0? #777:

    We can't reduce the HWM of the writableStrategy to 0 because it would have permanent backpressure preventing the pipe from working.

  • #1083:

    Yes. As you observed, a writable stream with a HWM of 0 will always have backpressure. So adding an identity TransformStream to a pipe can't be a complete no-op: it always increases the total queue size by 1.

But that got me thinking. A ReadableStream's source can be pull()ed as a result of reader.read(), even if controller.desiredSize <= 0. Maybe a WritableStream's sink should then also be able to release backpressure even if writer.desiredSize <= 0? 🤔

We could add a method on WritableStreamDefaultController (controller.pull()? controller.releaseBackpressure()? controller.notifyReady()?) that would have the result of immediately resolving the current writer.ready promise. Internally, we would do something like WritableStreamUpdateBackpressure(stream, false). My hope is that we can then use this inside TransformStreamSetBackpressure(), so that pulling from the readable end of a transform stream would also resolve ready on the writable end.

...Or am I missing something very obvious? 😛

@domenic
Copy link
Member

domenic commented Aug 9, 2021

Although adding such a mechanism might be useful, I want to make sure we've figured out the root issue here. In particular I worry that maybe we got the semantics of HWM wrong for WritableStream, if there's an expressive gap here.

But, I'm having a hard time articulating the expressive gap. Let's say we had a WritableStream with HWM = 0 that called controller.releaseBackpressure(). (When would it call it, exactly?) What is that WritableStream trying to express?

My best guess was something like "I currently have nothing in the queue, and I intend to consume whatever you give me ASAP, but please don't give me more than 1 chunk". But... that seems like the right semantics for HWM = 1. So I admit I'm confused.

@MattiasBuelens
Copy link
Collaborator Author

Although adding such a mechanism might be useful, I want to make sure we've figured out the root issue here. In particular I worry that maybe we got the semantics of HWM wrong for WritableStream, if there's an expressive gap here.

But, I'm having a hard time articulating the expressive gap. Let's say we had a WritableStream with HWM = 0 that called controller.releaseBackpressure(). (When would it call it, exactly?) What is that WritableStream trying to express?

I agree, it's tricky. I'm also still trying to wrap my head around it, it's not very concrete yet. 😛

I'm mainly coming at this from the TransformStream use case. Data written to the writable end will be queued and, when the readable end releases its backpressure, will be transformed and enqueued on the readable end. Similarly, read requests on the readable end should cause the writable end to also request some data if it doesn't have anything queued yet. But does that "write request" mean anything outside of the transform stream's use case?

This may also be necessary if/when we flesh out writable byte streams (#495), and (by extension) transform streams with readable/writable byte stream end(s). If we want "reverse BYOB" or "please use this buffer" semantics, then it would be nice if a BYOB request from the readable end could be forwarded to the writable end. But then the writable end is almost required to have HWM = 0, so it can wait for a read request to provide a buffer (instead of allocating one by itself). 🤔

...Or maybe "reverse BYOB" is too weird, and a "regular" BYOB writer is fine. I don't know yet. 😅

My best guess was something like "I currently have nothing in the queue, and I intend to consume whatever you give me ASAP, but please don't give me more than 1 chunk". But... that seems like the right semantics for HWM = 1. So I admit I'm confused.

With HWM = 1, the stream is asking to always put at least one chunk in its queue, and to try and hold back if it's not done processing that latest chunk yet. Here, most of the time writer.ready will be resolved. You write a bunch of chunks, writer.ready temporarily becomes pending while the sink works through the backlog and eventually writer.ready resolves again.

With HWM = 0, the stream is asking not to put anything its queue, except for the case where it wants a chunk. Here, most of the time writer.ready will be pending. When the sink eventually wants a chunk, writer.ready temporarily becomes resolved and waits for a chunk to be written. Once that happens, writer.ready becomes pending again.

They're similar, but with HWM = 1 the "default" state is for writer.ready to be resolved and the "transient" state is writer.ready being pending, whereas with HWM = 0 those states are reversed.

@ricea
Copy link
Collaborator

ricea commented Aug 12, 2021

The goal of avoiding the queue bloat from TransformStream is a good one, but the details of how to do it are tricky.

I think it may be sufficient to have a readyWhenNotWriting boolean as part of the strategy which changes the condition for writer.ready from queueSize < highWaterMark to queueSize < highWaterMark || underlyingSinkWriteNotInProgress.

@domenic
Copy link
Member

domenic commented Aug 12, 2021

With HWM = 1, ...

With HWM = 0, ...

Yeah, these make sense. But, could you phrase the transform stream case--which appears to be something in between---in this way? I.e. what is the writable end of such a no-queueing transform stream asking for? How does that different from what the the HWM = 1 case is asking for?

@jan-ivar
Copy link
Contributor

jan-ivar commented Sep 8, 2021

My best guess was something like "I currently have nothing in the queue, and I intend to consume whatever you give me ASAP, but please don't give me more than 1 chunk". But... that seems like the right semantics for HWM = 1. So I admit I'm confused.

"I have no queue, so I couldn't consume a chunk before, but I'm ready to consume one now (because I have a taker for it)".

(I know it technically has a queue, but the intent of HWM = 0 seems to be for it to remain empty ideally, so for all intents and purposes it's acting like it has no queue).

Similarly, read requests on the readable end should cause the writable end to also request some data if it doesn't have anything queued yet.

How about controller.requestData() ? controller.pullProducer() ?

This would be useful for realtime streams where chunks are expensive and prebuffering undesirable (pulling too soon may yield a less fresh realtime frame).

@domenic
Copy link
Member

domenic commented Sep 9, 2021

Sorry, still confused...

"I have no queue, so I couldn't consume a chunk before, but I'm ready to consume one now (because I have a taker for it)".

compared to

With HWM = 0, the stream is asking not to put anything its queue, except for the case where it wants a chunk. Here, most of the time writer.ready will be pending. When the sink eventually wants a chunk, writer.ready temporarily becomes resolved and waits for a chunk to be written. Once that happens, writer.ready becomes pending again.

makes it sound like HWM = 0 is perfect for this use case. When the sink wants a chunk, it will signal that through writer.ready.

@jan-ivar
Copy link
Contributor

Yes it is perfect. I was merely trying to articulate the expressive gap (What is that WritableStream trying to express?)

We're on the same page I think.

@domenic
Copy link
Member

domenic commented Sep 14, 2021

Well, it feels like there's still a missing capability here, as noted by the OP and even your comment in #1157 indicating that some way to get less buffering would be helpful. I'm just confused as to what that missing capability is still, i.e. I cannot understand what the proposed controller.requestData() would give that isn't already given by using HWM = 0 or HWM = 1.

@jan-ivar
Copy link
Contributor

the proposed controller.requestData()

I was just bikeshedding on controller.releaseBackpressure() based on @MattiasBuelens using the phrase "request some data if it doesn't have anything queued yet". Sorry for causing confusion.

But you're right, there is a (side) issue in #1157 (comment) with effectively implementing a FrameDropper TransformStream. Today, it requires {highWaterMark: 1}, {highWaterMark: 2} in order to perform the if (controller.desiredSize < 2) to drop a frame. It's not clear how one would implement it with {highWaterMark: 0}, {highWaterMark: 0} which would be desirable.

@jan-ivar
Copy link
Contributor

jan-ivar commented Sep 14, 2021

that isn't already given by using ... HWM = 1

I think the OP does a good job of explaining the problem with HWM =1: it sprinkles chunks into the pipe early, just because HWM = 1 says to do it. I see no inherent reason a TransformStream has to require this to operate, when it could requestData() on demand (when it gets a pull() on its readable). It doesn't require a chunk to have already been queued up, in order to work, inherently.

For sources with time-sensitive chunks — which is any source that may drop chunks as a solution to back-pressure (a camera or WebTransport datagram receiver) — operating on already-queued chunks means potentially operating on old chunks. Example.

@domenic
Copy link
Member

domenic commented Sep 14, 2021

So maybe my confusion is this. @MattiasBuelens said

With HWM = 0, the stream is asking not to put anything its queue, except for the case where it wants a chunk. Here, most of the time writer.ready will be pending. When the sink eventually wants a chunk, writer.ready temporarily becomes resolved and waits for a chunk to be written. Once that happens, writer.ready becomes pending again.

I assume this is referring to the current spec? However, I can't figure out in what scenario with HWM = 0 writer.ready will ever resolve.

@MattiasBuelens
Copy link
Collaborator Author

I assume this is referring to the current spec? However, I can't figure out in what scenario with HWM = 0 writer.ready will ever resolve.

No, this is describing the desired semantics, what I would like HWM = 0 to do. "When the sink eventually wants a chunk" means "when controller.releaseBackpressure() is called".

In the current spec, when HWM = 0, writer.ready is always pending. I would like to change that, so that the underlying sink has some control over that.

@domenic
Copy link
Member

domenic commented Sep 14, 2021

Oh! That explains the confusion.

What do you think of @ricea's suggestion in #1158 (comment) for that purpose? Or even just changing the ready condition from queueSize < highWaterMark to queueSize <= highWaterMark, with some reasoning about how writable streams by default want at least one chunk, and HWM = 0 just says at most one chunk (no matter its size).

I think that's less expressive than a dedicated method, but it might be a better general fix if right now HWM = 0 is just a footgun.

@MattiasBuelens
Copy link
Collaborator Author

I think it may be sufficient to have a readyWhenNotWriting boolean as part of the strategy

I don't know if I'd want readyWhenNotWriting as part of the queuing strategy, I feel like it belongs to the underlying sink.

  • If it were part of the strategy, we'd have to change the ByteLengthQueuingStrategy and CountQueuingStrategy classes too so you could set that property through their constructors.
  • It'd be kind of weird to have a queuing strategy property that only applies to writable streams.

But if you have good arguments for adding it to the queuing strategy, I'm all ears. 🙂

Or even just changing the ready condition from queueSize < highWaterMark to queueSize <= highWaterMark

That would mean a pipe operation would never fill exactly up to the HWM, you'd always go over the HWM, right? 🤔

I'm still leaning towards a controller method like controller.releaseBackpressure():

  • When called, it sets an internal boolean [[releaseBackpressure]] flag to true and calls WritableStreamUpdateBackpressure() if needed.
  • If [[releaseBackpressure]] is true, WritableStreamDefaultControllerGetBackpressure must return false (without checking desizedSize).
  • When WritableStreamDefaultControllerWrite is called, [[releaseBackpressure]] is reset to false.

@domenic
Copy link
Member

domenic commented Sep 14, 2021

That would mean a pipe operation would never fill exactly up to the HWM, you'd always go over the HWM, right? 🤔

True. I guess another alternative is special-casing 0 HWM, since in that case you don't want to exactly fill up to the HWM (because doing so stalls the stream). But that's kind of icky...

@MattiasBuelens
Copy link
Collaborator Author

MattiasBuelens commented Sep 14, 2021

Yeah, that's also what I was thinking. If we consider HWM = 0 to be broken, we could special case it. I think we would only need to change a single abstract op:

WritableStreamDefaultControllerGetBackpressure(controller) performs the following steps:

  1. Let desiredSize be ! WritableStreamDefaultControllerGetDesiredSize(controller).
  2. If controller.[[strategyHWM]] is 0,
    1. Return true if desiredSize < 0, or false otherwise.
      (This is effectively the same as checking if controller.[[queueTotalSize]] > 0.)
  3. Otherwise,
    1. Return true if desiredSize ≤ 0, or false otherwise.

Alternatively, if want zero-size chunks to also cause backpressure, we could change it to "if controller.[[stream]].[[writeRequests]] is not empty" or something.

Still, not sure how I feel about this. Might be difficult to explain this behavior to developers. But then again, so would explaining the need for controller.releaseBackpressure()... 🤷‍♂️

@ricea
Copy link
Collaborator

ricea commented Sep 14, 2021

WritableStreamDefaultControllerGetBackpressure(controller) performs the following steps:

  1. Let desiredSize be ! WritableStreamDefaultControllerGetDesiredSize(controller).

  2. If controller.[[strategyHWM]] is 0,

    1. Return true if desiredSize < 0, or false otherwise.
      (This is effectively the same as checking if controller.[[queueTotalSize]] > 0.)
  3. Otherwise,

    1. Return true if desiredSize ≤ 0, or false otherwise.

Is this different from just making HWM 0 equivalent to HWM 1?

@MattiasBuelens
Copy link
Collaborator Author

Is this different from just making HWM 0 equivalent to HWM 1?

You're right, that change would make them equivalent. That's not what we want. Woops! 😅

Okay, then it's back to the explicit controller.releaseBackpressure(). 😛

@domenic
Copy link
Member

domenic commented Sep 15, 2021

Is this different from just making HWM 0 equivalent to HWM 1?

I believe it's different in the case where the queue total size is between 0 and 1. But the fundamental issue where it causes a chunk to end up in the queue remains; see below.

Okay, then it's back to the explicit controller.releaseBackpressure().

I was hoping there'd be some way to get the benefit here, without a new API, and with some way of "fixing" HWM = 0 so that it's less of a footgun and more useful.

But going back to the description of the desired behavior,

With HWM = 0, the stream is asking not to put anything its queue, except for the case where it wants a chunk. Here, most of the time writer.ready will be pending. When the sink eventually wants a chunk, writer.ready temporarily becomes resolved and waits for a chunk to be written. Once that happens, writer.ready becomes pending again.

I guess we might need something different after all. If we try to use the HWM, we're intricately tied to the queue; the only way to switch between "not ready" and "wants a chunk" mode is by manipulating the contents of the queue, presumably by dropping it from containing a chunk to not containing a chunk. But that means the queue must contain a chunk during the "not ready" state, which is what we want to avoid!

So I am convinced that we cannot do anything just by tweaking the HWM/queue comparison mechanism.

That leaves two options:

  • Explicit method; or
  • readyWhenNotWriting

How do these compare in expressiveness? At first I thought the explicit method was more expressive, since you could combine it with any arbitrary HWM. But I wonder if that's actually true... does it make sense to use it with non-0 HWMs? How do each compare for realistic use cases?

It almost feels like readyWhenNotWriting is just a totally alternate kind of queuing strategy, separate from a size-based one?

@jan-ivar
Copy link
Contributor

jan-ivar commented Sep 15, 2021

does it make sense to use it with non-0 HWMs?

I don't think so, because releasing back pressure (which is built on queues) works fine then (there's water on the wheel).

This doesn't feel like a different strategy to me, so much as a way for WritableStream to signal for data (sorta) like one can with ReadableStream.

@ricea
Copy link
Collaborator

ricea commented Nov 16, 2021

I think I see two options for the API:

  1. Implicit. Semantics are "writer.ready is true when the underlying sink is not writing".
  2. Explicit. Semantics are "writer.ready is true when controller.bikeshed() has been called until the next chunk is written".

I am leaning towards the explicit options, because the implicit version may require sink authors to artificially extend the time before resolving the write() promise just to stop another chunk from being written.

The implicit version would only do anything when HWM=0. The explicit version could in principle work with any HWM, but there is no obvious use case for it (if we are already behind in writing, why would we ask for more chunks?).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging a pull request may close this issue.

4 participants