Skip to content

Commit

Permalink
Max ack extension (#1898)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidtorres authored and garrettjonesgoogle committed Apr 12, 2017
1 parent e11154e commit 95946ef
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class MessageDispatcher {
private final ApiClock clock;

private final Duration ackExpirationPadding;
private final Duration maxAckExtensionPeriod;
private final MessageReceiver receiver;
private final AckProcessor ackProcessor;

Expand All @@ -87,20 +88,27 @@ class MessageDispatcher {
// it is not modified while inside the queue.
// The hashcode and equals methods are explicitly not implemented to discourage
// the use of this class as keys in maps or similar containers.
private static class ExtensionJob implements Comparable<ExtensionJob> {
private class ExtensionJob implements Comparable<ExtensionJob> {
Instant creation;
Instant expiration;
int nextExtensionSeconds;
ArrayList<AckHandler> ackHandlers;

ExtensionJob(
Instant expiration, int initialAckDeadlineExtension, ArrayList<AckHandler> ackHandlers) {
Instant creation,
Instant expiration,
int initialAckDeadlineExtension,
ArrayList<AckHandler> ackHandlers) {
this.creation = creation;
this.expiration = expiration;
nextExtensionSeconds = initialAckDeadlineExtension;
this.ackHandlers = ackHandlers;
}

void extendExpiration(Instant now) {
expiration = now.plus(Duration.standardSeconds(nextExtensionSeconds));
Instant possibleExtension = now.plus(Duration.standardSeconds(nextExtensionSeconds));
Instant maxExtension = creation.plus(maxAckExtensionPeriod);
expiration = possibleExtension.isBefore(maxExtension) ? possibleExtension : maxExtension;
nextExtensionSeconds = Math.min(2 * nextExtensionSeconds, MAX_ACK_DEADLINE_EXTENSION_SECS);
}

Expand Down Expand Up @@ -217,12 +225,14 @@ void sendAckOperations(
MessageReceiver receiver,
AckProcessor ackProcessor,
Duration ackExpirationPadding,
Duration maxAckExtensionPeriod,
Distribution ackLatencyDistribution,
FlowController flowController,
ScheduledExecutorService executor,
ApiClock clock) {
this.executor = executor;
this.ackExpirationPadding = ackExpirationPadding;
this.maxAckExtensionPeriod = maxAckExtensionPeriod;
this.receiver = receiver;
this.ackProcessor = ackProcessor;
this.flowController = flowController;
Expand Down Expand Up @@ -305,7 +315,11 @@ public void run() {

synchronized (outstandingAckHandlers) {
outstandingAckHandlers.add(
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers));
new ExtensionJob(
new Instant(clock.millisTime()),
expiration,
INITIAL_ACK_DEADLINE_EXTENSION_SECONDS,
ackHandlers));
}
setupNextAckDeadlineExtensionAlarm(expiration);

Expand Down Expand Up @@ -380,6 +394,13 @@ public void run() {
&& outstandingAckHandlers.peek().expiration.compareTo(cutOverTime) <= 0) {
ExtensionJob job = outstandingAckHandlers.poll();

if (maxAckExtensionPeriod.getMillis() > 0
&& job.creation.plus(maxAckExtensionPeriod).compareTo(now) <= 0) {
// The job has expired, according to the maxAckExtensionPeriod, we are just going to
// drop it.
continue;
}

// If a message has already been acked, remove it, nothing to do.
for (int i = 0; i < job.ackHandlers.size(); ) {
if (job.ackHandlers.get(i).acked.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public PollingSubscriberConnection(
Credentials credentials,
MessageReceiver receiver,
Duration ackExpirationPadding,
Duration maxAckExtensionPeriod,
Distribution ackLatencyDistribution,
Channel channel,
FlowController flowController,
Expand All @@ -82,6 +83,7 @@ public PollingSubscriberConnection(
receiver,
this,
ackExpirationPadding,
maxAckExtensionPeriod,
ackLatencyDistribution,
flowController,
executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public StreamingSubscriberConnection(
Credentials credentials,
MessageReceiver receiver,
Duration ackExpirationPadding,
Duration maxAckExtensionPeriod,
int streamAckDeadlineSeconds,
Distribution ackLatencyDistribution,
Channel channel,
Expand All @@ -85,6 +86,7 @@ public StreamingSubscriberConnection(
receiver,
this,
ackExpirationPadding,
maxAckExtensionPeriod,
ackLatencyDistribution,
flowController,
executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public class Subscriber extends AbstractApiService {
private final String cachedSubscriptionNameString;
private final FlowControlSettings flowControlSettings;
private final Duration ackExpirationPadding;
private final Duration maxAckExtensionPeriod;
private final ScheduledExecutorService executor;
private final Distribution ackLatencyDistribution =
new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
Expand All @@ -113,6 +114,7 @@ private Subscriber(Builder builder) throws IOException {
subscriptionName = builder.subscriptionName;
cachedSubscriptionNameString = subscriptionName.toString();
ackExpirationPadding = builder.ackExpirationPadding;
maxAckExtensionPeriod = builder.maxAckExtensionPeriod;
streamAckDeadlineSeconds =
Math.max(
INITIAL_ACK_DEADLINE_SECONDS,
Expand Down Expand Up @@ -245,6 +247,7 @@ private void startStreamingConnections() {
credentials,
receiver,
ackExpirationPadding,
maxAckExtensionPeriod,
streamAckDeadlineSeconds,
ackLatencyDistribution,
channelBuilder.build(),
Expand Down Expand Up @@ -321,6 +324,7 @@ private void startPollingConnections() {
credentials,
receiver,
ackExpirationPadding,
maxAckExtensionPeriod,
ackLatencyDistribution,
channelBuilder.build(),
flowController,
Expand Down Expand Up @@ -409,6 +413,7 @@ public void run() {
public static final class Builder {
private static final Duration MIN_ACK_EXPIRATION_PADDING = Duration.millis(100);
private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.millis(500);
private static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.standardMinutes(60);

static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
InstantiatingExecutorProvider.newBuilder()
Expand All @@ -423,6 +428,7 @@ public static final class Builder {
MessageReceiver receiver;

Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING;
Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;

FlowControlSettings flowControlSettings = FlowControlSettings.getDefaultInstance();

Expand Down Expand Up @@ -483,6 +489,21 @@ public Builder setAckExpirationPadding(Duration ackExpirationPadding) {
return this;
}

/**
* Set the maximum period a message ack deadline will be extended.
*
* <p>It is recommended to set this value to a reasonable upper bound of the subscriber time to
* process any message. This maximum period avoids messages to be <i>locked</i> by a subscriber
* in cases when the {@link AckReply} is lost.
*
* <p>A zero duration effectively disables auto deadline extensions.
*/
public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
Preconditions.checkArgument(maxAckExtensionPeriod.getMillis() >= 0);
this.maxAckExtensionPeriod = maxAckExtensionPeriod;
return this;
}

/** Gives the ability to set a custom executor. */
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = Preconditions.checkNotNull(executorProvider);
Expand All @@ -500,3 +521,4 @@ public Subscriber build() throws IOException {
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.AbstractExecutorService;
Expand All @@ -44,6 +46,7 @@ public class FakeScheduledExecutorService extends AbstractExecutorService
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final PriorityQueue<PendingCallable<?>> pendingCallables = new PriorityQueue<>();
private final FakeClock clock = new FakeClock();
private final Deque<Duration> expectedWorkQueue = new LinkedList<>();

public ApiClock getClock() {
return clock;
Expand Down Expand Up @@ -79,6 +82,35 @@ public ScheduledFuture<?> scheduleWithFixedDelay(
Duration.millis(unit.toMillis(initialDelay)), command, PendingCallableType.FIXED_DELAY));
}

/**
* This allows for adding expectations on future work to be scheduled (
* {@link FakeScheduledExecutorService#schedule}
* or {@link FakeScheduledExecutorService#scheduleAtFixedRate}
* or {@link FakeScheduledExecutorService#scheduleWithFixedDelay}) based on its delay.
*/
public void setupScheduleExpectation(Duration delay) {
synchronized (expectedWorkQueue) {
expectedWorkQueue.add(delay);
}
}

/**
* Blocks the current thread until all the work
* {@link FakeScheduledExecutorService#setupScheduleExpectation(Duration) expected} has been
* scheduled in the executor.
*/
public void waitForExpectedWork() {
synchronized (expectedWorkQueue) {
while (!expectedWorkQueue.isEmpty()) {
try {
expectedWorkQueue.wait();
} catch (InterruptedException e) {
// Wait uninterruptibly
}
}
}
}

/**
* This will advance the reference time of the executor and execute (in the same thread) any
* outstanding callable which execution time has passed.
Expand All @@ -94,13 +126,14 @@ private void work() {
for (;;) {
PendingCallable<?> callable = null;
synchronized (pendingCallables) {
if (pendingCallables.isEmpty() || pendingCallables.peek().getScheduledTime().isAfter(cmpTime)) {
if (pendingCallables.isEmpty()
|| pendingCallables.peek().getScheduledTime().isAfter(cmpTime)) {
break;
}
callable = pendingCallables.poll();
}
if (callable != null) {
try{
try {
callable.call();
} catch (Exception e) {
// We ignore any callable exception, which should be set to the future but not relevant to
Expand Down Expand Up @@ -182,6 +215,16 @@ <V> ScheduledFuture<V> schedulePendingCallable(PendingCallable<V> callable) {
pendingCallables.add(callable);
}
work();
synchronized (expectedWorkQueue) {
// We compare by the callable delay in order decide when to remove expectations from the
// expected work queue, i.e. only the expected work that matches the delay of the scheduled
// callable is removed from the queue.
if (!expectedWorkQueue.isEmpty() && expectedWorkQueue.peek().equals(callable.delay)) {
expectedWorkQueue.poll();
}
expectedWorkQueue.notifyAll();
}

return callable.getScheduledFuture();
}

Expand Down
Loading

0 comments on commit 95946ef

Please sign in to comment.