Skip to content

Commit

Permalink
fix replay max signals & timer concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
Scottmitch committed Sep 29, 2023
1 parent e8e042a commit 1963359
Show file tree
Hide file tree
Showing 2 changed files with 253 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.SubscriberApiUtils.unwrapNullUnchecked;
import static io.servicetalk.concurrent.api.SubscriberApiUtils.wrapNull;
import static io.servicetalk.concurrent.internal.EmptySubscriptions.EMPTY_SUBSCRIPTION_NO_THROW;
import static io.servicetalk.concurrent.internal.ConcurrentUtils.releaseLock;
import static io.servicetalk.concurrent.internal.ConcurrentUtils.tryAcquireLock;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
Expand Down Expand Up @@ -92,23 +93,24 @@ public void deliverAccumulation(final Consumer<T> consumer) {
}

private static final class MostRecentTimeLimitedReplayAccumulator<T> implements ReplayAccumulator<T> {
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<MostRecentTimeLimitedReplayAccumulator> stateSizeUpdater =
AtomicLongFieldUpdater.newUpdater(MostRecentTimeLimitedReplayAccumulator.class, "stateSize");
private static final Cancellable CANCELLED = () -> { };
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<MostRecentTimeLimitedReplayAccumulator, Cancellable>
timerCancellableUpdater = newUpdater(MostRecentTimeLimitedReplayAccumulator.class, Cancellable.class,
"timerCancellable");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<MostRecentTimeLimitedReplayAccumulator> queueLockUpdater =
AtomicIntegerFieldUpdater.newUpdater(MostRecentTimeLimitedReplayAccumulator.class, "queueLock");
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<MostRecentTimeLimitedReplayAccumulator> queueSizeUpdater =
AtomicIntegerFieldUpdater.newUpdater(MostRecentTimeLimitedReplayAccumulator.class, "queueSize");
private final Executor executor;
private final Queue<TimeStampSignal<T>> items;
private final long ttlNanos;
private final int maxItems;
/**
* Provide atomic state for size of {@link #items} and also for visibility between the threads consuming and
* producing. The atomically incrementing "state" ensures that any modifications from the producer thread
* are visible from the consumer thread and we never "miss" a timer schedule event if the queue becomes empty.
*/
private volatile long stateSize;
private volatile int queueSize;
@SuppressWarnings("unused")
private volatile int queueLock;
@Nullable
private volatile Cancellable timerCancellable;

Expand All @@ -122,68 +124,102 @@ private static final class MostRecentTimeLimitedReplayAccumulator<T> implements
this.executor = requireNonNull(executor);
this.ttlNanos = ttl.toNanos();
this.maxItems = maxItems;
items = new ConcurrentLinkedQueue<>(); // SpMc
// SpSc, but needs iterator.
// accumulate is called on one thread (no concurrent access on this method).
// timerFire maybe called on another thread
items = new ConcurrentLinkedQueue<>();
}

@Override
public void accumulate(@Nullable final T t) {
// We may exceed max items in the queue but this method isn't invoked concurrently, so we only go over by
// at most 1 item.
items.add(new TimeStampSignal<>(executor.currentTime(NANOSECONDS), t));
for (;;) {
final long currentStateSize = stateSize;
final int currentSize = getSize(currentStateSize);
final int nextState = getState(currentStateSize) + 1;
if (currentSize >= maxItems) {
if (stateSizeUpdater.compareAndSet(this, currentStateSize,
buildStateSize(nextState, currentSize))) {
long scheduleTimerNanos = -1;
final TimeStampSignal<T> signal = new TimeStampSignal<>(executor.currentTime(NANOSECONDS), t);
if (tryAcquireLock(queueLockUpdater, this)) {
for (;;) {
final int qSize = queueSize;
if (qSize < maxItems) {
if (queueSizeUpdater.compareAndSet(this, qSize, qSize + 1)) {
items.add(signal);
if (qSize == 0) {
scheduleTimerNanos = ttlNanos;
}
break;
}
} else if (queueSizeUpdater.compareAndSet(this, qSize, qSize)) {
// Queue removal is only done while queueLock is acquired, so we don't need to worry about
// the timer thread removing items concurrently.
items.poll();
items.add(signal);
break;
}
} else if (stateSizeUpdater.compareAndSet(this, currentStateSize,
buildStateSize(nextState, currentSize + 1))) {
if (currentSize == 0) {
schedulerTimer(ttlNanos);
}
break;
}
if (!releaseLock(queueLockUpdater, this)) {
scheduleTimerNanos = tryDrainQueue();
}
} else {
queueSizeUpdater.incrementAndGet(this);
items.add(signal);
scheduleTimerNanos = tryDrainQueue();
}

if (scheduleTimerNanos >= 0) {
schedulerTimer(scheduleTimerNanos);
}
}

@Override
public void deliverAccumulation(final Consumer<T> consumer) {
int i = 0;
for (TimeStampSignal<T> timeStampSignal : items) {
consumer.accept(timeStampSignal.signal);
// The queue size maybe larger than maxItems if we weren't able to acquire the queueLock while adding.
// This is only a temporary condition while there is concurrent access between timer and accumulator.
// Guard against it here to preserve the invariant that we shouldn't deliver more than maxItems.
if (++i >= maxItems) {
break;
}
}
}

@Override
public void cancelAccumulation() {
final Cancellable cancellable = timerCancellableUpdater.getAndSet(this, EMPTY_SUBSCRIPTION_NO_THROW);
// Stop the background timer and prevent it from being rescheduled. It is possible upstream may deliver
// more data but the queue size is bounded by maxItems and this method should only be called when upstream
// is cancelled which should eventually dereference this object making it eligible for GC (no memory leak).
final Cancellable cancellable = timerCancellableUpdater.getAndSet(this, CANCELLED);
if (cancellable != null) {
cancellable.cancel();
}
}

private static int getSize(long stateSize) {
return (int) stateSize;
}
private long tryDrainQueue() {
long scheduleTimerNanos = -1;
boolean tryAcquire = true;
while (tryAcquire && tryAcquireLock(queueLockUpdater, this)) {
// Ensure the queue contains maxItems or less items.
for (;;) {
final int qSize = queueSize;
if (qSize <= maxItems) {
break;
} else if (queueSizeUpdater.compareAndSet(this, qSize, qSize - 1)) {
items.poll();
}
}

private static int getState(long stateSize) {
return (int) (stateSize >>> 32);
}
scheduleTimerNanos = doExpire();

private static long buildStateSize(int state, int size) {
return (((long) state) << 32) | size;
tryAcquire = !releaseLock(queueLockUpdater, this);
}
return scheduleTimerNanos;
}

private void schedulerTimer(long nanos) {
for (;;) {
final Cancellable currentCancellable = timerCancellable;
if (currentCancellable == EMPTY_SUBSCRIPTION_NO_THROW) {
if (currentCancellable == CANCELLED) {
break;
} else {
final Cancellable nextCancellable = executor.schedule(this::expireSignals, nanos, NANOSECONDS);
final Cancellable nextCancellable = executor.schedule(this::timerFire, nanos, NANOSECONDS);
if (timerCancellableUpdater.compareAndSet(this, currentCancellable, nextCancellable)) {
// Current logic only has 1 timer outstanding at any give time so cancellation of
// the current cancellable shouldn't be necessary but do it for completeness.
Expand All @@ -198,37 +234,44 @@ private void schedulerTimer(long nanos) {
}
}

private void expireSignals() {
// lock must be held!
private long doExpire() {
final long nanoTime = executor.currentTime(NANOSECONDS);
TimeStampSignal<T> item;
for (;;) {
// read stateSize before peek, so if we poll from the queue we are sure to see the correct
// state relative to items in the queue.
final long currentStateSize = stateSize;
final long delta;
item = items.peek();
if (item == null) {
break;
} else if (nanoTime - item.timeStamp >= ttlNanos) {
final int currentSize = getSize(currentStateSize);
if (stateSizeUpdater.compareAndSet(this, currentStateSize,
buildStateSize(getState(currentStateSize) + 1, currentSize - 1))) {
// When we add: we add to the queue we add first, then CAS sizeState.
// When we remove: we CAS the atomic state first, then poll.
// This avoids removing a non-expired item because if the "add" thread is running faster and
// already polled "item" the CAS will fail, and we will try again on the next loop iteration.
items.poll();
if (currentSize == 1) {
// a new timer task will be scheduled after addition if this is the case. break to avoid
// multiple timer tasks running concurrently.
break;
}
}
return -1;
} else if ((delta = nanoTime - item.timeStamp) >= ttlNanos) {
final int qSize = queueSizeUpdater.decrementAndGet(this);
assert qSize >= 0;
// Removal is only done while holding the lock. This means we don't have to worry about the
// accumulator thread running concurrently and removing the peeked item behind our back.
items.poll();
} else {
schedulerTimer(ttlNanos - (nanoTime - item.timeStamp));
break; // elements sorted in increasing time, break when first non-expired entry found.
// elements sorted in increasing time, break when first non-expired entry found.
// delta maybe negative if ttlNanos is small and this method sees newly added items while looping.
return delta <= 0 ? ttlNanos : ttlNanos - (nanoTime - item.timeStamp);
}
}
}

private void timerFire() {
long scheduleTimerNanos;
if (tryAcquireLock(queueLockUpdater, this)) {
scheduleTimerNanos = doExpire();
if (!releaseLock(queueLockUpdater, this)) {
scheduleTimerNanos = tryDrainQueue();
}
} else {
scheduleTimerNanos = tryDrainQueue();
}

if (scheduleTimerNanos >= 0) {
schedulerTimer(scheduleTimerNanos);
}
}
}

private static final class TimeStampSignal<T> {
Expand Down
Loading

0 comments on commit 1963359

Please sign in to comment.