Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reworking Multicasting: share, connect, and makeConnectable #5634

Merged
merged 8 commits into from
Dec 14, 2020

Conversation

benlesh
Copy link
Member

@benlesh benlesh commented Aug 5, 2020

I'd like to do this instead of what I was doing over at #5432 ..

Overview

  1. Make share configurable, such that it can be used to create things like shareReplay or people can configure the behavior however they like. If they want a shareBehavior or shareLast, they can easily make it work however they like.
  2. Add a newconnect operator that does the "selector" version of multicast, publish, et al. (this also fixes a broken error behavior in publishReplay).
  3. Add a new makeConnectable function that creates a ConnectableObservableLike (instead of ConnectableObservable).

What this sets us up for:

We'd have

  • share (NOTE: I'll leave shareReplay as just a wrapper around this, as seen in this PR)
  • connect
  • connectable

Instead of

  • share
  • multicast
  • publish
  • publishLast
  • publishReplay
  • publishBehavior
  • refCount
  • shareReplay
  • ConnectableObservable

All "operators" at that point would actually only return Observable and never ConnectableObservable, nor would any require ConnectableObservable as a source (such as with refCount).

TODO:

  • Additional documentation
  • Additional tests
  • Add deprecation details to deprecated operators and paths

Other thoughts

It's possible that a better API for makeConnectable might be to return a tuple with a connect function. Like so:

const [ result$, connect ] = makeConnectable(source);
result$.subscribe(console.log);
connect();

// instead of
const result$ = makeConnectable(source);
result$.subscribe(console.log);
result$.connect();

How this "replaces" certain things

Well, the I find the prevalence of publishReplay(1), refCount() very disturbing. For one thing, I doubt many folks using this are aware of what this does completely. It's not retryable, it's not replayable, etc.

a$.pipe(publishReplay(1), refCount())

// is

a$.pipe(
  share({
    connector: () => new ReplaySubject(1),
    resetOnError: false,
    resetOnComplete: false,
    resetOnUnsubscribe: false
  });
)

"But Ben, that's more code!"... yes, and it's explicit code that tells you exactly what you're doing there. That's not retryable, it's not repeatable... it will never reset internally. It also allows the author more control without having to know implementation details of publishReplay. Needing to know implementation details of our multicasting operators has been a sore point of this library, IMO.

NO WORRIES: a$.pipe(share()) will work exactly as it does now. (Notice those tests didn't change)

@benlesh benlesh requested a review from cartant August 5, 2020 20:48
@benlesh benlesh added the AGENDA ITEM Flagged for discussion at core team meetings label Aug 5, 2020
@benlesh benlesh mentioned this pull request Aug 5, 2020
7 tasks
@benlesh
Copy link
Member Author

benlesh commented Aug 5, 2020

Related: #3833

@benlesh
Copy link
Member Author

benlesh commented Aug 6, 2020

Rebased

@benlesh
Copy link
Member Author

benlesh commented Aug 12, 2020

Core team to review... revisit next meeting.

@benlesh
Copy link
Member Author

benlesh commented Sep 19, 2020

This is blocked on #5729

Copy link
Collaborator

@cartant cartant left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Just some questions, a bunch of nitpicks and some names of which I am not a fan. 😅

Also, after this is rebased, the skipped firehose tests should be unskipped and made to pass. A contributor opened a PR to address the skipped tests, but I blocked the PR, as the changes were based on the old multicast infrastructure - see #5834

Comment on lines 21 to 42
return lift(source, function (this: Subscriber<R>, source: Observable<T>) {
const subscriber = this;
let subject: Subject<T>;
try {
subject = connector();
} catch (err) {
subscriber.error(err);
return;
}

let result: Observable<R>;
try {
result = from(setup(subject.asObservable()));
} catch (err) {
subscriber.error(err);
return;
}

const subscription = result.subscribe(subscriber);
subscription.add(source.subscribe(subject));
return subscription;
});
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: This PR predates the smallification, so it should be updated to use operate. And the source.subscribe(subject) subscription should be added to the subscriber, etc.

setup,
}: {
connector?: () => Subject<T>;
setup: (shared: Observable<T>) => ObservableInput<R>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: I'm not a fan of the setup property's name. TBH, I preferred selector. My main beef with setup is that, IMO, it doesn't really explain what the parameter does. Elsewhere in the API functions that are used in this manner are called selectors and I am a fan of consistency and precedent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about project?

Copy link
Member Author

@benlesh benlesh Nov 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

selector, IMO, also doesn't explain what the parameter does, either. I mean, this sets up the multicast as much as it selects the output. Happy to bikeshed the name, but overall, I guess I'm just not sure what to name this. project is used elsewhere to mean a synchronous map-type function, and I'm not thrilled with its use in things like mergeMap.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe cast? or multicast? or??

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I'm open to moving to back to the selector, though). I just don't like that it doesn't completely imply what's going on here. catchError is a little different, because it's only called when an error happens, and you have to "select" what to observe after that error shuts things down.

This function is required to setup the multicast. It also selects what sort of values are emitted from the result. I guess I'm now unhappy with either setup or select haha. But I do know I want it to be terse.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is required to setup the multicast. It also selects what sort of values are emitted from the result.

Well, the function is "required to setup" the multicast, but it doesn't actually set it up. It receives the shared/multicast observable and then "selects what sort of values are emitted from the result".

That said, cast or multicast are okay, I suppose, but my preference is still for selector.


result.connect = function () {
if (!connection) {
connection = defer(() => source).subscribe(connector);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Is there a reason for using defer here instead of from? source is a parameter and is never reassigned, so I don't see why this needs to be lazy.

* @returns A "connectable" observable, that has a `connect()` method, that you must call to
* connect the source to all consumers through the subject provided as the connector.
*/
export function makeConnectable<T>(source: ObservableInput<T>, connector: Subject<T> = new Subject<T>()): ConnectableObservableLike<T> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: Not a fan of the makeConnectable name. IMO, this seems odd, when looking at the other parts of the API. I mean, it's timer, not makeTimer or createTimer. My preference would be for it to be named connectable.


function shareSubjectFactory() {
return new Subject<any>();
interface ShareOptions<T, R> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(very minor) nitpick: In other parts of the API, these types/parameters are referred to as 'config'. I think it should be one or the other and should be consistent throughout the API, but - again - this is very minor.

hasCompleted = hasErrored = false;
};

return lift(source, function (this: Subscriber<T | R>, source: Observable<T>) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: With the changes made in the smallification, this should now used operate.

subject = connector!();
}

const castSubscription = subject.subscribe(subscriber);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: This seems like a weird variable name, IMO. Where is the cast?

let { connector, resetOnComplete = true, resetOnError = true, resetOnUnsubscribe = true } = options;
if (!connector) {
connector = () => new Subject<T>();
}
Copy link
Contributor

@backbone87 backbone87 Nov 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isnt this more consistent?

const {
  connector = () => new Subject<T>(),
  resetOnComplete = true,
  resetOnError = true,
  resetOnUnsubscribe = true,
} = options || {};

also shouldnt this be moved into the share functions block insteadof the operator function block? maybe even:

export function share<T, R>({
  connector = () => new Subject<T>(),
  resetOnComplete = true,
  resetOnError = true,
  resetOnUnsubscribe = true,
}: ShareOptions<T, R> = {}): OperatorFunction<T, T | R> {
  // ...
}

also the type param R doesnt seem to be used really?

* If false, when the number of subscribers to the resulting observable reaches zero due to unsubscription, the subject
* will remain connected to the source, and new subscriptions to the result will be connected through that same subject.
*/
resetOnUnsubscribe?: boolean;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i was under the assumption that a complete/error will always result in an implicit unsubscribe afterwards. its not reflected in this description that an unsubscribe that follows a complete/error, will not result in a reset.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that the next time the resulting observable is subscribed to, a new subject will be created and the source will be subscribed to again.

This should also be added to the resetOnError/resetOnComplete descriptions. Maybe its worth to describe a reset independantly of these 3 configs in the share operator itself, so that the description of the reset params can be reduced to actual differences between them and therefore causing less mental friction.


function shareSubjectFactory() {
return new Subject<any>();
interface ShareOptions<T, R> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it worth to add a resetNotifier$ in order to have more fine grained control over when resets happen?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could technically do that by composing takeUntil

source$.pipe(
  takeUntil(resetNotifier$),
  share(),
  repeat(),
)

@backbone87
Copy link
Contributor

It's possible that a better API for makeConnectable might be to return a tuple with a connect function. Like so:

just throwing in other alternatives:

const notifier = new Subject();

makeConnectable(source, notifier);
notifier.subscribe(connects(source));

Adds a feature that allows two observables to be tested for equality of output.
…Like`.

BREAKING CHANGE: The TypeScript type `Subscribable` now only supports what is a valid return for `[Symbol.observable]()`.

BREAKING CHANGE: The TypeScript type `Observer` no longer incorrectly has an optional `closed` property.
@benlesh
Copy link
Member Author

benlesh commented Nov 28, 2020

Okay, I've made a few changes to this.

Per @cartant's request, makeConnectable is now connectable.

OTHER VERY IMPORTANT THINGS:

  1. I've added a new TestScheduler helper: expectObservable(a$).toEqual(b$). I needed this so I could test that the deprecation advice I was giving people in the deprecation messages was proven, not just theoretical. It's publicly exposed, and I think we should ship it. It's useful for this exact scenario and it follows the jest and jasmine pattern of toBe and toEqual, IMO.
  2. I've fixed the Observer interface to remove the optional closed property, which shouldn't have been there. (That may be breaking for TypeScript users).
  3. I've fixed the Subscribable interface to just be what is valid as a return value of [Symbol.observable](). That's because I was using with SubjectLike.
  4. I'm now exporting a new type SubjectLike, it's exactly what it sounds like.

TODO

  1. We need to discuss SubjectLike. We might want it to be SubjectLike<In, Out> and not just SubjectLike<T>. This relates to things like WebSocketSubject, etc.
  2. We need to bikeshed the setup function as discussed above.

cc @cartant @kwonoj

@benlesh benlesh requested a review from cartant November 28, 2020 02:13
Copy link
Collaborator

@cartant cartant left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some things that might or might not be issues and some nitpicks and comments/thoughts, etc. No major issues, AFAICT.

src/internal/types.ts Outdated Show resolved Hide resolved
src/internal/types.ts Outdated Show resolved Hide resolved
setup,
}: {
connector?: () => Subject<T>;
setup: (shared: Observable<T>) => ObservableInput<R>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is required to setup the multicast. It also selects what sort of values are emitted from the result.

Well, the function is "required to setup" the multicast, but it doesn't actually set it up. It receives the shared/multicast observable and then "selects what sort of values are emitted from the result".

That said, cast or multicast are okay, I suppose, but my preference is still for selector.

src/internal/operators/share.ts Outdated Show resolved Hide resolved
spec/schedulers/TestScheduler-spec.ts Outdated Show resolved Hide resolved
src/internal/observable/connectable.ts Outdated Show resolved Hide resolved
src/internal/operators/multicast.ts Outdated Show resolved Hide resolved
spec/operators/share-spec.ts Show resolved Hide resolved
@benlesh benlesh requested a review from cartant December 6, 2020 01:22
@benlesh
Copy link
Member Author

benlesh commented Dec 6, 2020

Requesting rereview.

Note that the change to make Subscribable accept Partial<Observer<T>> also meant that the type for Observable subscribe needed to take Partial<Observer<T>> instead of PartialObserver<T>, the latter being a type that forced at least one callback (for better or worse). I'm totally fine with this, and I'm not sure that bit was helpful anyhow. If anything, I'd bet it was confusing for users at times.

Copy link
Collaborator

@cartant cartant left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM ❤

@benlesh benlesh merged commit 8c13e0a into ReactiveX:master Dec 14, 2020
@cartant
Copy link
Collaborator

cartant commented Dec 14, 2020

🎉

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
AGENDA ITEM Flagged for discussion at core team meetings blocked
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants