Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass async function to new Observable() #2827

Closed
RafaelKr opened this issue Sep 5, 2017 · 1 comment
Closed

Pass async function to new Observable() #2827

RafaelKr opened this issue Sep 5, 2017 · 1 comment

Comments

@RafaelKr
Copy link

RafaelKr commented Sep 5, 2017

RxJS version: 5.4.0

Code to reproduce:

public receiveData(): Observable<any> {
  return new Observable(async (observer) => {
    let data

    // wait until service is initialized
    await this.ready()

    // get some data here or do something else
    observer.next(data)

    // get some more data
    observer.next(data)

    observer.complete()
  })
}

Expected behavior:
I should be able to pass an async function as first parameter to new Observable().
I know that it's possible to use Observable.from(new Promise(async (...) => { ... })), but then I don't have the ability to emit more than one item anymore.
Also I think if an error is thrown it should be passed to observer.error automatically.

Actual behavior:
TypeScript error:
Argument of type '(this: Observable<{}>, observer: Subscriber<{}>) => Promise<void>' is not assignable to parameter of type '(this: Observable<{}>, subscriber: Subscriber<{}>) => TeardownLogic'. Type 'Promise<void>' is not assignable to type 'TeardownLogic'. Type 'Promise<void>' is not assignable to type 'AnonymousSubscription'. Property 'unsubscribe' is missing in type 'Promise<void>'.

Additional information:
Currently this is possible by doing something like this, which isn't really nice:

public receiveData(): Observable<any> {
  return new Observable((observer) => {
    (async () => {
      let data

      // wait until service is initialized
      await this.ready()

      // get some data here or do something else
      observer.next(data)

      // get some more data
      observer.next(data)

      observer.complete()
    })()
    // HACK: prevent linter warning when `no-floating-promises` is set
    .then(null, observer.error)
  })
}

Keywords: async/await Observable, TeardownLogic Promise

@kwonoj
Copy link
Member

kwonoj commented Sep 5, 2017

You can use defer with promise for multiple subscriptions. It isn't direct accept for async fn that you still need to interop, it works as expected.

On the other hand, I think if an error is thrown it should be passed to observer.error automatically. - Rx intentionally throws out exception in observable creation to not to propagate into a subscription. Refer #1833 / #2313 .

@lock lock bot locked as resolved and limited conversation to collaborators Jul 29, 2018
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants