Skip to content

Commit

Permalink
add lazy timer impl, remove deliver limit
Browse files Browse the repository at this point in the history
  • Loading branch information
Scottmitch committed Sep 30, 2023
1 parent 1963359 commit 3d6cc9a
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down Expand Up @@ -62,7 +63,23 @@ public static <T> ReplayStrategyBuilder<T> historyBuilder(int history) {
* @return a {@link ReplayStrategyBuilder} using the history and TTL strategy.
*/
public static <T> ReplayStrategyBuilder<T> historyTtlBuilder(int history, Duration ttl, Executor executor) {
return new ReplayStrategyBuilder<>(() -> new MostRecentTimeLimitedReplayAccumulator<>(history, ttl, executor));
return historyTtlBuilder(history, ttl, executor, false);
}

/**
* Create a {@link ReplayStrategyBuilder} using the history and TTL strategy.
* @param history max number of items to retain which can be delivered to new subscribers.
* @param ttl duration each element will be retained before being removed.
* @param executor used to enforce the {@code ttl} argument.
* @param lazy {@code true} will evict expired items in a lazy fashion when new subscribers arrive. {@code false}
* will evict expired items eagerly when they expire.
* @param <T> The type of {@link ReplayStrategyBuilder}.
* @return a {@link ReplayStrategyBuilder} using the history and TTL strategy.
*/
static <T> ReplayStrategyBuilder<T> historyTtlBuilder(int history, Duration ttl, Executor executor, boolean lazy) {
return new ReplayStrategyBuilder<>(lazy ?
() -> new LazyTimeLimitedReplayAccumulator<>(history, ttl, executor) :
() -> new EagerTimeLimitedReplayAccumulator<>(history, ttl, executor));
}

private static final class MostRecentReplayAccumulator<T> implements ReplayAccumulator<T> {
Expand Down Expand Up @@ -92,18 +109,75 @@ public void deliverAccumulation(final Consumer<T> consumer) {
}
}

private static final class MostRecentTimeLimitedReplayAccumulator<T> implements ReplayAccumulator<T> {
private static final class LazyTimeLimitedReplayAccumulator<T> implements ReplayAccumulator<T> {
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<LazyTimeLimitedReplayAccumulator> queueSizeUpdater =
AtomicIntegerFieldUpdater.newUpdater(LazyTimeLimitedReplayAccumulator.class, "queueSize");
private final Executor executor;
private final Queue<TimeStampSignal<T>> items;
private final long ttlNanos;
private final int maxItems;
private volatile int queueSize;

LazyTimeLimitedReplayAccumulator(final int maxItems, final Duration ttl, final Executor executor) {
if (ttl.isNegative()) {
throw new IllegalArgumentException("ttl: " + ttl + "(expected non-negative)");
}
if (maxItems <= 0) {
throw new IllegalArgumentException("maxItems: " + maxItems + "(expected >0)");
}
this.executor = requireNonNull(executor);
this.ttlNanos = ttl.toNanos();
this.maxItems = maxItems;
items = new ConcurrentLinkedQueue<>(); // SpMc
}

@Override
public void accumulate(@Nullable final T t) {
final TimeStampSignal<T> signal = new TimeStampSignal<>(executor.currentTime(NANOSECONDS), t);
for (;;) {
final int qSize = queueSize;
if (qSize < maxItems) {
if (queueSizeUpdater.compareAndSet(this, qSize, qSize + 1)) {
items.add(signal);
break;
}
} else if (queueSizeUpdater.compareAndSet(this, qSize, qSize)) {
items.poll();
items.add(signal);
break;
}
}
}

@Override
public void deliverAccumulation(final Consumer<T> consumer) {
final Iterator<TimeStampSignal<T>> itr = items.iterator();
final long nanoTime = executor.currentTime(NANOSECONDS);
while (itr.hasNext()) {
final TimeStampSignal<T> next = itr.next();
if (nanoTime - next.timeStamp >= ttlNanos) {
queueSizeUpdater.decrementAndGet(this);
itr.remove();
} else {
consumer.accept(next.signal);
}
}
}
}

private static final class EagerTimeLimitedReplayAccumulator<T> implements ReplayAccumulator<T> {
private static final Cancellable CANCELLED = () -> { };
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<MostRecentTimeLimitedReplayAccumulator, Cancellable>
timerCancellableUpdater = newUpdater(MostRecentTimeLimitedReplayAccumulator.class, Cancellable.class,
private static final AtomicReferenceFieldUpdater<EagerTimeLimitedReplayAccumulator, Cancellable>
timerCancellableUpdater = newUpdater(EagerTimeLimitedReplayAccumulator.class, Cancellable.class,
"timerCancellable");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<MostRecentTimeLimitedReplayAccumulator> queueLockUpdater =
AtomicIntegerFieldUpdater.newUpdater(MostRecentTimeLimitedReplayAccumulator.class, "queueLock");
private static final AtomicIntegerFieldUpdater<EagerTimeLimitedReplayAccumulator> queueLockUpdater =
AtomicIntegerFieldUpdater.newUpdater(EagerTimeLimitedReplayAccumulator.class, "queueLock");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<MostRecentTimeLimitedReplayAccumulator> queueSizeUpdater =
AtomicIntegerFieldUpdater.newUpdater(MostRecentTimeLimitedReplayAccumulator.class, "queueSize");
private static final AtomicIntegerFieldUpdater<EagerTimeLimitedReplayAccumulator> queueSizeUpdater =
AtomicIntegerFieldUpdater.newUpdater(EagerTimeLimitedReplayAccumulator.class, "queueSize");
private final Executor executor;
private final Queue<TimeStampSignal<T>> items;
private final long ttlNanos;
Expand All @@ -114,7 +188,7 @@ private static final class MostRecentTimeLimitedReplayAccumulator<T> implements
@Nullable
private volatile Cancellable timerCancellable;

MostRecentTimeLimitedReplayAccumulator(final int maxItems, final Duration ttl, final Executor executor) {
EagerTimeLimitedReplayAccumulator(final int maxItems, final Duration ttl, final Executor executor) {
if (ttl.isNegative()) {
throw new IllegalArgumentException("ttl: " + ttl + "(expected non-negative)");
}
Expand All @@ -124,9 +198,11 @@ private static final class MostRecentTimeLimitedReplayAccumulator<T> implements
this.executor = requireNonNull(executor);
this.ttlNanos = ttl.toNanos();
this.maxItems = maxItems;
// SpSc, but needs iterator.
// accumulate is called on one thread (no concurrent access on this method).
// timerFire maybe called on another thread
// SpMc
// producer = accumulate (no concurrent access on this method)
// consumer = accumulate (may poll from queue due to capacity)
// consumer = timerFire (removal via poll)
// consumer = deliverAccumulation (iterator over collection)
items = new ConcurrentLinkedQueue<>();
}

Expand Down Expand Up @@ -169,15 +245,8 @@ public void accumulate(@Nullable final T t) {

@Override
public void deliverAccumulation(final Consumer<T> consumer) {
int i = 0;
for (TimeStampSignal<T> timeStampSignal : items) {
consumer.accept(timeStampSignal.signal);
// The queue size maybe larger than maxItems if we weren't able to acquire the queueLock while adding.
// This is only a temporary condition while there is concurrent access between timer and accumulator.
// Guard against it here to preserve the invariant that we shouldn't deliver more than maxItems.
if (++i >= maxItems) {
break;
}
}
}

Expand Down Expand Up @@ -252,7 +321,7 @@ private long doExpire() {
} else {
// elements sorted in increasing time, break when first non-expired entry found.
// delta maybe negative if ttlNanos is small and this method sees newly added items while looping.
return delta <= 0 ? ttlNanos : ttlNanos - (nanoTime - item.timeStamp);
return delta <= 0 ? ttlNanos : ttlNanos - delta;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
Expand All @@ -41,6 +42,7 @@
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofNanos;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -179,11 +181,12 @@ void threeSubscribersSum(boolean onError) {
threeSubscribersTerminate(onError);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void threeSubscribersTTL(boolean onError) {
@ParameterizedTest(name = "{displayName} [{index}] onError={0} lazy={1}")
@CsvSource(value = {"true,true", "true,false", "false,true", "false,false"})
void threeSubscribersTTL(boolean onError, boolean lazy) {
final Duration ttl = ofMillis(2);
Publisher<Integer> publisher = source.replay(2, ttl, executor);
Publisher<Integer> publisher = source.replay(
ReplayStrategies.<Integer>historyTtlBuilder(2, ttl, executor, lazy).build());
toSource(publisher).subscribe(subscriber1);
subscriber1.awaitSubscription().request(4);
assertThat(subscription.requested(), is(4L));
Expand Down Expand Up @@ -217,15 +220,15 @@ void threeSubscribersTTL(boolean onError) {
threeSubscribersTerminate(onError);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void concurrentTTL(boolean onError) throws Exception {
@ParameterizedTest(name = "{displayName} [{index}] onError={0} lazy={1}")
@CsvSource(value = {"true,true", "true,false", "false,true", "false,false"})
void concurrentTTL(boolean onError, boolean lazy) throws Exception {
final Duration ttl = ofNanos(1);
final int queueLimit = Integer.MAX_VALUE;
Executor executor2 = Executors.newCachedThreadExecutor();
ScheduleQueueExecutor queueExecutor = new ScheduleQueueExecutor(executor2);
Publisher<Integer> publisher = source.replay(
ReplayStrategies.<Integer>historyTtlBuilder(2, ttl, queueExecutor)
ReplayStrategies.<Integer>historyTtlBuilder(2, ttl, queueExecutor, lazy)
.queueLimitHint(queueLimit).build());
try {
toSource(publisher).subscribe(subscriber1);
Expand Down Expand Up @@ -401,14 +404,17 @@ public void cancel() {
private static final class ScheduleQueueExecutor implements io.servicetalk.concurrent.Executor {
private final io.servicetalk.concurrent.Executor executor;
private final AtomicBoolean enableScheduleQueue = new AtomicBoolean();
private final AtomicLong queueTime = new AtomicLong();
private final Queue<ScheduleHolder> scheduleQueue = new ConcurrentLinkedQueue<>();

private ScheduleQueueExecutor(final io.servicetalk.concurrent.Executor executor) {
this.executor = executor;
}

void enableScheduleQueue() {
enableScheduleQueue.set(true);
if (enableScheduleQueue.compareAndSet(false, true)) {
queueTime.set(executor.currentTime(NANOSECONDS));
}
}

void drainScheduleQueue() {
Expand All @@ -424,7 +430,7 @@ void drainScheduleQueue() {

@Override
public long currentTime(final TimeUnit unit) {
return executor.currentTime(unit);
return enableScheduleQueue.get() ? NANOSECONDS.convert(queueTime.get(), unit) : executor.currentTime(unit);
}

@Override
Expand Down

0 comments on commit 3d6cc9a

Please sign in to comment.