Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove package private dead code from the subscriber client library. #4589

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
Expand All @@ -43,7 +42,6 @@
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -317,48 +315,4 @@ public void onFailure(Throwable t) {
ApiFutures.addCallback(future, loggingCallback);
}
}

@InternalApi
static List<StreamingPullRequest> partitionAckOperations(
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions, int size) {
int numExtensions = 0;
for (PendingModifyAckDeadline modify : ackDeadlineExtensions) {
numExtensions += modify.ackIds.size();
}
int numChanges = Math.max(numExtensions, acksToSend.size());
int numRequests = numChanges / size + (numChanges % size == 0 ? 0 : 1);

List<StreamingPullRequest.Builder> requests = new ArrayList<>(numRequests);
for (int i = 0; i < numRequests; i++) {
requests.add(StreamingPullRequest.newBuilder());
}

int reqCount = 0;
for (List<String> acksChunk : Lists.partition(acksToSend, size)) {
requests.get(reqCount).addAllAckIds(acksChunk);
reqCount++;
}

reqCount = 0;
int ackCount = 0;
for (PendingModifyAckDeadline modify : ackDeadlineExtensions) {
for (String ackId : modify.ackIds) {
requests
.get(reqCount)
.addModifyDeadlineSeconds(modify.deadlineExtensionSeconds)
.addModifyDeadlineAckIds(ackId);
ackCount++;
if (ackCount == size) {
reqCount++;
ackCount = 0;
}
}
}

List<StreamingPullRequest> ret = new ArrayList<>(requests.size());
for (StreamingPullRequest.Builder builder : requests) {
ret.add(builder.build());
}
return ret;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -94,6 +93,7 @@ public class Subscriber extends AbstractApiService {
@InternalApi static final int MAX_ACK_DEADLINE_SECONDS = 600;
@InternalApi static final int MIN_ACK_DEADLINE_SECONDS = 10;
private static final Duration UNARY_TIMEOUT = Duration.ofSeconds(60);
private static final Duration ACK_EXPIRATION_PADDING = Duration.ofSeconds(5);

private static final ScheduledExecutorService SHARED_SYSTEM_EXECUTOR =
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(6).build().getExecutor();
Expand All @@ -102,7 +102,6 @@ public class Subscriber extends AbstractApiService {

private final String subscriptionName;
private final FlowControlSettings flowControlSettings;
private final Duration ackExpirationPadding;
private final Duration maxAckExtensionPeriod;
private final ScheduledExecutorService executor;
@Nullable private final ScheduledExecutorService alarmsExecutor;
Expand All @@ -120,20 +119,12 @@ public class Subscriber extends AbstractApiService {
new LinkedList<>();
private final ApiClock clock;
private final List<AutoCloseable> closeables = new ArrayList<>();
private ScheduledFuture<?> ackDeadlineUpdater;

private Subscriber(Builder builder) {
receiver = builder.receiver;
flowControlSettings = builder.flowControlSettings;
subscriptionName = builder.subscriptionName;

Preconditions.checkArgument(
builder.ackExpirationPadding.compareTo(Duration.ZERO) > 0, "padding must be positive");
Preconditions.checkArgument(
builder.ackExpirationPadding.compareTo(Duration.ofSeconds(MIN_ACK_DEADLINE_SECONDS)) < 0,
"padding must be less than %s seconds",
MIN_ACK_DEADLINE_SECONDS);
ackExpirationPadding = builder.ackExpirationPadding;
maxAckExtensionPeriod = builder.maxAckExtensionPeriod;
clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock();

Expand Down Expand Up @@ -228,12 +219,6 @@ public String getSubscriptionNameString() {
return subscriptionName;
}

/** Acknowledgement expiration padding. See {@link Builder#setAckExpirationPadding}. */
@InternalApi
Duration getAckExpirationPadding() {
return ackExpirationPadding;
}

/** The flow control settings the Subscriber is configured with. */
public FlowControlSettings getFlowControlSettings() {
return flowControlSettings;
Expand Down Expand Up @@ -331,14 +316,14 @@ public void run() {
.start();
}

private void startStreamingConnections() throws IOException {
private void startStreamingConnections() {
synchronized (streamingSubscriberConnections) {
for (int i = 0; i < numPullers; i++) {
streamingSubscriberConnections.add(
new StreamingSubscriberConnection(
subscriptionName,
receiver,
ackExpirationPadding,
ACK_EXPIRATION_PADDING,
maxAckExtensionPeriod,
ackLatencyDistribution,
subStub,
Expand Down Expand Up @@ -372,9 +357,6 @@ public void failed(State from, Throwable failure) {

private void stopAllStreamingConnections() {
stopConnections(streamingSubscriberConnections);
if (ackDeadlineUpdater != null) {
ackDeadlineUpdater.cancel(true);
}
}

private void startConnections(
Expand Down Expand Up @@ -410,8 +392,6 @@ private void stopConnections(List<? extends ApiService> connections) {

/** Builder of {@link Subscriber Subscribers}. */
public static final class Builder {
private static final Duration MIN_ACK_EXPIRATION_PADDING = Duration.ofMillis(100);
private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.ofSeconds(5);
private static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60);

static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
Expand All @@ -423,7 +403,6 @@ public static final class Builder {
String subscriptionName;
MessageReceiver receiver;

Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING;
Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;

FlowControlSettings flowControlSettings =
Expand All @@ -437,8 +416,6 @@ public static final class Builder {
.setKeepAliveTime(Duration.ofMinutes(5))
.build();
HeaderProvider headerProvider = new NoHeaderProvider();
HeaderProvider internalHeaderProvider =
SubscriptionAdminSettings.defaultApiClientHeaderProviderBuilder().build();
CredentialsProvider credentialsProvider =
SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build();
Optional<ApiClock> clock = Optional.absent();
Expand Down Expand Up @@ -478,21 +455,6 @@ public Builder setHeaderProvider(HeaderProvider headerProvider) {
return this;
}

/**
* Sets the static header provider for getting internal (library-defined) headers. The header
* provider will be called during client construction only once. The headers returned by the
* provider will be cached and supplied as is for each request issued by the constructed client.
* Some reserved headers can be overridden (e.g. Content-Type) or merged with the default value
* (e.g. User-Agent) by the underlying transport layer.
*
* @param internalHeaderProvider the internal header provider
* @return the builder
*/
Builder setInternalHeaderProvider(HeaderProvider internalHeaderProvider) {
this.internalHeaderProvider = Preconditions.checkNotNull(internalHeaderProvider);
return this;
}

/**
* Sets the flow control settings.
*
Expand Down Expand Up @@ -523,25 +485,6 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
return this;
}

/**
* Set acknowledgement expiration padding.
*
* <p>This is the time accounted before a message expiration is to happen, so the {@link
* Subscriber} is able to send an ack extension beforehand.
*
* <p>This padding duration is configurable so you can account for network latency. A reasonable
* number must be provided so messages don't expire because of network latency between when the
* ack extension is required and when it reaches the Pub/Sub service.
*
* @param ackExpirationPadding must be greater or equal to {@link #MIN_ACK_EXPIRATION_PADDING}
*/
@InternalApi
Builder setAckExpirationPadding(Duration ackExpirationPadding) {
Preconditions.checkArgument(ackExpirationPadding.compareTo(MIN_ACK_EXPIRATION_PADDING) >= 0);
this.ackExpirationPadding = ackExpirationPadding;
return this;
}

/**
* Set the maximum period a message ack deadline will be extended. Defaults to one hour.
*
Expand Down

This file was deleted.