diff --git a/src/main/java/com/google/api/gax/bundling/BundlingFlowController.java b/src/main/java/com/google/api/gax/bundling/BundlingFlowController.java deleted file mode 100644 index ca71ca859..000000000 --- a/src/main/java/com/google/api/gax/bundling/BundlingFlowController.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2017, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package com.google.api.gax.bundling; - -import com.google.api.gax.core.FlowController; -import com.google.api.gax.core.FlowController.FlowControlException; -import com.google.common.base.Preconditions; -import com.google.common.primitives.Ints; - -/** Wraps a {@link FlowController} for use by Bundling. */ -public class BundlingFlowController { - - private final FlowController flowController; - private final ElementCounter elementCounter; - private final ElementCounter byteCounter; - - public BundlingFlowController( - FlowController flowController, - ElementCounter elementCounter, - ElementCounter byteCounter) { - this.flowController = flowController; - this.elementCounter = elementCounter; - this.byteCounter = byteCounter; - } - - public void reserve(T bundle) throws FlowControlException { - Preconditions.checkNotNull(bundle); - int elements = Ints.checkedCast(elementCounter.count(bundle)); - int bytes = Ints.checkedCast(byteCounter.count(bundle)); - flowController.reserve(elements, bytes); - } - - public void release(T bundle) { - Preconditions.checkNotNull(bundle); - int elements = Ints.checkedCast(elementCounter.count(bundle)); - int bytes = Ints.checkedCast(byteCounter.count(bundle)); - flowController.release(elements, bytes); - } -} diff --git a/src/main/java/com/google/api/gax/bundling/ThresholdBundler.java b/src/main/java/com/google/api/gax/bundling/ThresholdBundler.java index 2c7526a6a..1164aae6c 100644 --- a/src/main/java/com/google/api/gax/bundling/ThresholdBundler.java +++ b/src/main/java/com/google/api/gax/bundling/ThresholdBundler.java @@ -29,7 +29,6 @@ */ package com.google.api.gax.bundling; -import com.google.api.gax.core.FlowController.FlowControlException; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; @@ -51,20 +50,15 @@ public final class ThresholdBundler { private ImmutableList> thresholdPrototypes; private final Duration maxDelay; - private final BundlingFlowController flowController; private final Lock lock = new ReentrantLock(); private final Condition bundleCondition = lock.newCondition(); private Bundle currentOpenBundle; private List closedBundles = new ArrayList<>(); - private ThresholdBundler( - ImmutableList> thresholds, - Duration maxDelay, - BundlingFlowController flowController) { + private ThresholdBundler(ImmutableList> thresholds, Duration maxDelay) { this.thresholdPrototypes = copyResetThresholds(Preconditions.checkNotNull(thresholds)); this.maxDelay = maxDelay; - this.flowController = Preconditions.checkNotNull(flowController); this.currentOpenBundle = null; } @@ -74,7 +68,6 @@ private ThresholdBundler( public static final class Builder { private List> thresholds; private Duration maxDelay; - private BundlingFlowController flowController; private Builder() { thresholds = Lists.newArrayList(); @@ -104,15 +97,11 @@ public Builder addThreshold(BundlingThreshold threshold) { return this; } - /** Set the flow controller for the ThresholdBundler. */ - public Builder setFlowController(BundlingFlowController flowController) { - this.flowController = flowController; - return this; - } - - /** Build the ThresholdBundler. */ + /** + * Build the ThresholdBundler. + */ public ThresholdBundler build() { - return new ThresholdBundler(ImmutableList.copyOf(thresholds), maxDelay, flowController); + return new ThresholdBundler(ImmutableList.copyOf(thresholds), maxDelay); } } @@ -126,14 +115,9 @@ public static Builder newBuilder() { /** * Adds an element to the bundler. If the element causes the collection to go past any of the * thresholds, the bundle will be made available to consumers. - * - * @throws FlowControlException */ - public void add(E e) throws FlowControlException { + public void add(E e) { final Lock lock = this.lock; - // We need to reserve resources from flowController outside the lock, so that they can be - // released by drainNextBundleTo(). - flowController.reserve(e); lock.lock(); try { boolean signalBundleIsReady = false; @@ -195,11 +179,7 @@ public int drainNextBundleTo(Collection outputCollection) { } if (outBundle != null) { - List data = outBundle.getData(); - for (E e : data) { - flowController.release(e); - } - outputCollection.addAll(data); + outputCollection.addAll(outBundle.getData()); return outputCollection.size(); } else { return 0; diff --git a/src/main/java/com/google/api/gax/bundling/ThresholdBundlingForwarder.java b/src/main/java/com/google/api/gax/bundling/ThresholdBundlingForwarder.java index 383fd6793..f8940344a 100644 --- a/src/main/java/com/google/api/gax/bundling/ThresholdBundlingForwarder.java +++ b/src/main/java/com/google/api/gax/bundling/ThresholdBundlingForwarder.java @@ -29,7 +29,6 @@ */ package com.google.api.gax.bundling; -import com.google.api.gax.core.FlowController.FlowControlException; import java.util.ArrayList; import java.util.List; @@ -66,10 +65,8 @@ public void start() { /** * First validates that the receiver can receive the given item (based on the inherent * characteristics of the item), and then hands it off to the bundler. - * - * @throws FlowControlException */ - public void addToNextBundle(T item) throws FlowControlException { + public void addToNextBundle(T item) { bundleReceiver.validateItem(item); bundler.add(item); } diff --git a/src/main/java/com/google/api/gax/grpc/BundlerFactory.java b/src/main/java/com/google/api/gax/grpc/BundlerFactory.java index 00212cfd1..7b7d26983 100644 --- a/src/main/java/com/google/api/gax/grpc/BundlerFactory.java +++ b/src/main/java/com/google/api/gax/grpc/BundlerFactory.java @@ -29,16 +29,11 @@ */ package com.google.api.gax.grpc; -import com.google.api.gax.bundling.BundlingFlowController; -import com.google.api.gax.bundling.BundlingSettings; import com.google.api.gax.bundling.BundlingThreshold; import com.google.api.gax.bundling.ElementCounter; import com.google.api.gax.bundling.NumericThreshold; import com.google.api.gax.bundling.ThresholdBundler; import com.google.api.gax.bundling.ThresholdBundlingForwarder; -import com.google.api.gax.core.FlowControlSettings; -import com.google.api.gax.core.FlowController; -import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.common.collect.ImmutableList; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -55,7 +50,6 @@ public final class BundlerFactory implements AutoCloseable private final Map>> forwarders = new ConcurrentHashMap<>(); private final BundlingDescriptor bundlingDescriptor; - private final FlowController flowController; private final BundlingSettings bundlingSettings; private final Object lock = new Object(); @@ -64,13 +58,6 @@ public BundlerFactory( BundlingSettings bundlingSettings) { this.bundlingDescriptor = bundlingDescriptor; this.bundlingSettings = bundlingSettings; - this.flowController = - new FlowController( - bundlingSettings.getFlowControlSettings() != null - ? bundlingSettings.getFlowControlSettings() - : FlowControlSettings.newBuilder() - .setLimitExceededBehavior(LimitExceededBehavior.Ignore) - .build()); } /** @@ -110,31 +97,12 @@ private ThresholdBundlingForwarder> createF ThresholdBundler.>newBuilder() .setThresholds(getThresholds(bundlingSettings)) .setMaxDelay(bundlingSettings.getDelayThreshold()) - .setFlowController(createBundlingFlowController()) .build(); BundleExecutor processor = new BundleExecutor<>(bundlingDescriptor, partitionKey); return new ThresholdBundlingForwarder<>(bundler, processor); } - private BundlingFlowController> - createBundlingFlowController() { - return new BundlingFlowController>( - flowController, - new ElementCounter>() { - @Override - public long count(BundlingContext bundlablePublish) { - return bundlingDescriptor.countElements(bundlablePublish.getRequest()); - } - }, - new ElementCounter>() { - @Override - public long count(BundlingContext bundlablePublish) { - return bundlingDescriptor.countBytes(bundlablePublish.getRequest()); - } - }); - } - @Override public void close() { synchronized (lock) { diff --git a/src/main/java/com/google/api/gax/grpc/BundlingCallSettings.java b/src/main/java/com/google/api/gax/grpc/BundlingCallSettings.java index 17df7e208..92f754660 100644 --- a/src/main/java/com/google/api/gax/grpc/BundlingCallSettings.java +++ b/src/main/java/com/google/api/gax/grpc/BundlingCallSettings.java @@ -29,7 +29,6 @@ */ package com.google.api.gax.grpc; -import com.google.api.gax.bundling.BundlingSettings; import com.google.api.gax.core.RetrySettings; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; diff --git a/src/main/java/com/google/api/gax/grpc/BundlingCallable.java b/src/main/java/com/google/api/gax/grpc/BundlingCallable.java index cb7e3b35f..ebfd4253e 100644 --- a/src/main/java/com/google/api/gax/grpc/BundlingCallable.java +++ b/src/main/java/com/google/api/gax/grpc/BundlingCallable.java @@ -30,8 +30,6 @@ package com.google.api.gax.grpc; import com.google.api.gax.bundling.ThresholdBundlingForwarder; -import com.google.api.gax.core.FlowController.FlowControlException; -import com.google.api.gax.core.FlowController.FlowControlRuntimeException; import com.google.api.gax.core.RpcFuture; import com.google.common.base.Preconditions; @@ -68,12 +66,8 @@ public RpcFuture futureCall(RequestT request, CallContext context) { String partitionKey = bundlingDescriptor.getBundlePartitionKey(request); ThresholdBundlingForwarder> forwarder = bundlerFactory.getForwarder(partitionKey); - try { - forwarder.addToNextBundle(bundlableMessage); - return result; - } catch (FlowControlException e) { - throw FlowControlRuntimeException.fromFlowControlException(e); - } + forwarder.addToNextBundle(bundlableMessage); + return result; } else { return callable.futureCall(request, context); } diff --git a/src/main/java/com/google/api/gax/bundling/BundlingSettings.java b/src/main/java/com/google/api/gax/grpc/BundlingSettings.java similarity index 89% rename from src/main/java/com/google/api/gax/bundling/BundlingSettings.java rename to src/main/java/com/google/api/gax/grpc/BundlingSettings.java index b002e5d9d..7af88ca8f 100644 --- a/src/main/java/com/google/api/gax/bundling/BundlingSettings.java +++ b/src/main/java/com/google/api/gax/grpc/BundlingSettings.java @@ -27,9 +27,8 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package com.google.api.gax.bundling; +package com.google.api.gax.grpc; -import com.google.api.gax.core.FlowControlSettings; import com.google.auto.value.AutoValue; import com.google.common.base.Preconditions; import javax.annotation.Nullable; @@ -86,12 +85,6 @@ * will not bundle together requests in the resulting bundle will surpass the limit. Thus, a bundle * can be sent that is actually under the threshold if the next request would put the combined * request over the limit. - * - *

- * Bundling also supports FlowControl. This can be used to prevent the bundling implementation from - * accumulating messages without limit, resulting eventually in an OutOfMemory exception. This can - * occur if messages are created and added to bundling faster than they can be processed. The flow - * control behavior is controlled using FlowControlSettings. */ @AutoValue public abstract class BundlingSettings { @@ -110,14 +103,9 @@ public abstract class BundlingSettings { /** Returns the Boolean object to indicate if the bundling is enabled. Default to true */ public abstract Boolean getIsEnabled(); - /** Get the flow control settings to use. */ - public abstract FlowControlSettings getFlowControlSettings(); - /** Get a new builder. */ public static Builder newBuilder() { - return new AutoValue_BundlingSettings.Builder() - .setIsEnabled(true) - .setFlowControlSettings(FlowControlSettings.getDefaultInstance()); + return new AutoValue_BundlingSettings.Builder().setIsEnabled(true); } /** Get a builder with the same values as this object. */ @@ -167,9 +155,6 @@ public Builder setRequestByteThreshold(Integer requestByteThreshold) { */ public abstract Builder setIsEnabled(Boolean enabled); - /** Set the flow control settings to be used. */ - public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings); - abstract BundlingSettings autoBuild(); /** Build the BundlingSettings object. */ diff --git a/src/main/java/com/google/api/gax/core/FlowControlSettings.java b/src/main/java/com/google/api/gax/grpc/FlowControlSettings.java similarity index 77% rename from src/main/java/com/google/api/gax/core/FlowControlSettings.java rename to src/main/java/com/google/api/gax/grpc/FlowControlSettings.java index 8a23ea124..6b56f5de8 100644 --- a/src/main/java/com/google/api/gax/core/FlowControlSettings.java +++ b/src/main/java/com/google/api/gax/grpc/FlowControlSettings.java @@ -27,12 +27,11 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package com.google.api.gax.core; +package com.google.api.gax.grpc; -import com.google.api.gax.core.FlowController.FlowControlException; -import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.auto.value.AutoValue; import com.google.common.base.Preconditions; +import java.util.concurrent.Semaphore; import javax.annotation.Nullable; /** Settings for {@link FlowController}. */ @@ -50,30 +49,12 @@ public static FlowControlSettings getDefaultInstance() { @Nullable public abstract Integer getMaxOutstandingRequestBytes(); - /** - * The behavior of {@link FlowController} when the specified limits are exceeded. Defaults to - * Block. - * - *

- * The expected behavior for each of these values is: - * - *

    - *
  • ThrowException: the FlowController will throw a {@link FlowControlException} if any of the - * limits are exceeded. - *
  • Block: the reserve() method of FlowController will block until the quote is available to be - * reserved. - *
  • Ignore: all flow control limits will be ignored; the FlowController is disabled. - *
- */ - public abstract LimitExceededBehavior getLimitExceededBehavior(); - public Builder toBuilder() { return new AutoValue_FlowControlSettings.Builder(this); } public static Builder newBuilder() { - return new AutoValue_FlowControlSettings.Builder() - .setLimitExceededBehavior(LimitExceededBehavior.Block); + return new AutoValue_FlowControlSettings.Builder(); } @AutoValue.Builder @@ -82,8 +63,6 @@ public abstract static class Builder { public abstract Builder setMaxOutstandingRequestBytes(Integer value); - public abstract Builder setLimitExceededBehavior(LimitExceededBehavior value); - abstract FlowControlSettings autoBuild(); public FlowControlSettings build() { diff --git a/src/main/java/com/google/api/gax/core/FlowController.java b/src/main/java/com/google/api/gax/grpc/FlowController.java similarity index 80% rename from src/main/java/com/google/api/gax/core/FlowController.java rename to src/main/java/com/google/api/gax/grpc/FlowController.java index c5f864006..c00e2d0f5 100644 --- a/src/main/java/com/google/api/gax/core/FlowController.java +++ b/src/main/java/com/google/api/gax/grpc/FlowController.java @@ -27,8 +27,9 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package com.google.api.gax.core; +package com.google.api.gax.grpc; +import com.google.auto.value.AutoValue; import com.google.common.base.Preconditions; import java.util.concurrent.Semaphore; import javax.annotation.Nullable; @@ -40,20 +41,6 @@ public abstract static class FlowControlException extends Exception { private FlowControlException() {} } - /** - * Runtime exception that can be used in place of FlowControlException when an unchecked exception - * is required. - */ - public static class FlowControlRuntimeException extends RuntimeException { - private FlowControlRuntimeException(FlowControlException e) { - super(e); - } - - public static FlowControlRuntimeException fromFlowControlException(FlowControlException e) { - return new FlowControlRuntimeException(e); - } - } - /** * Exception thrown when client-side flow control is enforced based on the maximum number of * outstanding in-memory elements. @@ -100,47 +87,20 @@ public String toString() { } } - /** - * Enumeration of behaviors that FlowController can use in case the flow control limits are - * exceeded. - */ - public enum LimitExceededBehavior { - ThrowException, - Block, - Ignore, - } - @Nullable private final Semaphore outstandingElementCount; @Nullable private final Semaphore outstandingByteCount; private final boolean failOnLimits; @Nullable private final Integer maxOutstandingElementCount; @Nullable private final Integer maxOutstandingRequestBytes; - public FlowController(FlowControlSettings settings) { - switch (settings.getLimitExceededBehavior()) { - case ThrowException: - this.failOnLimits = true; - break; - case Block: - this.failOnLimits = false; - break; - case Ignore: - this.failOnLimits = false; - this.maxOutstandingElementCount = null; - this.maxOutstandingRequestBytes = null; - this.outstandingElementCount = null; - this.outstandingByteCount = null; - return; - default: - throw new IllegalArgumentException( - "Unknown LimitBehaviour: " + settings.getLimitExceededBehavior()); - } + public FlowController(FlowControlSettings settings, boolean failOnFlowControlLimits) { this.maxOutstandingElementCount = settings.getMaxOutstandingElementCount(); this.maxOutstandingRequestBytes = settings.getMaxOutstandingRequestBytes(); outstandingElementCount = maxOutstandingElementCount != null ? new Semaphore(maxOutstandingElementCount) : null; outstandingByteCount = maxOutstandingRequestBytes != null ? new Semaphore(maxOutstandingRequestBytes) : null; + this.failOnLimits = failOnFlowControlLimits; } public void reserve(int elements, int bytes) throws FlowControlException { diff --git a/src/main/java/com/google/api/gax/grpc/UnaryCallable.java b/src/main/java/com/google/api/gax/grpc/UnaryCallable.java index 4ca760cdf..f98b23dc3 100644 --- a/src/main/java/com/google/api/gax/grpc/UnaryCallable.java +++ b/src/main/java/com/google/api/gax/grpc/UnaryCallable.java @@ -181,8 +181,8 @@ public static UnaryCallable BundlingFlowController getDisabledBundlingFlowController() { - return new BundlingFlowController<>( - getDisabledFlowController(), - new ElementCounter() { - @Override - public long count(T t) { - return 1; - } - }, - new ElementCounter() { - @Override - public long count(T t) { - return 1; - } - }); - } - - private static BundlingFlowController getIntegerBundlingFlowController( - Integer elementCount, Integer byteCount, LimitExceededBehavior limitExceededBehaviour) { - return new BundlingFlowController<>( - new FlowController( - FlowControlSettings.newBuilder() - .setMaxOutstandingElementCount(elementCount) - .setMaxOutstandingRequestBytes(byteCount) - .setLimitExceededBehavior(limitExceededBehaviour) - .build()), - new ElementCounter() { - @Override - public long count(Integer t) { - return 1; - } - }, - new ElementCounter() { - @Override - public long count(Integer t) { - return t; - } - }); - } - - @Rule public ExpectedException thrown = ExpectedException.none(); - @Test public void testEmptyAddAndDrain() { ThresholdBundler bundler = ThresholdBundler.newBuilder() .setThresholds(BundlingThresholds.of(5)) - .setFlowController(ThresholdBundlerTest.getDisabledBundlingFlowController()) .build(); List resultBundle = new ArrayList<>(); Truth.assertThat(bundler.isEmpty()).isTrue(); @@ -109,11 +54,10 @@ public void testEmptyAddAndDrain() { } @Test - public void testAddAndDrain() throws FlowControlException { + public void testAddAndDrain() { ThresholdBundler bundler = ThresholdBundler.newBuilder() .setThresholds(BundlingThresholds.of(5)) - .setFlowController(ThresholdBundlerTest.getDisabledBundlingFlowController()) .build(); bundler.add(14); Truth.assertThat(bundler.isEmpty()).isFalse(); @@ -135,7 +79,6 @@ public void testBundling() throws Exception { ThresholdBundler bundler = ThresholdBundler.newBuilder() .setThresholds(BundlingThresholds.of(2)) - .setFlowController(ThresholdBundlerTest.getDisabledBundlingFlowController()) .build(); AccumulatingBundleReceiver receiver = new AccumulatingBundleReceiver(); ThresholdBundlingForwarder forwarder = @@ -167,10 +110,7 @@ public void testBundling() throws Exception { @Test public void testBundlingWithDelay() throws Exception { ThresholdBundler bundler = - ThresholdBundler.newBuilder() - .setMaxDelay(Duration.millis(100)) - .setFlowController(ThresholdBundlerTest.getDisabledBundlingFlowController()) - .build(); + ThresholdBundler.newBuilder().setMaxDelay(Duration.millis(100)).build(); AccumulatingBundleReceiver receiver = new AccumulatingBundleReceiver(); ThresholdBundlingForwarder forwarder = new ThresholdBundlingForwarder(bundler, receiver); @@ -197,7 +137,6 @@ public void testFlush() throws Exception { ThresholdBundler bundler = ThresholdBundler.newBuilder() .setThresholds(BundlingThresholds.of(2)) - .setFlowController(ThresholdBundlerTest.getDisabledBundlingFlowController()) .build(); AccumulatingBundleReceiver receiver = new AccumulatingBundleReceiver(); ThresholdBundlingForwarder forwarder = @@ -227,75 +166,14 @@ public void testFlush() throws Exception { Truth.assertThat(receiver.getBundles()).isEqualTo(expected); } - @Test - public void testExceptionWithNullFlowController() { - thrown.expect(NullPointerException.class); - ThresholdBundler.newBuilder().build(); - } - - @Test - public void testBundlingWithFlowControl() throws Exception { - ThresholdBundler bundler = - ThresholdBundler.newBuilder() - .setThresholds(BundlingThresholds.of(2)) - .setFlowController( - getIntegerBundlingFlowController(3, null, LimitExceededBehavior.Block)) - .build(); - AccumulatingBundleReceiver receiver = new AccumulatingBundleReceiver(); - ThresholdBundlingForwarder forwarder = - new ThresholdBundlingForwarder(bundler, receiver); - - try { - forwarder.start(); - bundler.add(3); - bundler.add(5); - bundler.add(7); - bundler.add(9); // We expect to block here until the first bundle is handled - bundler.add(11); - - } finally { - forwarder.close(); - } - - List> expected = - Arrays.asList(Arrays.asList(3, 5), Arrays.asList(7, 9), Arrays.asList(11)); - Truth.assertThat(receiver.getBundles()).isEqualTo(expected); - } - - @Test - public void testBundlingFlowControlExceptionRecovery() throws Exception { - ThresholdBundler bundler = - ThresholdBundler.newBuilder() - .setThresholds(BundlingThresholds.of(2)) - .setFlowController( - getIntegerBundlingFlowController(3, null, LimitExceededBehavior.ThrowException)) - .build(); - AccumulatingBundleReceiver receiver = new AccumulatingBundleReceiver(); - ThresholdBundlingForwarder forwarder = - new ThresholdBundlingForwarder(bundler, receiver); - - try { - // Note: do not start the forwarder here, otherwise we have a race condition in the test - // between whether bundler.add(9) executes before the first bundle is processed. - bundler.add(3); - bundler.add(5); - bundler.add(7); - try { - bundler.add(9); - } catch (FlowControlException e) { - } - forwarder.start(); - // Give time for the forwarder thread to catch the bundle - Thread.sleep(100); - bundler.add(11); - bundler.add(13); - - } finally { - forwarder.close(); - } - - List> expected = - Arrays.asList(Arrays.asList(3, 5), Arrays.asList(7, 11), Arrays.asList(13)); - Truth.assertThat(receiver.getBundles()).isEqualTo(expected); + private BundlingThreshold createValueThreshold(long threshold) { + return new NumericThreshold( + threshold, + new ElementCounter() { + @Override + public long count(Integer value) { + return value; + } + }); } } diff --git a/src/test/java/com/google/api/gax/core/FlowControllerTest.java b/src/test/java/com/google/api/gax/grpc/FlowControllerTest.java similarity index 83% rename from src/test/java/com/google/api/gax/core/FlowControllerTest.java rename to src/test/java/com/google/api/gax/grpc/FlowControllerTest.java index 64d18d533..8b263a46d 100644 --- a/src/test/java/com/google/api/gax/core/FlowControllerTest.java +++ b/src/test/java/com/google/api/gax/grpc/FlowControllerTest.java @@ -27,12 +27,11 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package com.google.api.gax.core; +package com.google.api.gax.grpc; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.common.util.concurrent.SettableFuture; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -51,8 +50,8 @@ public void testReserveRelease_ok() throws Exception { FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(10) .setMaxOutstandingRequestBytes(10) - .setLimitExceededBehavior(LimitExceededBehavior.Block) - .build()); + .build(), + false); flowController.reserve(1, 1); flowController.release(1, 1); @@ -65,8 +64,8 @@ public void testInvalidArguments() throws Exception { FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(10) .setMaxOutstandingRequestBytes(10) - .setLimitExceededBehavior(LimitExceededBehavior.Block) - .build()); + .build(), + false); flowController.reserve(1, 0); try { @@ -96,22 +95,8 @@ public void testReserveRelease_noLimits_ok() throws Exception { FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(null) .setMaxOutstandingRequestBytes(null) - .setLimitExceededBehavior(LimitExceededBehavior.Block) - .build()); - - flowController.reserve(1, 1); - flowController.release(1, 1); - } - - @Test - public void testReserveRelease_ignore_ok() throws Exception { - FlowController flowController = - new FlowController( - FlowControlSettings.newBuilder() - .setMaxOutstandingElementCount(1) - .setMaxOutstandingRequestBytes(1) - .setLimitExceededBehavior(LimitExceededBehavior.Ignore) - .build()); + .build(), + false); flowController.reserve(1, 1); flowController.release(1, 1); @@ -124,8 +109,8 @@ public void testReserveRelease_blockedByElementCount() throws Exception { FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(10) .setMaxOutstandingRequestBytes(100) - .setLimitExceededBehavior(LimitExceededBehavior.Block) - .build()); + .build(), + false); testBlockingReserveRelease(flowController, 10, 10); } @@ -137,8 +122,8 @@ public void testReserveRelease_blockedByElementCount_noBytesLimit() throws Excep FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(10) .setMaxOutstandingRequestBytes(null) - .setLimitExceededBehavior(LimitExceededBehavior.Block) - .build()); + .build(), + false); testBlockingReserveRelease(flowController, 10, 10); } @@ -150,8 +135,8 @@ public void testReserveRelease_blockedByNumberOfBytes() throws Exception { FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(100) .setMaxOutstandingRequestBytes(10) - .setLimitExceededBehavior(LimitExceededBehavior.Block) - .build()); + .build(), + false); testBlockingReserveRelease(flowController, 10, 10); } @@ -163,8 +148,8 @@ public void testReserveRelease_blockedByNumberOfBytes_noElementCountLimit() thro FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(null) .setMaxOutstandingRequestBytes(10) - .setLimitExceededBehavior(LimitExceededBehavior.Block) - .build()); + .build(), + false); testBlockingReserveRelease(flowController, 10, 10); } @@ -204,8 +189,8 @@ public void testReserveRelease_rejectedByElementCount() throws Exception { FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(10) .setMaxOutstandingRequestBytes(100) - .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) - .build()); + .build(), + true); testRejectedReserveRelease( flowController, 10, 10, FlowController.MaxOutstandingElementCountReachedException.class); @@ -218,8 +203,8 @@ public void testReserveRelease_rejectedByElementCount_noBytesLimit() throws Exce FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(10) .setMaxOutstandingRequestBytes(null) - .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) - .build()); + .build(), + true); testRejectedReserveRelease( flowController, 10, 10, FlowController.MaxOutstandingElementCountReachedException.class); @@ -232,8 +217,8 @@ public void testReserveRelease_rejectedByNumberOfBytes() throws Exception { FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(100) .setMaxOutstandingRequestBytes(10) - .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) - .build()); + .build(), + true); testRejectedReserveRelease( flowController, 10, 10, FlowController.MaxOutstandingRequestBytesReachedException.class); @@ -246,8 +231,8 @@ public void testReserveRelease_rejectedByNumberOfBytes_noElementCountLimit() thr FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(null) .setMaxOutstandingRequestBytes(10) - .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) - .build()); + .build(), + true); testRejectedReserveRelease( flowController, 10, 10, FlowController.MaxOutstandingRequestBytesReachedException.class); diff --git a/src/test/java/com/google/api/gax/grpc/SettingsTest.java b/src/test/java/com/google/api/gax/grpc/SettingsTest.java index 90b1ecc67..5213739c8 100644 --- a/src/test/java/com/google/api/gax/grpc/SettingsTest.java +++ b/src/test/java/com/google/api/gax/grpc/SettingsTest.java @@ -29,7 +29,6 @@ */ package com.google.api.gax.grpc; -import com.google.api.gax.bundling.BundlingSettings; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.core.GoogleCredentialsProvider; diff --git a/src/test/java/com/google/api/gax/grpc/UnaryCallableTest.java b/src/test/java/com/google/api/gax/grpc/UnaryCallableTest.java index fd8c81a24..70cd2d4ce 100644 --- a/src/test/java/com/google/api/gax/grpc/UnaryCallableTest.java +++ b/src/test/java/com/google/api/gax/grpc/UnaryCallableTest.java @@ -29,7 +29,6 @@ */ package com.google.api.gax.grpc; -import com.google.api.gax.bundling.BundlingSettings; import com.google.api.gax.core.FixedSizeCollection; import com.google.api.gax.core.Page; import com.google.api.gax.core.RetrySettings;