From bb3bc5869740489044749b516490e45536fd50d8 Mon Sep 17 00:00:00 2001 From: Praful Makani Date: Tue, 30 Apr 2019 18:20:39 +0530 Subject: [PATCH] Fix Publisher.shutdown --- .../com/google/cloud/pubsub/v1/Publisher.java | 22 +++++------- .../cloud/pubsub/v1/PublisherImplTest.java | 35 +++++++++++++++++++ 2 files changed, 44 insertions(+), 13 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 352822a71b35..044f0266eae4 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -25,6 +25,8 @@ import com.google.api.core.BetaApi; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.core.BackgroundResource; +import com.google.api.gax.core.BackgroundResourceAggregation; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.ExecutorAsBackgroundResource; import com.google.api.gax.core.ExecutorProvider; @@ -47,7 +49,6 @@ import com.google.pubsub.v1.TopicNames; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -94,7 +95,7 @@ public class Publisher { private final ScheduledExecutorService executor; private final AtomicBoolean shutdown; - private final List closeables; + private final BackgroundResource backgroundResources; private final MessageWaiter messagesWaiter; private ScheduledFuture currentAlarmFuture; private final ApiFunction messageTransform; @@ -119,11 +120,9 @@ private Publisher(Builder builder) throws IOException { messagesBatchLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); executor = builder.executorProvider.getExecutor(); + List backgroundResourceList = new ArrayList<>(); if (builder.executorProvider.shouldAutoClose()) { - closeables = - Collections.singletonList(new ExecutorAsBackgroundResource(executor)); - } else { - closeables = Collections.emptyList(); + backgroundResourceList.add(new ExecutorAsBackgroundResource(executor)); } // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry. @@ -151,7 +150,8 @@ private Publisher(Builder builder) throws IOException { .setRetrySettings(retrySettings) .setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build()); this.publisherStub = GrpcPublisherStub.create(stubSettings.build()); - + backgroundResourceList.add(publisherStub); + backgroundResources = new BackgroundResourceAggregation(backgroundResourceList); shutdown = new AtomicBoolean(false); messagesWaiter = new MessageWaiter(); } @@ -397,11 +397,7 @@ public void shutdown() throws Exception { currentAlarmFuture.cancel(false); } publishAllOutstanding(); - messagesWaiter.waitNoMessages(); - for (AutoCloseable closeable : closeables) { - closeable.close(); - } - publisherStub.shutdown(); + backgroundResources.shutdown(); } /** @@ -411,7 +407,7 @@ public void shutdown() throws Exception { *

Call this method to make sure all resources are freed properly. */ public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { - return publisherStub.awaitTermination(duration, unit); + return backgroundResources.awaitTermination(duration, unit); } private boolean hasBatchingBytes() { diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index fc34e5f890a2..acbc82c95c41 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -43,6 +43,7 @@ import io.grpc.inprocess.InProcessServerBuilder; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import org.easymock.EasyMock; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -570,6 +571,40 @@ public void testBuilderInvalidArguments() { } } + @Test + public void testAwaitTermination() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setRetrySettings( + Publisher.Builder.DEFAULT_RETRY_SETTINGS + .toBuilder() + .setTotalTimeout(Duration.ofSeconds(10)) + .setMaxAttempts(0) + .build()) + .build(); + ApiFuture publishFuture1 = sendTestMessage(publisher, "A"); + publisher.shutdown(); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); + } + + @Test + public void testShutDown() throws Exception { + ApiFuture apiFuture = EasyMock.mock(ApiFuture.class); + Publisher publisher = EasyMock.mock(Publisher.class); + EasyMock.expect( + publisher.publish( + PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("A")).build())) + .andReturn(apiFuture); + EasyMock.expect(publisher.awaitTermination(1, TimeUnit.MINUTES)).andReturn(true); + publisher.shutdown(); + EasyMock.expectLastCall().once(); + EasyMock.replay(publisher); + sendTestMessage(publisher, "A"); + publisher.shutdown(); + assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); + } + private Builder getTestPublisherBuilder() { return Publisher.newBuilder(TEST_TOPIC) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))