From 532bcd5e09efbc802c8f791cf3ef0aabb224abe2 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Thu, 28 Feb 2019 10:05:42 -0500 Subject: [PATCH] Remove package private dead code from the subscriber client library. None of this code is ever called, and none of it is usable outside of the library. --- .../v1/StreamingSubscriberConnection.java | 46 -------- .../google/cloud/pubsub/v1/Subscriber.java | 63 +--------- .../v1/StreamingSubscriberConnectionTest.java | 111 ------------------ 3 files changed, 3 insertions(+), 217 deletions(-) delete mode 100644 google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index f8892fdf6dd4..defd66806a84 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -21,7 +21,6 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; -import com.google.api.core.InternalApi; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.Distribution; @@ -43,7 +42,6 @@ import com.google.pubsub.v1.StreamingPullRequest; import com.google.pubsub.v1.StreamingPullResponse; import io.grpc.Status; -import java.util.ArrayList; import java.util.Deque; import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -317,48 +315,4 @@ public void onFailure(Throwable t) { ApiFutures.addCallback(future, loggingCallback); } } - - @InternalApi - static List partitionAckOperations( - List acksToSend, List ackDeadlineExtensions, int size) { - int numExtensions = 0; - for (PendingModifyAckDeadline modify : ackDeadlineExtensions) { - numExtensions += modify.ackIds.size(); - } - int numChanges = Math.max(numExtensions, acksToSend.size()); - int numRequests = numChanges / size + (numChanges % size == 0 ? 0 : 1); - - List requests = new ArrayList<>(numRequests); - for (int i = 0; i < numRequests; i++) { - requests.add(StreamingPullRequest.newBuilder()); - } - - int reqCount = 0; - for (List acksChunk : Lists.partition(acksToSend, size)) { - requests.get(reqCount).addAllAckIds(acksChunk); - reqCount++; - } - - reqCount = 0; - int ackCount = 0; - for (PendingModifyAckDeadline modify : ackDeadlineExtensions) { - for (String ackId : modify.ackIds) { - requests - .get(reqCount) - .addModifyDeadlineSeconds(modify.deadlineExtensionSeconds) - .addModifyDeadlineAckIds(ackId); - ackCount++; - if (ackCount == size) { - reqCount++; - ackCount = 0; - } - } - } - - List ret = new ArrayList<>(requests.size()); - for (StreamingPullRequest.Builder builder : requests) { - ret.add(builder.build()); - } - return ret; - } } diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 41d60c3a2bc9..ed5e8551d75e 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -48,7 +48,6 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -94,6 +93,7 @@ public class Subscriber extends AbstractApiService { @InternalApi static final int MAX_ACK_DEADLINE_SECONDS = 600; @InternalApi static final int MIN_ACK_DEADLINE_SECONDS = 10; private static final Duration UNARY_TIMEOUT = Duration.ofSeconds(60); + private static final Duration ACK_EXPIRATION_PADDING = Duration.ofSeconds(5); private static final ScheduledExecutorService SHARED_SYSTEM_EXECUTOR = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(6).build().getExecutor(); @@ -102,7 +102,6 @@ public class Subscriber extends AbstractApiService { private final String subscriptionName; private final FlowControlSettings flowControlSettings; - private final Duration ackExpirationPadding; private final Duration maxAckExtensionPeriod; private final ScheduledExecutorService executor; @Nullable private final ScheduledExecutorService alarmsExecutor; @@ -120,20 +119,12 @@ public class Subscriber extends AbstractApiService { new LinkedList<>(); private final ApiClock clock; private final List closeables = new ArrayList<>(); - private ScheduledFuture ackDeadlineUpdater; private Subscriber(Builder builder) { receiver = builder.receiver; flowControlSettings = builder.flowControlSettings; subscriptionName = builder.subscriptionName; - Preconditions.checkArgument( - builder.ackExpirationPadding.compareTo(Duration.ZERO) > 0, "padding must be positive"); - Preconditions.checkArgument( - builder.ackExpirationPadding.compareTo(Duration.ofSeconds(MIN_ACK_DEADLINE_SECONDS)) < 0, - "padding must be less than %s seconds", - MIN_ACK_DEADLINE_SECONDS); - ackExpirationPadding = builder.ackExpirationPadding; maxAckExtensionPeriod = builder.maxAckExtensionPeriod; clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock(); @@ -228,12 +219,6 @@ public String getSubscriptionNameString() { return subscriptionName; } - /** Acknowledgement expiration padding. See {@link Builder#setAckExpirationPadding}. */ - @InternalApi - Duration getAckExpirationPadding() { - return ackExpirationPadding; - } - /** The flow control settings the Subscriber is configured with. */ public FlowControlSettings getFlowControlSettings() { return flowControlSettings; @@ -331,14 +316,14 @@ public void run() { .start(); } - private void startStreamingConnections() throws IOException { + private void startStreamingConnections() { synchronized (streamingSubscriberConnections) { for (int i = 0; i < numPullers; i++) { streamingSubscriberConnections.add( new StreamingSubscriberConnection( subscriptionName, receiver, - ackExpirationPadding, + ACK_EXPIRATION_PADDING, maxAckExtensionPeriod, ackLatencyDistribution, subStub, @@ -372,9 +357,6 @@ public void failed(State from, Throwable failure) { private void stopAllStreamingConnections() { stopConnections(streamingSubscriberConnections); - if (ackDeadlineUpdater != null) { - ackDeadlineUpdater.cancel(true); - } } private void startConnections( @@ -410,8 +392,6 @@ private void stopConnections(List connections) { /** Builder of {@link Subscriber Subscribers}. */ public static final class Builder { - private static final Duration MIN_ACK_EXPIRATION_PADDING = Duration.ofMillis(100); - private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.ofSeconds(5); private static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60); static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = @@ -423,7 +403,6 @@ public static final class Builder { String subscriptionName; MessageReceiver receiver; - Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING; Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD; FlowControlSettings flowControlSettings = @@ -437,8 +416,6 @@ public static final class Builder { .setKeepAliveTime(Duration.ofMinutes(5)) .build(); HeaderProvider headerProvider = new NoHeaderProvider(); - HeaderProvider internalHeaderProvider = - SubscriptionAdminSettings.defaultApiClientHeaderProviderBuilder().build(); CredentialsProvider credentialsProvider = SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build(); Optional clock = Optional.absent(); @@ -478,21 +455,6 @@ public Builder setHeaderProvider(HeaderProvider headerProvider) { return this; } - /** - * Sets the static header provider for getting internal (library-defined) headers. The header - * provider will be called during client construction only once. The headers returned by the - * provider will be cached and supplied as is for each request issued by the constructed client. - * Some reserved headers can be overridden (e.g. Content-Type) or merged with the default value - * (e.g. User-Agent) by the underlying transport layer. - * - * @param internalHeaderProvider the internal header provider - * @return the builder - */ - Builder setInternalHeaderProvider(HeaderProvider internalHeaderProvider) { - this.internalHeaderProvider = Preconditions.checkNotNull(internalHeaderProvider); - return this; - } - /** * Sets the flow control settings. * @@ -523,25 +485,6 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) { return this; } - /** - * Set acknowledgement expiration padding. - * - *

This is the time accounted before a message expiration is to happen, so the {@link - * Subscriber} is able to send an ack extension beforehand. - * - *

This padding duration is configurable so you can account for network latency. A reasonable - * number must be provided so messages don't expire because of network latency between when the - * ack extension is required and when it reaches the Pub/Sub service. - * - * @param ackExpirationPadding must be greater or equal to {@link #MIN_ACK_EXPIRATION_PADDING} - */ - @InternalApi - Builder setAckExpirationPadding(Duration ackExpirationPadding) { - Preconditions.checkArgument(ackExpirationPadding.compareTo(MIN_ACK_EXPIRATION_PADDING) >= 0); - this.ackExpirationPadding = ackExpirationPadding; - return this; - } - /** * Set the maximum period a message ack deadline will be extended. Defaults to one hour. * diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java deleted file mode 100644 index 0dfcb4241730..000000000000 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright 2017 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsub.v1; - -import com.google.cloud.pubsub.v1.MessageDispatcher.PendingModifyAckDeadline; -import com.google.common.truth.Truth; -import com.google.pubsub.v1.StreamingPullRequest; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import org.junit.Test; - -public class StreamingSubscriberConnectionTest { - @Test - public void testPartitionAckOperations() { - List requests; - - requests = - StreamingSubscriberConnection.partitionAckOperations( - Collections.emptyList(), Collections.emptyList(), 3); - Truth.assertThat(requests).isEmpty(); - - requests = - StreamingSubscriberConnection.partitionAckOperations( - Arrays.asList("a", "b", "c"), Collections.emptyList(), 3); - Truth.assertThat(requests) - .containsExactly( - StreamingPullRequest.newBuilder().addAckIds("a").addAckIds("b").addAckIds("c").build()) - .inOrder(); - - requests = - StreamingSubscriberConnection.partitionAckOperations( - Arrays.asList("a", "b", "c", "d"), - Collections.emptyList(), - 3); - Truth.assertThat(requests) - .containsExactly( - StreamingPullRequest.newBuilder().addAckIds("a").addAckIds("b").addAckIds("c").build(), - StreamingPullRequest.newBuilder().addAckIds("d").build()) - .inOrder(); - - requests = - StreamingSubscriberConnection.partitionAckOperations( - Arrays.asList("a", "b", "c", "d"), - Arrays.asList(new PendingModifyAckDeadline(42, "w")), - 3); - Truth.assertThat(requests) - .containsExactly( - StreamingPullRequest.newBuilder() - .addAckIds("a") - .addAckIds("b") - .addAckIds("c") - .addModifyDeadlineAckIds("w") - .addModifyDeadlineSeconds(42) - .build(), - StreamingPullRequest.newBuilder().addAckIds("d").build()) - .inOrder(); - - requests = - StreamingSubscriberConnection.partitionAckOperations( - Arrays.asList("a"), Arrays.asList(new PendingModifyAckDeadline(42, "w", "x")), 3); - Truth.assertThat(requests) - .containsExactly( - StreamingPullRequest.newBuilder() - .addAckIds("a") - .addModifyDeadlineAckIds("w") - .addModifyDeadlineSeconds(42) - .addModifyDeadlineAckIds("x") - .addModifyDeadlineSeconds(42) - .build()) - .inOrder(); - - requests = - StreamingSubscriberConnection.partitionAckOperations( - Arrays.asList("a"), - Arrays.asList( - new PendingModifyAckDeadline(42, "w", "x"), - new PendingModifyAckDeadline(43, "y", "z")), - 3); - Truth.assertThat(requests) - .containsExactly( - StreamingPullRequest.newBuilder() - .addAckIds("a") - .addModifyDeadlineAckIds("w") - .addModifyDeadlineSeconds(42) - .addModifyDeadlineAckIds("x") - .addModifyDeadlineSeconds(42) - .addModifyDeadlineAckIds("y") - .addModifyDeadlineSeconds(43) - .build(), - StreamingPullRequest.newBuilder() - .addModifyDeadlineAckIds("z") - .addModifyDeadlineSeconds(43) - .build()) - .inOrder(); - } -}