Skip to content

Commit

Permalink
Add Publisher.replay (#2684)
Browse files Browse the repository at this point in the history
Motivation:
Publisher.replay provides the ability to keep state that is
preserved for multiple subscribers and across resubscribes.
  • Loading branch information
Scottmitch authored Oct 2, 2023
1 parent ee7db1e commit da1ea3b
Show file tree
Hide file tree
Showing 10 changed files with 1,665 additions and 112 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@
import static io.servicetalk.concurrent.api.EmptyPublisher.emptyPublisher;
import static io.servicetalk.concurrent.api.Executors.global;
import static io.servicetalk.concurrent.api.FilterPublisher.newDistinctSupplier;
import static io.servicetalk.concurrent.api.MulticastPublisher.DEFAULT_MULTICAST_QUEUE_LIMIT;
import static io.servicetalk.concurrent.api.MulticastPublisher.DEFAULT_MULTICAST_TERM_RESUB;
import static io.servicetalk.concurrent.api.MulticastPublisher.newMulticastPublisher;
import static io.servicetalk.concurrent.api.NeverPublisher.neverPublisher;
import static io.servicetalk.concurrent.api.PublisherDoOnUtils.doOnCancelSupplier;
import static io.servicetalk.concurrent.api.PublisherDoOnUtils.doOnCompleteSupplier;
import static io.servicetalk.concurrent.api.PublisherDoOnUtils.doOnErrorSupplier;
import static io.servicetalk.concurrent.api.PublisherDoOnUtils.doOnNextSupplier;
import static io.servicetalk.concurrent.api.PublisherDoOnUtils.doOnRequestSupplier;
import static io.servicetalk.concurrent.api.PublisherDoOnUtils.doOnSubscribeSupplier;
import static io.servicetalk.concurrent.api.ReplayPublisher.newReplayPublisher;
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;
import static io.servicetalk.utils.internal.DurationUtils.toNanos;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -2991,7 +2995,7 @@ public final <Key> Publisher<GroupedPublisher<Key, T>> groupToMany(
*/
@Deprecated
public final Publisher<T> multicastToExactly(int expectedSubscribers) {
return multicastToExactly(expectedSubscribers, 64);
return multicastToExactly(expectedSubscribers, DEFAULT_MULTICAST_QUEUE_LIMIT);
}

/**
Expand Down Expand Up @@ -3023,7 +3027,7 @@ public final Publisher<T> multicastToExactly(int expectedSubscribers) {
*/
@Deprecated
public final Publisher<T> multicastToExactly(int expectedSubscribers, int queueLimit) {
return new MulticastPublisher<>(this, expectedSubscribers, true, true, queueLimit, t -> completed());
return newMulticastPublisher(this, expectedSubscribers, true, true, queueLimit, t -> completed());
}

/**
Expand Down Expand Up @@ -3082,7 +3086,7 @@ public final Publisher<T> multicast(int minSubscribers) {
* @see <a href="https://reactivex.io/documentation/operators/publish.html">ReactiveX multicast operator</a>
*/
public final Publisher<T> multicast(int minSubscribers, boolean cancelUpstream) {
return multicast(minSubscribers, 64, cancelUpstream);
return multicast(minSubscribers, DEFAULT_MULTICAST_QUEUE_LIMIT, cancelUpstream);
}

/**
Expand Down Expand Up @@ -3145,7 +3149,7 @@ public final Publisher<T> multicast(int minSubscribers, int queueLimit) {
* @see <a href="https://reactivex.io/documentation/operators/publish.html">ReactiveX multicast operator</a>
*/
public final Publisher<T> multicast(int minSubscribers, int queueLimit, boolean cancelUpstream) {
return multicast(minSubscribers, queueLimit, cancelUpstream, t -> completed());
return multicast(minSubscribers, queueLimit, cancelUpstream, DEFAULT_MULTICAST_TERM_RESUB);
}

/**
Expand Down Expand Up @@ -3224,7 +3228,78 @@ public final Publisher<T> multicast(int minSubscribers, int queueLimit,
*/
public final Publisher<T> multicast(int minSubscribers, int queueLimit, boolean cancelUpstream,
Function<Throwable, Completable> terminalResubscribe) {
return new MulticastPublisher<>(this, minSubscribers, false, cancelUpstream, queueLimit, terminalResubscribe);
return newMulticastPublisher(this, minSubscribers, false, cancelUpstream, queueLimit, terminalResubscribe);
}

/**
* Similar to {@link #multicast(int)} in that multiple downstream {@link Subscriber}s are enabled on the returned
* {@link Publisher} but also retains {@code history} of the most recently emitted signals from
* {@link Subscriber#onNext(Object)} which are emitted to new downstream {@link Subscriber}s before emitting new
* signals.
* @param history max number of signals (excluding {@link Subscriber#onComplete()} and
* {@link Subscriber#onError(Throwable)}) to retain.
* @return A {@link Publisher} that allows for multiple downstream subscribers and emits the previous
* {@code history} {@link Subscriber#onNext(Object)} signals to each new subscriber.
* @see <a href="https://reactivex.io/documentation/operators/replay.html">ReactiveX replay operator</a>
* @see ReplayStrategies#historyBuilder(int)
* @see #replay(ReplayStrategy)
*/
public final Publisher<T> replay(int history) {
return replay(ReplayStrategies.<T>historyBuilder(history).build());
}

/**
* Similar to {@link #multicast(int)} in that multiple downstream {@link Subscriber}s are enabled on the returned
* {@link Publisher} but also retains {@code historyHint} of the most recently emitted signals
* from {@link Subscriber#onNext(Object)} which are emitted to new downstream {@link Subscriber}s before emitting
* new signals. Each item is only retained for {@code ttl} duration of time.
* @param historyHint hint for max number of signals (excluding {@link Subscriber#onComplete()} and
* {@link Subscriber#onError(Throwable)}) to retain. Due to concurrency between threads (timer, accumulation,
* subscribe) the maximum number of signals delivered to new subscribers may potentially be more but this hint
* provides a general bound for memory when concurrency subsides.
* @param ttl duration each element will be retained before being removed.
* @param executor used to enforce the {@code ttl} argument.
* @return A {@link Publisher} that allows for multiple downstream subscribers and emits the previous
* {@code historyHint} {@link Subscriber#onNext(Object)} signals to each new subscriber.
* @see <a href="https://reactivex.io/documentation/operators/replay.html">ReactiveX replay operator</a>
* @see ReplayStrategies#historyTtlBuilder(int, Duration, io.servicetalk.concurrent.Executor)
* @see ReplayStrategies#historyTtlBuilder(int, Duration, io.servicetalk.concurrent.Executor, boolean)
* @see #replay(ReplayStrategy)
*/
public final Publisher<T> replay(int historyHint, Duration ttl, io.servicetalk.concurrent.Executor executor) {
return replay(ReplayStrategies.<T>historyTtlBuilder(historyHint, ttl, executor).build());
}

/**
* Similar to {@link #multicast(int)} in that multiple downstream {@link Subscriber}s are enabled on the returned
* {@link Publisher} but will also retain some history of {@link Subscriber#onNext(Object)} signals
* according to the {@link ReplayAccumulator} {@code accumulatorSupplier}.
* @param accumulatorSupplier supplies a {@link ReplayAccumulator} on each subscribe to upstream that can retain
* history of {@link Subscriber#onNext(Object)} signals to deliver to new downstream subscribers.
* @return A {@link Publisher} that allows for multiple downstream subscribers that can retain
* history of {@link Subscriber#onNext(Object)} signals to deliver to new downstream subscribers.
* @see <a href="https://reactivex.io/documentation/operators/replay.html">ReactiveX replay operator</a>
* @see #replay(ReplayStrategy)
*/
public final Publisher<T> replay(Supplier<ReplayAccumulator<T>> accumulatorSupplier) {
return replay(new ReplayStrategyBuilder<>(accumulatorSupplier).build());
}

/**
* Similar to {@link #multicast(int)} in that multiple downstream {@link Subscriber}s are enabled on the returned
* {@link Publisher} but will also retain some history of {@link Subscriber#onNext(Object)} signals
* according to the {@link ReplayStrategy} {@code replayStrategy}.
* @param replayStrategy a {@link ReplayStrategy} that determines the replay behavior and history retention logic.
* @return A {@link Publisher} that allows for multiple downstream subscribers that can retain
* history of {@link Subscriber#onNext(Object)} signals to deliver to new downstream subscribers.
* @see <a href="https://reactivex.io/documentation/operators/replay.html">ReactiveX replay operator</a>
* @see ReplayStrategyBuilder
* @see ReplayStrategies
*/
public final Publisher<T> replay(ReplayStrategy<T> replayStrategy) {
return newReplayPublisher(this, replayStrategy.accumulatorSupplier(), replayStrategy.minSubscribers(),
replayStrategy.cancelUpstream(), replayStrategy.queueLimitHint(),
replayStrategy.terminalResubscribe());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource.Subscriber;

import java.util.function.Consumer;
import javax.annotation.Nullable;

/**
* Accumulates signals for the {@link Publisher} replay operator.
* @param <T> The type of data to accumulate.
*/
public interface ReplayAccumulator<T> {
/**
* Called on each {@link Subscriber#onNext(Object)} and intended to accumulate the signal so that new
* {@link Subscriber}s will see this value via {@link #deliverAccumulation(Consumer)}.
* <p>
* This method won't be called concurrently, but should return quickly to minimize performance impacts.
* @param t An {@link Subscriber#onNext(Object)} to accumulate.
*/
void accumulate(@Nullable T t);

/**
* Called to deliver the signals from {@link #accumulate(Object)} to new {@code consumer}.
* @param consumer The consumer of the signals previously aggregated via {@link #accumulate(Object)}.
*/
void deliverAccumulation(Consumer<T> consumer);

/**
* Called if the accumulation can be cancelled and any asynchronous resources can be cleaned up (e.g. timers).
*/
default void cancelAccumulation() {
}
}
Loading

0 comments on commit da1ea3b

Please sign in to comment.