Skip to content

Commit

Permalink
Cleaning up deprecated pubsub code. (#4949)
Browse files Browse the repository at this point in the history
* Cleaning up deprecated pubsub code.

* Fixing format
  • Loading branch information
sduskis authored Apr 16, 2019
1 parent 3c8c8d6 commit e5e7a21
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,7 @@ public void run() {
/** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */
@InternalApi
int computeDeadlineSeconds() {
long secLong = ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES);
int sec = Ints.saturatedCast(secLong);
int sec = ackLatencyDistribution.getPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES);

// Use Ints.constrainToRange when we get guava 21.
if (sec < Subscriber.MIN_ACK_DEADLINE_SECONDS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.pubsub.v1;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
Expand Down Expand Up @@ -357,7 +359,8 @@ public void onFailure(Throwable t) {
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());
}
}
});
},
directExecutor());
}

private static final class OutstandingBatch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.pubsub.v1;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
Expand Down Expand Up @@ -291,7 +293,7 @@ public void onFailure(Throwable t) {
.addAllAckIds(idChunk)
.setAckDeadlineSeconds(modack.deadlineExtensionSeconds)
.build());
ApiFutures.addCallback(future, loggingCallback);
ApiFutures.addCallback(future, loggingCallback, directExecutor());
}
}

Expand All @@ -303,7 +305,7 @@ public void onFailure(Throwable t) {
.setSubscription(subscription)
.addAllAckIds(idChunk)
.build());
ApiFutures.addCallback(future, loggingCallback);
ApiFutures.addCallback(future, loggingCallback, directExecutor());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
Expand Down

0 comments on commit e5e7a21

Please sign in to comment.