From 758c2d00285aee81f1d85e35a9f9c06f98d0b4ba Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Fri, 29 Sep 2023 17:23:00 -0700 Subject: [PATCH] add lazy timer impl, remove deliver limit --- .../servicetalk/concurrent/api/Publisher.java | 16 +- .../concurrent/api/ReplayStrategies.java | 144 ++++++++++++++---- .../concurrent/api/ReplayPublisherTest.java | 26 ++-- 3 files changed, 140 insertions(+), 46 deletions(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java index 6c2b511720..c59c71d7fb 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java @@ -3236,7 +3236,8 @@ public final Publisher multicast(int minSubscribers, int queueLimit, boolean * {@link Publisher} but also retains {@code history} of the most recently emitted signals from * {@link Subscriber#onNext(Object)} which are emitted to new downstream {@link Subscriber}s before emitting new * signals. - * @param history max number of items to retain which can be delivered to new subscribers. + * @param history max number of signals (excluding {@link Subscriber#onComplete()} and + * {@link Subscriber#onError(Throwable)}) to retain. * @return A {@link Publisher} that allows for multiple downstream subscribers and emits the previous * {@code history} {@link Subscriber#onNext(Object)} signals to each new subscriber. * @see ReactiveX replay operator @@ -3249,20 +3250,23 @@ public final Publisher replay(int history) { /** * Similar to {@link #multicast(int)} in that multiple downstream {@link Subscriber}s are enabled on the returned - * {@link Publisher} but also retains {@code history} of the most recently emitted signals + * {@link Publisher} but also retains {@code historyHint} of the most recently emitted signals * from {@link Subscriber#onNext(Object)} which are emitted to new downstream {@link Subscriber}s before emitting * new signals. Each item is only retained for {@code ttl} duration of time. - * @param history max number of items to retain which can be delivered to new subscribers. + * @param historyHint hint for max number of signals (excluding {@link Subscriber#onComplete()} and + * {@link Subscriber#onError(Throwable)}) to retain. Due to concurrency between threads (timer, accumulation, + * subscribe) the maximum number of signals delivered to new subscribers may potentially be more but this hint + * provides a general bound for memory when concurrency subsides. * @param ttl duration each element will be retained before being removed. * @param executor used to enforce the {@code ttl} argument. * @return A {@link Publisher} that allows for multiple downstream subscribers and emits the previous - * {@code history} {@link Subscriber#onNext(Object)} signals to each new subscriber. + * {@code historyHint} {@link Subscriber#onNext(Object)} signals to each new subscriber. * @see ReactiveX replay operator * @see ReplayStrategies#historyTtlBuilder(int, Duration, io.servicetalk.concurrent.Executor) * @see #replay(ReplayStrategy) */ - public final Publisher replay(int history, Duration ttl, io.servicetalk.concurrent.Executor executor) { - return replay(ReplayStrategies.historyTtlBuilder(history, ttl, executor).build()); + public final Publisher replay(int historyHint, Duration ttl, io.servicetalk.concurrent.Executor executor) { + return replay(ReplayStrategies.historyTtlBuilder(historyHint, ttl, executor).build()); } /** diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ReplayStrategies.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ReplayStrategies.java index 73810097a9..9d0da37d62 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ReplayStrategies.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ReplayStrategies.java @@ -17,10 +17,12 @@ import io.servicetalk.concurrent.Cancellable; import io.servicetalk.concurrent.Executor; +import io.servicetalk.concurrent.PublisherSource.Subscriber; import java.time.Duration; import java.util.ArrayDeque; import java.util.Deque; +import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -45,7 +47,8 @@ private ReplayStrategies() { /** * Create a {@link ReplayStrategyBuilder} using the history strategy. - * @param history max number of items to retain which can be delivered to new subscribers. + * @param history max number of signals (excluding {@link Subscriber#onComplete()} and + * {@link Subscriber#onError(Throwable)}) to retain. * @param The type of {@link ReplayStrategyBuilder}. * @return a {@link ReplayStrategyBuilder} using the history strategy. */ @@ -54,20 +57,49 @@ public static ReplayStrategyBuilder historyBuilder(int history) { } /** - * Create a {@link ReplayStrategyBuilder} using the history and TTL strategy. - * @param history max number of items to retain which can be delivered to new subscribers. + * Create a {@link ReplayStrategyBuilder} using the historyHint and TTL strategy. + * @param historyHint hint for max number of signals (excluding {@link Subscriber#onComplete()} and + * {@link Subscriber#onError(Throwable)}) to retain. Due to concurrency between threads (timer, accumulation, + * subscribe) the maximum number of signals delivered to new subscribers may potentially be more but this hint + * provides a general bound for memory when concurrency subsides. * @param ttl duration each element will be retained before being removed. * @param executor used to enforce the {@code ttl} argument. * @param The type of {@link ReplayStrategyBuilder}. - * @return a {@link ReplayStrategyBuilder} using the history and TTL strategy. + * @return a {@link ReplayStrategyBuilder} using the historyHint and TTL strategy. */ - public static ReplayStrategyBuilder historyTtlBuilder(int history, Duration ttl, Executor executor) { - return new ReplayStrategyBuilder<>(() -> new MostRecentTimeLimitedReplayAccumulator<>(history, ttl, executor)); + public static ReplayStrategyBuilder historyTtlBuilder(int historyHint, Duration ttl, Executor executor) { + return historyTtlBuilder(historyHint, ttl, executor, false); + } + + /** + * Create a {@link ReplayStrategyBuilder} using the historyHint and TTL strategy. + * @param historyHint hint for max number of signals (excluding {@link Subscriber#onComplete()} and + * {@link Subscriber#onError(Throwable)}) to retain. Due to concurrency between threads (timer, accumulation, + * subscribe) the maximum number of signals delivered to new subscribers may potentially be more but this hint + * provides a general bound for memory when concurrency subsides. + * @param ttl duration each element will be retained before being removed. + * @param executor used to enforce the {@code ttl} argument. + * @param lazy + *
    + *
  • {@code true} will evict expired items in a lazy fashion when new subscribers arrive. This approach + * is more likely to retain {@code historyHint} elements in memory in steady state, but avoids cost of + * scheduling timer tasks.
  • + *
  • {@code false} will evict expired items eagerly when they expire. If {@code ttl} is lower that + * {@code historyHint} relative to signal arrival rate this can use less memory but schedules time tasks.
  • + *
+ * @param The type of {@link ReplayStrategyBuilder}. + * @return a {@link ReplayStrategyBuilder} using the historyHint and TTL strategy. + */ + public static ReplayStrategyBuilder historyTtlBuilder(int historyHint, Duration ttl, Executor executor, + boolean lazy) { + return new ReplayStrategyBuilder<>(lazy ? + () -> new LazyTimeLimitedReplayAccumulator<>(historyHint, ttl, executor) : + () -> new EagerTimeLimitedReplayAccumulator<>(historyHint, ttl, executor)); } private static final class MostRecentReplayAccumulator implements ReplayAccumulator { private final int maxItems; - private final Deque list = new ArrayDeque<>(); + private final Deque queue = new ArrayDeque<>(); MostRecentReplayAccumulator(final int maxItems) { if (maxItems <= 0) { @@ -78,32 +110,89 @@ private static final class MostRecentReplayAccumulator implements ReplayAccum @Override public void accumulate(@Nullable final T t) { - if (list.size() >= maxItems) { - list.pop(); + if (queue.size() >= maxItems) { + queue.poll(); } - list.add(wrapNull(t)); + queue.add(wrapNull(t)); } @Override public void deliverAccumulation(final Consumer consumer) { - for (Object item : list) { + for (Object item : queue) { consumer.accept(unwrapNullUnchecked(item)); } } } - private static final class MostRecentTimeLimitedReplayAccumulator implements ReplayAccumulator { + private static final class LazyTimeLimitedReplayAccumulator implements ReplayAccumulator { + @SuppressWarnings("rawtypes") + private static final AtomicIntegerFieldUpdater queueSizeUpdater = + AtomicIntegerFieldUpdater.newUpdater(LazyTimeLimitedReplayAccumulator.class, "queueSize"); + private final Executor executor; + private final Queue> items; + private final long ttlNanos; + private final int maxItems; + private volatile int queueSize; + + LazyTimeLimitedReplayAccumulator(final int maxItems, final Duration ttl, final Executor executor) { + if (ttl.isNegative()) { + throw new IllegalArgumentException("ttl: " + ttl + "(expected non-negative)"); + } + if (maxItems <= 0) { + throw new IllegalArgumentException("maxItems: " + maxItems + "(expected >0)"); + } + this.executor = requireNonNull(executor); + this.ttlNanos = ttl.toNanos(); + this.maxItems = maxItems; + items = new ConcurrentLinkedQueue<>(); // SpMc + } + + @Override + public void accumulate(@Nullable final T t) { + final TimeStampSignal signal = new TimeStampSignal<>(executor.currentTime(NANOSECONDS), t); + for (;;) { + final int qSize = queueSize; + if (qSize < maxItems) { + if (queueSizeUpdater.compareAndSet(this, qSize, qSize + 1)) { + items.add(signal); + break; + } + } else if (queueSizeUpdater.compareAndSet(this, qSize, qSize)) { + items.poll(); + items.add(signal); + break; + } + } + } + + @Override + public void deliverAccumulation(final Consumer consumer) { + final Iterator> itr = items.iterator(); + final long nanoTime = executor.currentTime(NANOSECONDS); + while (itr.hasNext()) { + final TimeStampSignal next = itr.next(); + if (nanoTime - next.timeStamp >= ttlNanos) { + queueSizeUpdater.decrementAndGet(this); + itr.remove(); + } else { + consumer.accept(next.signal); + } + } + } + } + + private static final class EagerTimeLimitedReplayAccumulator implements ReplayAccumulator { private static final Cancellable CANCELLED = () -> { }; @SuppressWarnings("rawtypes") - private static final AtomicReferenceFieldUpdater - timerCancellableUpdater = newUpdater(MostRecentTimeLimitedReplayAccumulator.class, Cancellable.class, + private static final AtomicReferenceFieldUpdater + timerCancellableUpdater = newUpdater(EagerTimeLimitedReplayAccumulator.class, Cancellable.class, "timerCancellable"); @SuppressWarnings("rawtypes") - private static final AtomicIntegerFieldUpdater queueLockUpdater = - AtomicIntegerFieldUpdater.newUpdater(MostRecentTimeLimitedReplayAccumulator.class, "queueLock"); + private static final AtomicIntegerFieldUpdater queueLockUpdater = + AtomicIntegerFieldUpdater.newUpdater(EagerTimeLimitedReplayAccumulator.class, "queueLock"); @SuppressWarnings("rawtypes") - private static final AtomicIntegerFieldUpdater queueSizeUpdater = - AtomicIntegerFieldUpdater.newUpdater(MostRecentTimeLimitedReplayAccumulator.class, "queueSize"); + private static final AtomicIntegerFieldUpdater queueSizeUpdater = + AtomicIntegerFieldUpdater.newUpdater(EagerTimeLimitedReplayAccumulator.class, "queueSize"); private final Executor executor; private final Queue> items; private final long ttlNanos; @@ -114,7 +203,7 @@ private static final class MostRecentTimeLimitedReplayAccumulator implements @Nullable private volatile Cancellable timerCancellable; - MostRecentTimeLimitedReplayAccumulator(final int maxItems, final Duration ttl, final Executor executor) { + EagerTimeLimitedReplayAccumulator(final int maxItems, final Duration ttl, final Executor executor) { if (ttl.isNegative()) { throw new IllegalArgumentException("ttl: " + ttl + "(expected non-negative)"); } @@ -124,9 +213,11 @@ private static final class MostRecentTimeLimitedReplayAccumulator implements this.executor = requireNonNull(executor); this.ttlNanos = ttl.toNanos(); this.maxItems = maxItems; - // SpSc, but needs iterator. - // accumulate is called on one thread (no concurrent access on this method). - // timerFire maybe called on another thread + // SpMc + // producer = accumulate (no concurrent access on this method) + // consumer = accumulate (may poll from queue due to capacity) + // consumer = timerFire (removal via poll) + // consumer = deliverAccumulation (iterator over collection) items = new ConcurrentLinkedQueue<>(); } @@ -169,15 +260,8 @@ public void accumulate(@Nullable final T t) { @Override public void deliverAccumulation(final Consumer consumer) { - int i = 0; for (TimeStampSignal timeStampSignal : items) { consumer.accept(timeStampSignal.signal); - // The queue size maybe larger than maxItems if we weren't able to acquire the queueLock while adding. - // This is only a temporary condition while there is concurrent access between timer and accumulator. - // Guard against it here to preserve the invariant that we shouldn't deliver more than maxItems. - if (++i >= maxItems) { - break; - } } } @@ -252,7 +336,7 @@ private long doExpire() { } else { // elements sorted in increasing time, break when first non-expired entry found. // delta maybe negative if ttlNanos is small and this method sees newly added items while looping. - return delta <= 0 ? ttlNanos : ttlNanos - (nanoTime - item.timeStamp); + return delta <= 0 ? ttlNanos : ttlNanos - delta; } } } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/ReplayPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/ReplayPublisherTest.java index e6285ecd58..d84fc8b062 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/ReplayPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/ReplayPublisherTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Function; import javax.annotation.Nullable; @@ -41,6 +42,7 @@ import static java.time.Duration.ofMillis; import static java.time.Duration.ofNanos; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; @@ -179,11 +181,12 @@ void threeSubscribersSum(boolean onError) { threeSubscribersTerminate(onError); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void threeSubscribersTTL(boolean onError) { + @ParameterizedTest(name = "{displayName} [{index}] onError={0} lazy={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void threeSubscribersTTL(boolean onError, boolean lazy) { final Duration ttl = ofMillis(2); - Publisher publisher = source.replay(2, ttl, executor); + Publisher publisher = source.replay( + ReplayStrategies.historyTtlBuilder(2, ttl, executor, lazy).build()); toSource(publisher).subscribe(subscriber1); subscriber1.awaitSubscription().request(4); assertThat(subscription.requested(), is(4L)); @@ -217,15 +220,15 @@ void threeSubscribersTTL(boolean onError) { threeSubscribersTerminate(onError); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void concurrentTTL(boolean onError) throws Exception { + @ParameterizedTest(name = "{displayName} [{index}] onError={0} lazy={1}") + @CsvSource(value = {"true,true", "true,false", "false,true", "false,false"}) + void concurrentTTL(boolean onError, boolean lazy) throws Exception { final Duration ttl = ofNanos(1); final int queueLimit = Integer.MAX_VALUE; Executor executor2 = Executors.newCachedThreadExecutor(); ScheduleQueueExecutor queueExecutor = new ScheduleQueueExecutor(executor2); Publisher publisher = source.replay( - ReplayStrategies.historyTtlBuilder(2, ttl, queueExecutor) + ReplayStrategies.historyTtlBuilder(2, ttl, queueExecutor, lazy) .queueLimitHint(queueLimit).build()); try { toSource(publisher).subscribe(subscriber1); @@ -401,6 +404,7 @@ public void cancel() { private static final class ScheduleQueueExecutor implements io.servicetalk.concurrent.Executor { private final io.servicetalk.concurrent.Executor executor; private final AtomicBoolean enableScheduleQueue = new AtomicBoolean(); + private final AtomicLong queueTime = new AtomicLong(); private final Queue scheduleQueue = new ConcurrentLinkedQueue<>(); private ScheduleQueueExecutor(final io.servicetalk.concurrent.Executor executor) { @@ -408,7 +412,9 @@ private ScheduleQueueExecutor(final io.servicetalk.concurrent.Executor executor) } void enableScheduleQueue() { - enableScheduleQueue.set(true); + if (enableScheduleQueue.compareAndSet(false, true)) { + queueTime.set(executor.currentTime(NANOSECONDS)); + } } void drainScheduleQueue() { @@ -424,7 +430,7 @@ void drainScheduleQueue() { @Override public long currentTime(final TimeUnit unit) { - return executor.currentTime(unit); + return enableScheduleQueue.get() ? NANOSECONDS.convert(queueTime.get(), unit) : executor.currentTime(unit); } @Override