Skip to content

Commit

Permalink
add lazy timer impl, remove deliver limit
Browse files Browse the repository at this point in the history
  • Loading branch information
Scottmitch committed Sep 30, 2023
1 parent bcad51d commit 758c2d0
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3236,7 +3236,8 @@ public final Publisher<T> multicast(int minSubscribers, int queueLimit, boolean
* {@link Publisher} but also retains {@code history} of the most recently emitted signals from
* {@link Subscriber#onNext(Object)} which are emitted to new downstream {@link Subscriber}s before emitting new
* signals.
* @param history max number of items to retain which can be delivered to new subscribers.
* @param history max number of signals (excluding {@link Subscriber#onComplete()} and
* {@link Subscriber#onError(Throwable)}) to retain.
* @return A {@link Publisher} that allows for multiple downstream subscribers and emits the previous
* {@code history} {@link Subscriber#onNext(Object)} signals to each new subscriber.
* @see <a href="https://reactivex.io/documentation/operators/replay.html">ReactiveX replay operator</a>
Expand All @@ -3249,20 +3250,23 @@ public final Publisher<T> replay(int history) {

/**
* Similar to {@link #multicast(int)} in that multiple downstream {@link Subscriber}s are enabled on the returned
* {@link Publisher} but also retains {@code history} of the most recently emitted signals
* {@link Publisher} but also retains {@code historyHint} of the most recently emitted signals
* from {@link Subscriber#onNext(Object)} which are emitted to new downstream {@link Subscriber}s before emitting
* new signals. Each item is only retained for {@code ttl} duration of time.
* @param history max number of items to retain which can be delivered to new subscribers.
* @param historyHint hint for max number of signals (excluding {@link Subscriber#onComplete()} and
* {@link Subscriber#onError(Throwable)}) to retain. Due to concurrency between threads (timer, accumulation,
* subscribe) the maximum number of signals delivered to new subscribers may potentially be more but this hint
* provides a general bound for memory when concurrency subsides.
* @param ttl duration each element will be retained before being removed.
* @param executor used to enforce the {@code ttl} argument.
* @return A {@link Publisher} that allows for multiple downstream subscribers and emits the previous
* {@code history} {@link Subscriber#onNext(Object)} signals to each new subscriber.
* {@code historyHint} {@link Subscriber#onNext(Object)} signals to each new subscriber.
* @see <a href="https://reactivex.io/documentation/operators/replay.html">ReactiveX replay operator</a>
* @see ReplayStrategies#historyTtlBuilder(int, Duration, io.servicetalk.concurrent.Executor)
* @see #replay(ReplayStrategy)
*/
public final Publisher<T> replay(int history, Duration ttl, io.servicetalk.concurrent.Executor executor) {
return replay(ReplayStrategies.<T>historyTtlBuilder(history, ttl, executor).build());
public final Publisher<T> replay(int historyHint, Duration ttl, io.servicetalk.concurrent.Executor executor) {
return replay(ReplayStrategies.<T>historyTtlBuilder(historyHint, ttl, executor).build());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.Executor;
import io.servicetalk.concurrent.PublisherSource.Subscriber;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand All @@ -45,7 +47,8 @@ private ReplayStrategies() {

/**
* Create a {@link ReplayStrategyBuilder} using the history strategy.
* @param history max number of items to retain which can be delivered to new subscribers.
* @param history max number of signals (excluding {@link Subscriber#onComplete()} and
* {@link Subscriber#onError(Throwable)}) to retain.
* @param <T> The type of {@link ReplayStrategyBuilder}.
* @return a {@link ReplayStrategyBuilder} using the history strategy.
*/
Expand All @@ -54,20 +57,49 @@ public static <T> ReplayStrategyBuilder<T> historyBuilder(int history) {
}

/**
* Create a {@link ReplayStrategyBuilder} using the history and TTL strategy.
* @param history max number of items to retain which can be delivered to new subscribers.
* Create a {@link ReplayStrategyBuilder} using the historyHint and TTL strategy.
* @param historyHint hint for max number of signals (excluding {@link Subscriber#onComplete()} and
* {@link Subscriber#onError(Throwable)}) to retain. Due to concurrency between threads (timer, accumulation,
* subscribe) the maximum number of signals delivered to new subscribers may potentially be more but this hint
* provides a general bound for memory when concurrency subsides.
* @param ttl duration each element will be retained before being removed.
* @param executor used to enforce the {@code ttl} argument.
* @param <T> The type of {@link ReplayStrategyBuilder}.
* @return a {@link ReplayStrategyBuilder} using the history and TTL strategy.
* @return a {@link ReplayStrategyBuilder} using the historyHint and TTL strategy.
*/
public static <T> ReplayStrategyBuilder<T> historyTtlBuilder(int history, Duration ttl, Executor executor) {
return new ReplayStrategyBuilder<>(() -> new MostRecentTimeLimitedReplayAccumulator<>(history, ttl, executor));
public static <T> ReplayStrategyBuilder<T> historyTtlBuilder(int historyHint, Duration ttl, Executor executor) {
return historyTtlBuilder(historyHint, ttl, executor, false);
}

/**
* Create a {@link ReplayStrategyBuilder} using the historyHint and TTL strategy.
* @param historyHint hint for max number of signals (excluding {@link Subscriber#onComplete()} and
* {@link Subscriber#onError(Throwable)}) to retain. Due to concurrency between threads (timer, accumulation,
* subscribe) the maximum number of signals delivered to new subscribers may potentially be more but this hint
* provides a general bound for memory when concurrency subsides.
* @param ttl duration each element will be retained before being removed.
* @param executor used to enforce the {@code ttl} argument.
* @param lazy
* <ul>
* <li>{@code true} will evict expired items in a lazy fashion when new subscribers arrive. This approach
* is more likely to retain {@code historyHint} elements in memory in steady state, but avoids cost of
* scheduling timer tasks.</li>
* <li>{@code false} will evict expired items eagerly when they expire. If {@code ttl} is lower that
* {@code historyHint} relative to signal arrival rate this can use less memory but schedules time tasks.</li>
* </ul>
* @param <T> The type of {@link ReplayStrategyBuilder}.
* @return a {@link ReplayStrategyBuilder} using the historyHint and TTL strategy.
*/
public static <T> ReplayStrategyBuilder<T> historyTtlBuilder(int historyHint, Duration ttl, Executor executor,
boolean lazy) {
return new ReplayStrategyBuilder<>(lazy ?
() -> new LazyTimeLimitedReplayAccumulator<>(historyHint, ttl, executor) :
() -> new EagerTimeLimitedReplayAccumulator<>(historyHint, ttl, executor));
}

private static final class MostRecentReplayAccumulator<T> implements ReplayAccumulator<T> {
private final int maxItems;
private final Deque<Object> list = new ArrayDeque<>();
private final Deque<Object> queue = new ArrayDeque<>();

MostRecentReplayAccumulator(final int maxItems) {
if (maxItems <= 0) {
Expand All @@ -78,32 +110,89 @@ private static final class MostRecentReplayAccumulator<T> implements ReplayAccum

@Override
public void accumulate(@Nullable final T t) {
if (list.size() >= maxItems) {
list.pop();
if (queue.size() >= maxItems) {
queue.poll();
}
list.add(wrapNull(t));
queue.add(wrapNull(t));
}

@Override
public void deliverAccumulation(final Consumer<T> consumer) {
for (Object item : list) {
for (Object item : queue) {
consumer.accept(unwrapNullUnchecked(item));
}
}
}

private static final class MostRecentTimeLimitedReplayAccumulator<T> implements ReplayAccumulator<T> {
private static final class LazyTimeLimitedReplayAccumulator<T> implements ReplayAccumulator<T> {
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<LazyTimeLimitedReplayAccumulator> queueSizeUpdater =
AtomicIntegerFieldUpdater.newUpdater(LazyTimeLimitedReplayAccumulator.class, "queueSize");
private final Executor executor;
private final Queue<TimeStampSignal<T>> items;
private final long ttlNanos;
private final int maxItems;
private volatile int queueSize;

LazyTimeLimitedReplayAccumulator(final int maxItems, final Duration ttl, final Executor executor) {
if (ttl.isNegative()) {
throw new IllegalArgumentException("ttl: " + ttl + "(expected non-negative)");
}
if (maxItems <= 0) {
throw new IllegalArgumentException("maxItems: " + maxItems + "(expected >0)");
}
this.executor = requireNonNull(executor);
this.ttlNanos = ttl.toNanos();
this.maxItems = maxItems;
items = new ConcurrentLinkedQueue<>(); // SpMc
}

@Override
public void accumulate(@Nullable final T t) {
final TimeStampSignal<T> signal = new TimeStampSignal<>(executor.currentTime(NANOSECONDS), t);
for (;;) {
final int qSize = queueSize;
if (qSize < maxItems) {
if (queueSizeUpdater.compareAndSet(this, qSize, qSize + 1)) {
items.add(signal);
break;
}
} else if (queueSizeUpdater.compareAndSet(this, qSize, qSize)) {
items.poll();
items.add(signal);
break;
}
}
}

@Override
public void deliverAccumulation(final Consumer<T> consumer) {
final Iterator<TimeStampSignal<T>> itr = items.iterator();
final long nanoTime = executor.currentTime(NANOSECONDS);
while (itr.hasNext()) {
final TimeStampSignal<T> next = itr.next();
if (nanoTime - next.timeStamp >= ttlNanos) {
queueSizeUpdater.decrementAndGet(this);
itr.remove();
} else {
consumer.accept(next.signal);
}
}
}
}

private static final class EagerTimeLimitedReplayAccumulator<T> implements ReplayAccumulator<T> {
private static final Cancellable CANCELLED = () -> { };
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<MostRecentTimeLimitedReplayAccumulator, Cancellable>
timerCancellableUpdater = newUpdater(MostRecentTimeLimitedReplayAccumulator.class, Cancellable.class,
private static final AtomicReferenceFieldUpdater<EagerTimeLimitedReplayAccumulator, Cancellable>
timerCancellableUpdater = newUpdater(EagerTimeLimitedReplayAccumulator.class, Cancellable.class,
"timerCancellable");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<MostRecentTimeLimitedReplayAccumulator> queueLockUpdater =
AtomicIntegerFieldUpdater.newUpdater(MostRecentTimeLimitedReplayAccumulator.class, "queueLock");
private static final AtomicIntegerFieldUpdater<EagerTimeLimitedReplayAccumulator> queueLockUpdater =
AtomicIntegerFieldUpdater.newUpdater(EagerTimeLimitedReplayAccumulator.class, "queueLock");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<MostRecentTimeLimitedReplayAccumulator> queueSizeUpdater =
AtomicIntegerFieldUpdater.newUpdater(MostRecentTimeLimitedReplayAccumulator.class, "queueSize");
private static final AtomicIntegerFieldUpdater<EagerTimeLimitedReplayAccumulator> queueSizeUpdater =
AtomicIntegerFieldUpdater.newUpdater(EagerTimeLimitedReplayAccumulator.class, "queueSize");
private final Executor executor;
private final Queue<TimeStampSignal<T>> items;
private final long ttlNanos;
Expand All @@ -114,7 +203,7 @@ private static final class MostRecentTimeLimitedReplayAccumulator<T> implements
@Nullable
private volatile Cancellable timerCancellable;

MostRecentTimeLimitedReplayAccumulator(final int maxItems, final Duration ttl, final Executor executor) {
EagerTimeLimitedReplayAccumulator(final int maxItems, final Duration ttl, final Executor executor) {
if (ttl.isNegative()) {
throw new IllegalArgumentException("ttl: " + ttl + "(expected non-negative)");
}
Expand All @@ -124,9 +213,11 @@ private static final class MostRecentTimeLimitedReplayAccumulator<T> implements
this.executor = requireNonNull(executor);
this.ttlNanos = ttl.toNanos();
this.maxItems = maxItems;
// SpSc, but needs iterator.
// accumulate is called on one thread (no concurrent access on this method).
// timerFire maybe called on another thread
// SpMc
// producer = accumulate (no concurrent access on this method)
// consumer = accumulate (may poll from queue due to capacity)
// consumer = timerFire (removal via poll)
// consumer = deliverAccumulation (iterator over collection)
items = new ConcurrentLinkedQueue<>();
}

Expand Down Expand Up @@ -169,15 +260,8 @@ public void accumulate(@Nullable final T t) {

@Override
public void deliverAccumulation(final Consumer<T> consumer) {
int i = 0;
for (TimeStampSignal<T> timeStampSignal : items) {
consumer.accept(timeStampSignal.signal);
// The queue size maybe larger than maxItems if we weren't able to acquire the queueLock while adding.
// This is only a temporary condition while there is concurrent access between timer and accumulator.
// Guard against it here to preserve the invariant that we shouldn't deliver more than maxItems.
if (++i >= maxItems) {
break;
}
}
}

Expand Down Expand Up @@ -252,7 +336,7 @@ private long doExpire() {
} else {
// elements sorted in increasing time, break when first non-expired entry found.
// delta maybe negative if ttlNanos is small and this method sees newly added items while looping.
return delta <= 0 ? ttlNanos : ttlNanos - (nanoTime - item.timeStamp);
return delta <= 0 ? ttlNanos : ttlNanos - delta;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
Expand All @@ -41,6 +42,7 @@
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofNanos;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -179,11 +181,12 @@ void threeSubscribersSum(boolean onError) {
threeSubscribersTerminate(onError);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void threeSubscribersTTL(boolean onError) {
@ParameterizedTest(name = "{displayName} [{index}] onError={0} lazy={1}")
@CsvSource(value = {"true,true", "true,false", "false,true", "false,false"})
void threeSubscribersTTL(boolean onError, boolean lazy) {
final Duration ttl = ofMillis(2);
Publisher<Integer> publisher = source.replay(2, ttl, executor);
Publisher<Integer> publisher = source.replay(
ReplayStrategies.<Integer>historyTtlBuilder(2, ttl, executor, lazy).build());
toSource(publisher).subscribe(subscriber1);
subscriber1.awaitSubscription().request(4);
assertThat(subscription.requested(), is(4L));
Expand Down Expand Up @@ -217,15 +220,15 @@ void threeSubscribersTTL(boolean onError) {
threeSubscribersTerminate(onError);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void concurrentTTL(boolean onError) throws Exception {
@ParameterizedTest(name = "{displayName} [{index}] onError={0} lazy={1}")
@CsvSource(value = {"true,true", "true,false", "false,true", "false,false"})
void concurrentTTL(boolean onError, boolean lazy) throws Exception {
final Duration ttl = ofNanos(1);
final int queueLimit = Integer.MAX_VALUE;
Executor executor2 = Executors.newCachedThreadExecutor();
ScheduleQueueExecutor queueExecutor = new ScheduleQueueExecutor(executor2);
Publisher<Integer> publisher = source.replay(
ReplayStrategies.<Integer>historyTtlBuilder(2, ttl, queueExecutor)
ReplayStrategies.<Integer>historyTtlBuilder(2, ttl, queueExecutor, lazy)
.queueLimitHint(queueLimit).build());
try {
toSource(publisher).subscribe(subscriber1);
Expand Down Expand Up @@ -401,14 +404,17 @@ public void cancel() {
private static final class ScheduleQueueExecutor implements io.servicetalk.concurrent.Executor {
private final io.servicetalk.concurrent.Executor executor;
private final AtomicBoolean enableScheduleQueue = new AtomicBoolean();
private final AtomicLong queueTime = new AtomicLong();
private final Queue<ScheduleHolder> scheduleQueue = new ConcurrentLinkedQueue<>();

private ScheduleQueueExecutor(final io.servicetalk.concurrent.Executor executor) {
this.executor = executor;
}

void enableScheduleQueue() {
enableScheduleQueue.set(true);
if (enableScheduleQueue.compareAndSet(false, true)) {
queueTime.set(executor.currentTime(NANOSECONDS));
}
}

void drainScheduleQueue() {
Expand All @@ -424,7 +430,7 @@ void drainScheduleQueue() {

@Override
public long currentTime(final TimeUnit unit) {
return executor.currentTime(unit);
return enableScheduleQueue.get() ? NANOSECONDS.convert(queueTime.get(), unit) : executor.currentTime(unit);
}

@Override
Expand Down

0 comments on commit 758c2d0

Please sign in to comment.