Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Need debounceMap (like exhaustMap, but plays the last event) #1777

Closed
Dorus opened this issue Jun 16, 2016 · 15 comments
Closed

Need debounceMap (like exhaustMap, but plays the last event) #1777

Dorus opened this issue Jun 16, 2016 · 15 comments

Comments

@Dorus
Copy link

Dorus commented Jun 16, 2016

http://i.imgur.com/ciO6Kxd.png

It would be nice to have an operator that plays the latest event, like exhaustMap, but also guarantees that the last event from a stream is processed.

A good usecase would be when inner the observable does not support cancellation. switchMap would not be suited because all inner sources would stay active in the background. Instead you might want to use exhaustMap, but then we might miss the latest event. A new operator that would process the last event too would be useful.

@Dorus
Copy link
Author

Dorus commented Sep 15, 2016

Got a correct (but not perfect) implementation that might show better what i want:

function auditMap(project, resultSelector) {
  return Rx.Observable.create(ob => {
    var count = 0;
    return this.do(() => count++)
      .concatMap((e, i) => Rx.Observable.defer(() => 
          count === 1 ? project(e, i) : Rx.Observable.empty()
        ).concat(Rx.Observable.of(0).do(() => count--).ignoreElements()),
        resultSelector
      ).subscribe(ob);
  });
};

Rx.Observable.prototype.auditMap = auditMap;

https://jsbin.com/galuno/6/edit?js,output

The only problem with this solution is that it will keep all emitted values in memory until the inner observable completes. This is inherent to concatMap but would not be needed for this operator.

Some expected in and outputs:

result$ = source$.auditMap(e => Rx.Observable.timer(0, 3050).take(2).mapTo(e))
source$ -1-2-----------
result$ -1--12--2------

source$ -1-2-3---------
result$ -1--12--23--3--

source$ -123-----------
result$ -1--13--3------

source$ -1-2-3-4-------
result$ -1--12--24--4--

I named it auditMap this time, still pondering on what the most descriptive name would be.

@Dorus
Copy link
Author

Dorus commented Oct 4, 2016

I just realize i need this operator too but then as a normal one without selector, like debounce, throttle etc.

For example, with a timeSpan of 3:

source$ -1-2-----------
result$ -1---2---------

source$ -1-2-3---------
result$ -1---2---3-----

source$ -123-----------
result$ -1---3---------

source$ -1-2-3-4-------
result$ -1---2---4-----

source$ -1234---5------
result$ -1---4---5-----

source$ -1234----5-----
result$ -1---4---5-----

source$ -1234-----5-----
result$ -1---4----5-----

I do not believe there is any easy way to get these marbles with current operators. I also realize names like auditMap and debounceMap are not correct because they would have different marbles if used simply like auditTime(3).

@deadbeef84
Copy link

For reference, here's my solution:

function auditMap(project, resultSelector) {
  return Observable.using(
    () => ({
      running$: new BehaviorSubject(false)
    }),
    ({ running$ }) =>
      this.audit(() => {
        return running$
          .first(running => !running)
          .do(() => running$.next(true))
      }).concatMap(
        (value, index) => Observable.from(project(value, index))
          .do(null, null, () => running$.next(false)),
        resultSelector
      )
  )
}

Not exactly pretty. I would prefer an operator in the library, or at least a nicer way of doing it using existing operators.

There's currently (v5.4.1) some issues with audit and synchronous durationSelectors, but there's an issue for it so it should be fixed soon.

@Dorus
Copy link
Author

Dorus commented Jun 19, 2017

Another idea on this subject i wrote up a while ago, posting it here to be able to find it back easier later.

Random idea i like to pitch: map* operators like mergeMap, exhaustMap and switchMap should have both an concurrent parameter and a bufferSize parameter. The default is 1 concurrent and 0 buffer (or endless buffer for mergeMap), actually exhaustMap is just mergeMap with a buffer size of 0, just like concatMap is mergeMap with concurrent set to 1. When operators receive new events they would first put them in the buffer, only once the buffer overflows they move to the normal behaviour (cancelling for switchMap, dropping for exhaustMap etc).
I wonder if there are more flavours possible and if these are useful: Play oldest first, play newest first, drop oldest first, drop newest first etc. For switchMap even cancel oldest or newest.

The idea of the operator here is pretty much an exhaustMap with a buffer size of 1.

@Dorus
Copy link
Author

Dorus commented Apr 11, 2018

Somebody just pointed out to me that throttle has an (undocumented) {leading: boolean, trailing: boolean} config object you can pass. This trailing feature looks a lot like what i want to do here too.

@cartant
Copy link
Collaborator

cartant commented Jan 28, 2019

Closing because:

This trailing feature looks a lot like what i want to do here too.

@cartant cartant closed this as completed Jan 28, 2019
@Dorus
Copy link
Author

Dorus commented Jan 28, 2019

However throttle does not have a map feature so it's not the same.

@cartant
Copy link
Collaborator

cartant commented Jan 28, 2019

Perform the mapping separately. Conflating the mapping and the time-based operation into a single, built-in operator doesn't seem like something that would make for an acceptable core library operator. Also:

Our general recommendation is if implementation is trivial / and doesn't require modification in core, we do recommend for user-land first. We want to gather usecases around how much common it is before we include into core, since adding new api surface is easy but deprecating / changing behavior is exceptionally hard, as well as we don't want to core api surface it too broad (it is already, and we are trying to shrink it down).

@Dorus
Copy link
Author

Dorus commented Jan 28, 2019

Please show a trivial implementation of a mergemap like operator that will execute the first inner observable, but delay the second when the first one is still active when it arrives, or drops the second when a third arrives before the first one finished.

For example this is how you would want exhaust map to behave to make it useful for things like type ahead, instead of switchmap.

@cartant
Copy link
Collaborator

cartant commented Jan 28, 2019

You would not want to use exhaustMap for a typeahead. Rather, you'd want something like this: https://github.com/cartant/rxjs-etc/blob/master/source/operators/switchMapUntil.ts

For background, read the article that's referenced in the comment at the top of that file. And note that the implementation is trivial.

@Dorus
Copy link
Author

Dorus commented Jan 28, 2019

That's a variant of switchmap, the proposal here is to have a function that combines the useful properties of concataMap, exhaustMap and switchMap.

Namely: do not cancel ongoing requests (concat and exhaust)
Cancel / ignore a too high frequency of requests (exhaust and switch)
Run the last requesr (switch and concat).
Run only one at the same time (switch, concat and exhaust).

Notice switchmapuntil still cancels ongoing requests,so it's not suitable for all purposes.

@cartant
Copy link
Collaborator

cartant commented Jan 28, 2019

I see what you mean about non-cancellable requests. However, the debouncing should be done before the mapping, IMO. I would not favour relying upon exhaustMap's behaviour to perform what amounts to debouncing.

Anyway, a user-land implementation is not complicated (almost anything can be built with flatMap/mergeMap):

function exhaustMapWithTrailing<T, R>(
    project: (value: T, index: number) => ObservableInput<R>
): OperatorFunction<T, R> {
    return source => {
        let pending = false;
        let queued: [T, number] | undefined = undefined;
        return source.pipe(
            mergeMap((value, index) => {
                if (pending) {
                    queued = [value, index];
                    return EMPTY;
                }
                pending = true;
                return from(project(value, index)).pipe(
                    concat(defer(() => {
                        if (!queued) {
                            return EMPTY;
                        }
                        const projected = project(...queued);
                        queued = undefined;
                        return from(projected);
                    })),
                    tap({
                        complete: () => pending = false
                    })
                );
            })
        );
    };
}

@ghetolay
Copy link
Contributor

We certainly don't have the same definition of a trivial implementation.
Also in that case I think operator's implementation like switchMap would be even more trivial.

What I think is trivial/common here is the use case leading to the need of that operator: "discard all subsequent but hold most recent".

@Dorus
Copy link
Author

Dorus commented Jan 30, 2019

I'm going to leave this here as it's been 2 days and i have not gotten around to test this, i wonder if the following scenario will pass in exhaustMapWithTrailing @cartant .

source.pipe(
  exhaustMapWithTrailing(e => of(e).pipe(
    delay(30)
  ))
);
source -1-2--3|
result ----1---2---3|

@edusperoni
Copy link

edusperoni commented Feb 22, 2019

@Dorus I just tested it and 3 does not get emitted, unfortunately :(

Edit:

I wrote one, kinda messy, could probably be optimized:

function exhaustMapWithTrailing2<T, R>(
  project: (value: T, index: number) => ObservableInput<R>
): OperatorFunction<T, R> {
  return source => {
    let request = null;
    let subj = new Subject<[T, number] | undefined>();
    let queued: [T, number] | undefined = undefined;
    let req = subj.pipe(
      exhaustMap((v) => from(project(...v)))
    );
    let first = true;
    let doing = false;
    return source.pipe(
      mergeMap((value, index) => {
        if (doing) {
          queued = [value, index];
        } else { subj.next([value, index]); }
        console.log("inside " + value + queued);
        if (first) {
          req = subj.pipe(startWith([value, index] as [T, number]), tap(() => doing = true),
            exhaustMap((v) => from(project(...v)).pipe(finalize(() => {doing = false; if(queued) {const q2 = queued; queued = undefined; subj.next(q2)}})))
          );
          first = false;
          return req;
        }
        return EMPTY;
      })
    )
  };
}

@lock lock bot locked as resolved and limited conversation to collaborators Mar 24, 2019
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants