Async Iterable EventEmitter.on(emitter, "event") #27847

Closed
benjamingr opened this issue May 24, 2019 · 13 comments
Labels
events Issues and PRs related to the events subsystem / EventEmitter.

Comments

@benjamingr
Member

benjamingr commented May 24, 2019

Continuing the discussion from the once PR regarding EventEmitter.on(emitter, "eventName"):

events.on(emitter, name)

  • emitter <EventEmitter>
  • name <string>
  • Returns: <AsyncIterable>

Creates an AsyncIterable that yields promises for the values of the given event name in the given EventEmitter. The iterable rejects the next promise when the EventEmitter emits 'error'. Calling .return on the iterable removes the subscription, so the iterable unsubscribes automatically when a for await loop is exited with break or return.

Here is an example of the API:

const events = on(emitter, "item");
events.next(); // Promise for the next event value 
events.return(); // unsubscribe
events.throw(); // not sure what we want this to do, we might want to emit `error` on 

Or with for await syntax:

const foos = on(emitter, 'foo');
for await (const foo of foos) {
  foo; // the next value from the emitter
  break; // this calls `.return` on the async iterable and unsubscribes.
}
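
The semantics described above (unsubscribing on break, rejecting on 'error') can be tried out with a short sketch. It assumes the `on` export of the `events` module that later shipped in Node.js as a result of this issue. Note that the shipped version yields an array of the emitted arguments rather than a bare value, so the loop destructures it. The `main` function and the event names are illustrative only.

```javascript
const { EventEmitter, on } = require('events');

async function main() {
  const emitter = new EventEmitter();

  // Subscribe first, then emit: events are queued until they are consumed.
  const items = on(emitter, 'item');
  emitter.emit('item', 'first');
  emitter.emit('item', 'second');

  const seen = [];
  for await (const [item] of items) {
    seen.push(item);
    break; // calls items.return(), which removes the listeners
  }
  const listenersAfterBreak = emitter.listenerCount('item');

  // An 'error' event rejects the promise returned by a pending next().
  const failing = on(emitter, 'item');
  const pending = failing.next();
  emitter.emit('error', new Error('boom'));
  const message = await pending.catch((err) => err.message);

  return { seen, listenersAfterBreak, message };
}

const result = main(); // promise for the values collected above
```

Only the first queued item is consumed here; the second is dropped when the loop breaks and the iterator unsubscribes.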

cc @mcollina @jasnell @MylesBorins @addaleax @BridgeAR

@benjamingr
Member Author

Addressing @jasnell's points from that PR:

Ok, quick investigation... it should be possible to implement an EventEmitter.on(emitter, event) function that is essentially a simplification of the code used for stream.Readable with a few notable differences:

Unlike stream.Readable, there is no natural end to the event stream, and there is no destroy() method, so cleanup and termination of the for await() {} would be entirely dependent on using break and return as suggested.

This is true with a for await loop. It is also possible to call .return on the async iterable to close it.

With stream.Readable, when there are multiple synchronous push()'s, there's still just a single 'readable', and just in case there are multiple read() operations performed. For an arbitrary EventEmitter, we would need a way of reliably handling synchronous emits.

True, and this is by design for the most part. Async iterator libraries like axax have utilities like take which make this easy.
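
As a rough illustration of the point about `take`, here is a minimal sketch of such a helper applied to synchronous emits. The `take` function below is hypothetical and written for this example; it is not the axax API. It also assumes the `on` export that later shipped in Node.js, which queues synchronously emitted events and yields each one as an array of arguments.

```javascript
const { EventEmitter, on } = require('events');

// Hypothetical helper in the spirit of a `take` utility (not axax's API):
// yields at most `count` values, then closes the source iterable.
async function* take(iterable, count) {
  if (count <= 0) return;
  let taken = 0;
  for await (const value of iterable) {
    yield value;
    taken += 1;
    if (taken >= count) return; // leaving the loop calls iterable.return()
  }
}

async function main() {
  const emitter = new EventEmitter();
  const foos = on(emitter, 'foo');

  // Three synchronous emits: all of them are queued by the iterator.
  for (let i = 0; i < 3; i++) emitter.emit('foo', i);

  const seen = [];
  for await (const [value] of take(foos, 2)) seen.push(value);
  return { seen, listeners: emitter.listenerCount('foo') };
}

const result = main(); // promise for the first two values and the listener count
```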

@mcollina
Member

My main concern about this API is that EE does not support backpressure. Essentially the following:

const foos = on(emitter, 'foo');
for await (const foo of foos) {
  await doSomethingRemotely()
}

Will have all events queue up and be processed one at a time. Are we ok with it? I'm struggling to make up my mind. I think some safety measure should be built in (maybe with an option), like the following:

const foos = on(emitter, 'foo', { maxQueue: 1000 });
// will throw if the number of accumulated events waiting to be processed
// exceeds 1000.
for await (const foo of foos) {
  await doSomethingRemotely()
}

What do you think?
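
One way such a safety measure might behave is sketched below. Everything here is an assumption made for illustration: `onWithLimit` and its `maxQueue` option are hypothetical names, the iterator is hand-rolled rather than taken from Node.js, and 'error' handling is left out to keep the example short. Once the backlog is full, the sketch stops listening and rejects the next call to next().

```javascript
const { EventEmitter } = require('events');

// Hypothetical sketch: neither `onWithLimit` nor `maxQueue` is a Node.js API.
function onWithLimit(emitter, name, { maxQueue = Infinity } = {}) {
  const queued = [];   // events nobody has asked for yet
  const waiting = [];  // resolvers of next() calls waiting for an event
  let failure = null;
  let finished = false;

  function stop() {
    finished = true;
    emitter.removeListener(name, onEvent);
  }

  function onEvent(...args) {
    const resolve = waiting.shift();
    if (resolve) {
      resolve({ value: args, done: false });
    } else if (queued.length >= maxQueue) {
      // Backlog is full: drop it, stop listening, fail the next next() call.
      failure = new RangeError(`more than ${maxQueue} '${name}' events queued`);
      queued.length = 0;
      stop();
    } else {
      queued.push(args);
    }
  }
  emitter.on(name, onEvent);

  return {
    next() {
      if (failure) {
        const err = failure;
        failure = null;
        return Promise.reject(err);
      }
      if (queued.length > 0) {
        return Promise.resolve({ value: queued.shift(), done: false });
      }
      if (finished) return Promise.resolve({ value: undefined, done: true });
      return new Promise((resolve) => waiting.push(resolve));
    },
    return() {
      stop();
      for (const resolve of waiting.splice(0)) {
        resolve({ value: undefined, done: true });
      }
      return Promise.resolve({ value: undefined, done: true });
    },
    [Symbol.asyncIterator]() { return this; },
  };
}

async function main() {
  const emitter = new EventEmitter();
  const foos = onWithLimit(emitter, 'foo', { maxQueue: 2 });
  for (let i = 0; i < 3; i++) emitter.emit('foo', i); // third event overflows
  try {
    for await (const [value] of foos) {
      // a slow consumer would process `value` here
    }
  } catch (err) {
    return { name: err.name, listeners: emitter.listenerCount('foo') };
  }
  return { name: null, listeners: emitter.listenerCount('foo') };
}

const result = main(); // promise describing how the loop ended
```

Whether an overflow should discard the backlog, as this sketch does, or deliver it before failing is a design choice the thread leaves open.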


On another note, we need to make sure the following (or similar) works:

const foos = on(emitter, 'foo');
emitter.on('end', foos.removeListener.bind(foos)); // or similar
for await (const foo of foos) {
  await doSomethingRemotely()
}
// the loop will terminate when 'end' is emitted

@benjamingr
Member Author

Will have all events queue up and be processed one at a time. Are we ok with it?

I think we are. Push-based events can't do backpressure, which is why we have readable streams. We can and should be explicit about this in the documentation. I don't think we have to provide maxQueue because I definitely see use cases where we'd be fine with a queue of a few thousand items. I don't feel strongly about this, though, and am ready to be convinced.

On another note, we need to make sure the following (or similar) works:

Oh interesting, can a listener detect it being removed? Are we sure we actually have to support this?

@devsnek
Member

devsnek commented May 24, 2019

we can make it work by having removeListener clear the queue and putting a done: true into the queue of the async iterable

@legendecas
Member

we can make it work by having removeListener clear the queue and putting a done: true into the queue of the async iterable

Would it be an issue if removeListener or removeAllListeners were called on the emitter? The iterator would hang, unaware, in that case.

@mcollina
Member

Would it be an issue if removeListener or removeAllListeners were called on the emitter? The iterator would hang, unaware, in that case.

Yes, but that would happen even with a normal event handler. I don't think it's going to be a problem at all.

@mcollina
Member

A problem that I handled with EventEmitter.once() is variadic arguments of emit(). The actual signature is going to look like the following:

const foos = on(emitter, 'foo');
process.nextTick(() => {
  emitter.emit('foo', 'a', 'b')
})
for await (const [first, second] of foos) {
}
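
A runnable version of the snippet above, assuming the `on` export that later shipped in the Node.js `events` module, where each iteration yields the array of arguments passed to emit(). The `main` wrapper is only there so the example can use for await outside an ES module.

```javascript
const { EventEmitter, on } = require('events');

async function main() {
  const emitter = new EventEmitter();
  const foos = on(emitter, 'foo');

  process.nextTick(() => {
    emitter.emit('foo', 'a', 'b');
  });

  // Each iteration receives all arguments of one emit() call as an array.
  for await (const [first, second] of foos) {
    return { first, second }; // returning also unsubscribes the iterator
  }
}

const result = main(); // promise for the destructured arguments
```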

@jasnell
Member

jasnell commented May 24, 2019

As a more friendly syntax, we could support this case...

const foos = on(emitter, 'foo');
emitter.on('end', foos.removeListener.bind(foos)); // or similar
for await (const foo of foos) {
  await doSomethingRemotely()
}

... By declaring terminal events when the async iterator is created...

const foos = on(emitter, 'foo', { until: 'end' });
for await (const foo of foos) {
  await doSomethingRemotely()
}
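
A terminal event like this could be approximated in user code, as the following sketch suggests. `onUntil` is a hypothetical helper, not an option of any Node.js API. It assumes the `on` export that later shipped in the `events` module and relies on the iterator's return() ending a pending or future next() with done: true.

```javascript
const { EventEmitter, on } = require('events');

// Hypothetical helper: end the iteration when `until` is emitted.
// Simplification: the `until` listener stays registered if the loop is
// exited early and `until` never fires.
function onUntil(emitter, name, until) {
  const iterator = on(emitter, name);
  emitter.once(until, () => {
    iterator.return(); // unsubscribes; already queued events are still delivered
  });
  return iterator;
}

async function main() {
  const emitter = new EventEmitter();
  const foos = onUntil(emitter, 'foo', 'end');

  process.nextTick(() => {
    emitter.emit('foo', 1);
    emitter.emit('foo', 2);
    emitter.emit('end');
    emitter.emit('foo', 3); // ignored: the iterator has already unsubscribed
  });

  const seen = [];
  for await (const [value] of foos) seen.push(value);
  return seen; // the loop terminates once 'end' has been emitted
}

const result = main(); // promise for the values seen before 'end'
```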

@jasnell
Member

jasnell commented May 24, 2019

We also need to consider what happens in the case...

const foos = on(emitter, 'foo');
emitter.removeAllListeners('foo')
for await (const foo of foos) {}

The async iterator needs to be able to clean itself up if the listener it registered has been removed.

@mcollina
Member

We can use the removeListener event: https://nodejs.org/dist/latest-v10.x/docs/api/events.html#events_event_removelistener.
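
A minimal sketch of how the 'removeListener' event could be used for this. The iterator below is hand-rolled for illustration and is not the Node.js implementation; `onUntilRemoved` is a hypothetical name, and 'error' handling is omitted. It relies on 'removeListener' being emitted with the event name and the listener after that listener has been removed.

```javascript
const { EventEmitter } = require('events');

// Hypothetical sketch: an async iterator over `name` events that finishes
// once its own listener has been removed from the emitter.
function onUntilRemoved(emitter, name) {
  const queued = [];   // events nobody has asked for yet
  const waiting = [];  // resolvers of next() calls waiting for an event
  let finished = false;

  function onEvent(...args) {
    const resolve = waiting.shift();
    if (resolve) resolve({ value: args, done: false });
    else queued.push(args);
  }

  function finish() {
    if (finished) return;
    finished = true;
    emitter.removeListener(name, onEvent);
    emitter.removeListener('removeListener', onRemoved);
    for (const resolve of waiting.splice(0)) {
      resolve({ value: undefined, done: true });
    }
  }

  // Finish only when the removed listener is the one registered here.
  function onRemoved(removedName, listener) {
    if (removedName === name && listener === onEvent) finish();
  }

  emitter.on(name, onEvent);
  emitter.on('removeListener', onRemoved);

  return {
    next() {
      if (queued.length > 0) {
        return Promise.resolve({ value: queued.shift(), done: false });
      }
      if (finished) return Promise.resolve({ value: undefined, done: true });
      return new Promise((resolve) => waiting.push(resolve));
    },
    return() {
      finish();
      return Promise.resolve({ value: undefined, done: true });
    },
    [Symbol.asyncIterator]() { return this; },
  };
}

async function main() {
  const emitter = new EventEmitter();
  const foos = onUntilRemoved(emitter, 'foo');
  emitter.emit('foo', 1);
  emitter.removeAllListeners('foo'); // the iterator notices and finishes

  const seen = [];
  for await (const [value] of foos) seen.push(value);
  return seen; // reached instead of hanging forever
}

const result = main(); // promise for the values seen before the removal
```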

@ChALkeR ChALkeR added the events Issues and PRs related to the events subsystem / EventEmitter. label May 24, 2019
@michaelsbradleyjr

michaelsbradleyjr commented May 30, 2019

In case it's of interest, something very similar is already possible with IxJS:

https://github.com/ReactiveX/IxJS/blob/master/src/asynciterable/fromevent.ts#L31

Once the emitter is wrapped with Ix, it can be used with for await...of or Ix's own chain-able operators, depending on one's needs and preferences.

@benjamingr
Member Author

Matteo already has an implementation :D

@littledan

This idea seems good in isolation, but would it be worth seeing whether Emitter would be a good return value for on before landing this API?

mcollina added a commit to mcollina/node that referenced this issue Dec 22, 2019
BridgeAR pushed a commit that referenced this issue Jan 3, 2020
Fixes: #27847
PR-URL: #27994
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Gus Caplan <[email protected]>
Reviewed-By: Anna Henningsen <[email protected]>
Reviewed-By: Rich Trott <[email protected]>
targos pushed a commit that referenced this issue Jan 14, 2020
BethGriggs pushed a commit that referenced this issue Feb 6, 2020
8 participants