from(AsyncIterable) doesn't finalize async generators #5998

Closed
ptitjes opened this issue Feb 6, 2021 · 1 comment · Fixed by #6062
ptitjes commented Feb 6, 2021

Bug Report

Current Behavior
When using from() with an async generator, the generator is never finalized.

import { from } from "rxjs";
import { take, tap } from "rxjs/operators";

async function* gen() {
  try {
    let i = 0;
    while (true) {
      yield i++;
      // Pause between values; without this delay Chrome hangs
      await new Promise((resolve) => setTimeout(resolve, 100));
    }
  } finally {
    console.log("Finalizing generator");
  }
}

const source = from(gen()).pipe(
  take(3),
  tap({ complete: () => console.log("Done") })
);

source.subscribe(console.log);

This prints:

0
1
2
Done

Expected behavior
I would expect the async generator to be finalized, just as synchronous generators are.

The above code should print:

0
1
2
Done
Finalizing generator
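
For comparison, here is a minimal sketch of the language-level behavior for synchronous generators that this expectation relies on. It deliberately avoids RxJS; `syncGen`, `takeThree`, and `log` are hypothetical names used only for illustration. Leaving a `for...of` loop early makes the runtime call the iterator's `return()` method, which runs the generator's `finally` block.

```typescript
const log: string[] = [];

// Infinite synchronous generator with a cleanup step.
function* syncGen(): Generator<number> {
  try {
    let i = 0;
    while (true) {
      yield i++;
    }
  } finally {
    log.push("Finalizing generator");
  }
}

// Hypothetical helper: collect the first three values, then stop.
function takeThree(iterable: Iterable<number>): number[] {
  const taken: number[] = [];
  for (const value of iterable) {
    taken.push(value);
    // Breaking out of for...of calls iterator.return() implicitly.
    if (taken.length === 3) break;
  }
  return taken;
}

const firstThree = takeThree(syncGen());
console.log(firstThree, log);
```

After this runs, `log` contains "Finalizing generator", because the early `break` finalized the generator.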

Reproduction

https://stackblitz.com/edit/from-async-generator?file=index.ts

Environment

  • Runtime: both Node v12 and Chrome v88
  • RxJS version: 7.0.0-beta.10

Possible Solution
The fromAsyncIterable() function in rxjs/src/internal/observable/from.ts should handle this case, as does fromIterable().

Something along these lines:

function fromAsyncIterable<T>(asyncIterable: AsyncIterable<T> | AsyncGenerator<T>) {
  return new Observable((subscriber: Subscriber<T>) => {
    process(asyncIterable, subscriber).catch(err => subscriber.error(err));
    return () => {
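      // On unsubscribe, ask the generator to return so its `finally` block runs.
      if ("return" in asyncIterable && isFunction(asyncIterable.return)) asyncIterable.return(null);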
    };
  });
}

async function process<T>(
  asyncIterable: AsyncIterable<T> | AsyncGenerator<T>,
  subscriber: Subscriber<T>
) {
  for await (const value of asyncIterable) {
    subscriber.next(value);
  }
  subscriber.complete();
}
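
A possible alternative to an explicit teardown, sketched here without RxJS so that it runs on its own: check whether the subscriber has closed after each value and return from the `for await` loop, which makes the runtime call the generator's `return()`. `MiniSubscriber`, `drain`, `takeThreeSubscriber`, and `events` are hypothetical names for illustration and are not RxJS APIs; this is a sketch under those assumptions, not necessarily what the library does.

```typescript
// Hypothetical stand-in for the parts of a subscriber this sketch needs.
interface MiniSubscriber<T> {
  closed: boolean;
  next(value: T): void;
  complete(): void;
}

const events: string[] = [];

async function* gen(): AsyncGenerator<number> {
  try {
    let i = 0;
    while (true) {
      yield i++;
      await new Promise<void>((resolve) => setTimeout(resolve, 1));
    }
  } finally {
    events.push("Finalizing generator");
  }
}

async function drain<T>(iterable: AsyncIterable<T>, subscriber: MiniSubscriber<T>): Promise<void> {
  for await (const value of iterable) {
    subscriber.next(value);
    // Returning from for await calls (and awaits) iterator.return().
    if (subscriber.closed) return;
  }
  subscriber.complete();
}

// Closes itself after three values, roughly like take(3).
function takeThreeSubscriber(): MiniSubscriber<number> {
  let count = 0;
  const subscriber: MiniSubscriber<number> = {
    closed: false,
    next(value) {
      events.push(String(value));
      if (++count === 3) subscriber.complete();
    },
    complete() {
      if (subscriber.closed) return;
      subscriber.closed = true;
      events.push("Done");
    },
  };
  return subscriber;
}

const finished: Promise<void> = drain(gen(), takeThreeSubscriber());
finished.then(() => console.log(events.join("\n")));
```

Once `finished` resolves, `events` holds the sequence listed under "Expected behavior". One limitation of this approach is that the generator is only finalized after it produces its next value, so a generator that is stuck waiting would not be cleaned up until it yields again.
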
benlesh self-assigned this Feb 26, 2021
benlesh added a commit to benlesh/rxjs that referenced this issue Feb 26, 2021
- Resolves an issue where AsyncGenerators were not properly finalized when an observable result was unsubscribed.
- Resolves another nearly impossible to test scenario that this happens to cover: In the event of an operator like `take()` completing the resulting observable, if an error is thrown in the terminal completion handler, teardown is not invoked, as it would not continue to the next line to call the unsubscribe method of the Subscriber instance. This was resolved by ensuring synchronous calls before the unsubscribe call are called in a try/finally block.

Fixes ReactiveX#5998
benlesh added a commit that referenced this issue Feb 27, 2021
* fix: finalization/teardown fixes for AsyncGenerators and take

- Resolves an issue where AsyncGenerators were not properly finalized when an observable result was unsubscribed.
- Resolves another nearly impossible to test scenario that this happens to cover: In the event of an operator like `take()` completing the resulting observable, if an error is thrown in the terminal completion handler, teardown is not invoked, as it would not continue to the next line to call the unsubscribe method of the Subscriber instance. This was resolved by ensuring synchronous calls before the unsubscribe call are called in a try/finally block.

Fixes #5998

* chore: remove unnecessary try/finally
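
To illustrate the second scenario in the commit message, here is a minimal sketch of why a throwing completion handler can skip teardown, and how wrapping the handler call in try/finally avoids that. `MiniSubscriber`, `onComplete`, and `teardown` are hypothetical names, not the actual RxJS internals, and the follow-up commit above suggests the final code needed fewer try/finally blocks than first written.

```typescript
// Hypothetical subscriber: runs a completion handler, then a teardown.
class MiniSubscriber {
  closed = false;

  constructor(
    private readonly onComplete: () => void,
    private readonly teardown: () => void
  ) {}

  complete(): void {
    if (this.closed) return;
    this.closed = true;
    try {
      // May throw; without try/finally the teardown below would be skipped.
      this.onComplete();
    } finally {
      this.teardown();
    }
  }
}

let tornDown = false;
let caught: unknown = null;

const subscriber = new MiniSubscriber(
  () => {
    throw new Error("boom in complete handler");
  },
  () => {
    tornDown = true;
  }
);

try {
  subscriber.complete();
} catch (err) {
  // The error still propagates to the caller after teardown has run.
  caught = err;
}
```

Here `tornDown` ends up `true` even though the handler threw, and the error is still visible to the caller in `caught`.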

ptitjes commented Mar 11, 2021

@benlesh Thank you very much!
