From ec2e656e32d4ad88d1b12750b822c0aa023b767d Mon Sep 17 00:00:00 2001 From: michaelbausor Date: Wed, 1 Mar 2017 08:14:11 -0800 Subject: [PATCH] Add bundling flow control (#213) - Add FlowController into bundling - Move FlowController into core - Move BundlingSettings in bundling - Add FlowControllerRuntimeException --- .../gax/bundling/BundlingFlowController.java | 66 ++++++++ .../{grpc => bundling}/BundlingSettings.java | 19 ++- .../api/gax/bundling/ThresholdBundler.java | 34 +++- .../bundling/ThresholdBundlingForwarder.java | 5 +- .../{grpc => core}/FlowControlSettings.java | 27 +++- .../gax/{grpc => core}/FlowController.java | 48 +++++- .../google/api/gax/grpc/BundlerFactory.java | 32 ++++ .../api/gax/grpc/BundlingCallSettings.java | 1 + .../google/api/gax/grpc/BundlingCallable.java | 10 +- .../google/api/gax/grpc/UnaryCallable.java | 4 +- .../gax/bundling/ThresholdBundlerTest.java | 146 ++++++++++++++++-- .../{grpc => core}/FlowControllerTest.java | 61 +++++--- .../com/google/api/gax/grpc/SettingsTest.java | 1 + .../api/gax/grpc/UnaryCallableTest.java | 1 + 14 files changed, 399 insertions(+), 56 deletions(-) create mode 100644 src/main/java/com/google/api/gax/bundling/BundlingFlowController.java rename src/main/java/com/google/api/gax/{grpc => bundling}/BundlingSettings.java (89%) rename src/main/java/com/google/api/gax/{grpc => core}/FlowControlSettings.java (77%) rename src/main/java/com/google/api/gax/{grpc => core}/FlowController.java (80%) rename src/test/java/com/google/api/gax/{grpc => core}/FlowControllerTest.java (83%) diff --git a/src/main/java/com/google/api/gax/bundling/BundlingFlowController.java b/src/main/java/com/google/api/gax/bundling/BundlingFlowController.java new file mode 100644 index 000000000..ca71ca859 --- /dev/null +++ b/src/main/java/com/google/api/gax/bundling/BundlingFlowController.java @@ -0,0 +1,66 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.bundling; + +import com.google.api.gax.core.FlowController; +import com.google.api.gax.core.FlowController.FlowControlException; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; + +/** Wraps a {@link FlowController} for use by Bundling. */ +public class BundlingFlowController { + + private final FlowController flowController; + private final ElementCounter elementCounter; + private final ElementCounter byteCounter; + + public BundlingFlowController( + FlowController flowController, + ElementCounter elementCounter, + ElementCounter byteCounter) { + this.flowController = flowController; + this.elementCounter = elementCounter; + this.byteCounter = byteCounter; + } + + public void reserve(T bundle) throws FlowControlException { + Preconditions.checkNotNull(bundle); + int elements = Ints.checkedCast(elementCounter.count(bundle)); + int bytes = Ints.checkedCast(byteCounter.count(bundle)); + flowController.reserve(elements, bytes); + } + + public void release(T bundle) { + Preconditions.checkNotNull(bundle); + int elements = Ints.checkedCast(elementCounter.count(bundle)); + int bytes = Ints.checkedCast(byteCounter.count(bundle)); + flowController.release(elements, bytes); + } +} diff --git a/src/main/java/com/google/api/gax/grpc/BundlingSettings.java b/src/main/java/com/google/api/gax/bundling/BundlingSettings.java similarity index 89% rename from src/main/java/com/google/api/gax/grpc/BundlingSettings.java rename to src/main/java/com/google/api/gax/bundling/BundlingSettings.java index 7af88ca8f..b002e5d9d 100644 --- a/src/main/java/com/google/api/gax/grpc/BundlingSettings.java +++ b/src/main/java/com/google/api/gax/bundling/BundlingSettings.java @@ -27,8 +27,9 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package com.google.api.gax.grpc; +package com.google.api.gax.bundling; +import com.google.api.gax.core.FlowControlSettings; import com.google.auto.value.AutoValue; import com.google.common.base.Preconditions; import javax.annotation.Nullable; @@ -85,6 +86,12 @@ * will not bundle together requests in the resulting bundle will surpass the limit. Thus, a bundle * can be sent that is actually under the threshold if the next request would put the combined * request over the limit. + * + *

+ * Bundling also supports FlowControl. This can be used to prevent the bundling implementation from + * accumulating messages without limit, resulting eventually in an OutOfMemory exception. This can + * occur if messages are created and added to bundling faster than they can be processed. The flow + * control behavior is controlled using FlowControlSettings. */ @AutoValue public abstract class BundlingSettings { @@ -103,9 +110,14 @@ public abstract class BundlingSettings { /** Returns the Boolean object to indicate if the bundling is enabled. Default to true */ public abstract Boolean getIsEnabled(); + /** Get the flow control settings to use. */ + public abstract FlowControlSettings getFlowControlSettings(); + /** Get a new builder. */ public static Builder newBuilder() { - return new AutoValue_BundlingSettings.Builder().setIsEnabled(true); + return new AutoValue_BundlingSettings.Builder() + .setIsEnabled(true) + .setFlowControlSettings(FlowControlSettings.getDefaultInstance()); } /** Get a builder with the same values as this object. */ @@ -155,6 +167,9 @@ public Builder setRequestByteThreshold(Integer requestByteThreshold) { */ public abstract Builder setIsEnabled(Boolean enabled); + /** Set the flow control settings to be used. */ + public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings); + abstract BundlingSettings autoBuild(); /** Build the BundlingSettings object. */ diff --git a/src/main/java/com/google/api/gax/bundling/ThresholdBundler.java b/src/main/java/com/google/api/gax/bundling/ThresholdBundler.java index 1164aae6c..2c7526a6a 100644 --- a/src/main/java/com/google/api/gax/bundling/ThresholdBundler.java +++ b/src/main/java/com/google/api/gax/bundling/ThresholdBundler.java @@ -29,6 +29,7 @@ */ package com.google.api.gax.bundling; +import com.google.api.gax.core.FlowController.FlowControlException; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; @@ -50,15 +51,20 @@ public final class ThresholdBundler { private ImmutableList> thresholdPrototypes; private final Duration maxDelay; + private final BundlingFlowController flowController; private final Lock lock = new ReentrantLock(); private final Condition bundleCondition = lock.newCondition(); private Bundle currentOpenBundle; private List closedBundles = new ArrayList<>(); - private ThresholdBundler(ImmutableList> thresholds, Duration maxDelay) { + private ThresholdBundler( + ImmutableList> thresholds, + Duration maxDelay, + BundlingFlowController flowController) { this.thresholdPrototypes = copyResetThresholds(Preconditions.checkNotNull(thresholds)); this.maxDelay = maxDelay; + this.flowController = Preconditions.checkNotNull(flowController); this.currentOpenBundle = null; } @@ -68,6 +74,7 @@ private ThresholdBundler(ImmutableList> thresholds, Duratio public static final class Builder { private List> thresholds; private Duration maxDelay; + private BundlingFlowController flowController; private Builder() { thresholds = Lists.newArrayList(); @@ -97,11 +104,15 @@ public Builder addThreshold(BundlingThreshold threshold) { return this; } - /** - * Build the ThresholdBundler. - */ + /** Set the flow controller for the ThresholdBundler. */ + public Builder setFlowController(BundlingFlowController flowController) { + this.flowController = flowController; + return this; + } + + /** Build the ThresholdBundler. */ public ThresholdBundler build() { - return new ThresholdBundler(ImmutableList.copyOf(thresholds), maxDelay); + return new ThresholdBundler(ImmutableList.copyOf(thresholds), maxDelay, flowController); } } @@ -115,9 +126,14 @@ public static Builder newBuilder() { /** * Adds an element to the bundler. If the element causes the collection to go past any of the * thresholds, the bundle will be made available to consumers. + * + * @throws FlowControlException */ - public void add(E e) { + public void add(E e) throws FlowControlException { final Lock lock = this.lock; + // We need to reserve resources from flowController outside the lock, so that they can be + // released by drainNextBundleTo(). + flowController.reserve(e); lock.lock(); try { boolean signalBundleIsReady = false; @@ -179,7 +195,11 @@ public int drainNextBundleTo(Collection outputCollection) { } if (outBundle != null) { - outputCollection.addAll(outBundle.getData()); + List data = outBundle.getData(); + for (E e : data) { + flowController.release(e); + } + outputCollection.addAll(data); return outputCollection.size(); } else { return 0; diff --git a/src/main/java/com/google/api/gax/bundling/ThresholdBundlingForwarder.java b/src/main/java/com/google/api/gax/bundling/ThresholdBundlingForwarder.java index f8940344a..383fd6793 100644 --- a/src/main/java/com/google/api/gax/bundling/ThresholdBundlingForwarder.java +++ b/src/main/java/com/google/api/gax/bundling/ThresholdBundlingForwarder.java @@ -29,6 +29,7 @@ */ package com.google.api.gax.bundling; +import com.google.api.gax.core.FlowController.FlowControlException; import java.util.ArrayList; import java.util.List; @@ -65,8 +66,10 @@ public void start() { /** * First validates that the receiver can receive the given item (based on the inherent * characteristics of the item), and then hands it off to the bundler. + * + * @throws FlowControlException */ - public void addToNextBundle(T item) { + public void addToNextBundle(T item) throws FlowControlException { bundleReceiver.validateItem(item); bundler.add(item); } diff --git a/src/main/java/com/google/api/gax/grpc/FlowControlSettings.java b/src/main/java/com/google/api/gax/core/FlowControlSettings.java similarity index 77% rename from src/main/java/com/google/api/gax/grpc/FlowControlSettings.java rename to src/main/java/com/google/api/gax/core/FlowControlSettings.java index 6b56f5de8..8a23ea124 100644 --- a/src/main/java/com/google/api/gax/grpc/FlowControlSettings.java +++ b/src/main/java/com/google/api/gax/core/FlowControlSettings.java @@ -27,11 +27,12 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package com.google.api.gax.grpc; +package com.google.api.gax.core; +import com.google.api.gax.core.FlowController.FlowControlException; +import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.auto.value.AutoValue; import com.google.common.base.Preconditions; -import java.util.concurrent.Semaphore; import javax.annotation.Nullable; /** Settings for {@link FlowController}. */ @@ -49,12 +50,30 @@ public static FlowControlSettings getDefaultInstance() { @Nullable public abstract Integer getMaxOutstandingRequestBytes(); + /** + * The behavior of {@link FlowController} when the specified limits are exceeded. Defaults to + * Block. + * + *

+ * The expected behavior for each of these values is: + * + *

    + *
  • ThrowException: the FlowController will throw a {@link FlowControlException} if any of the + * limits are exceeded. + *
  • Block: the reserve() method of FlowController will block until the quote is available to be + * reserved. + *
  • Ignore: all flow control limits will be ignored; the FlowController is disabled. + *
+ */ + public abstract LimitExceededBehavior getLimitExceededBehavior(); + public Builder toBuilder() { return new AutoValue_FlowControlSettings.Builder(this); } public static Builder newBuilder() { - return new AutoValue_FlowControlSettings.Builder(); + return new AutoValue_FlowControlSettings.Builder() + .setLimitExceededBehavior(LimitExceededBehavior.Block); } @AutoValue.Builder @@ -63,6 +82,8 @@ public abstract static class Builder { public abstract Builder setMaxOutstandingRequestBytes(Integer value); + public abstract Builder setLimitExceededBehavior(LimitExceededBehavior value); + abstract FlowControlSettings autoBuild(); public FlowControlSettings build() { diff --git a/src/main/java/com/google/api/gax/grpc/FlowController.java b/src/main/java/com/google/api/gax/core/FlowController.java similarity index 80% rename from src/main/java/com/google/api/gax/grpc/FlowController.java rename to src/main/java/com/google/api/gax/core/FlowController.java index c00e2d0f5..c5f864006 100644 --- a/src/main/java/com/google/api/gax/grpc/FlowController.java +++ b/src/main/java/com/google/api/gax/core/FlowController.java @@ -27,9 +27,8 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package com.google.api.gax.grpc; +package com.google.api.gax.core; -import com.google.auto.value.AutoValue; import com.google.common.base.Preconditions; import java.util.concurrent.Semaphore; import javax.annotation.Nullable; @@ -41,6 +40,20 @@ public abstract static class FlowControlException extends Exception { private FlowControlException() {} } + /** + * Runtime exception that can be used in place of FlowControlException when an unchecked exception + * is required. + */ + public static class FlowControlRuntimeException extends RuntimeException { + private FlowControlRuntimeException(FlowControlException e) { + super(e); + } + + public static FlowControlRuntimeException fromFlowControlException(FlowControlException e) { + return new FlowControlRuntimeException(e); + } + } + /** * Exception thrown when client-side flow control is enforced based on the maximum number of * outstanding in-memory elements. @@ -87,20 +100,47 @@ public String toString() { } } + /** + * Enumeration of behaviors that FlowController can use in case the flow control limits are + * exceeded. + */ + public enum LimitExceededBehavior { + ThrowException, + Block, + Ignore, + } + @Nullable private final Semaphore outstandingElementCount; @Nullable private final Semaphore outstandingByteCount; private final boolean failOnLimits; @Nullable private final Integer maxOutstandingElementCount; @Nullable private final Integer maxOutstandingRequestBytes; - public FlowController(FlowControlSettings settings, boolean failOnFlowControlLimits) { + public FlowController(FlowControlSettings settings) { + switch (settings.getLimitExceededBehavior()) { + case ThrowException: + this.failOnLimits = true; + break; + case Block: + this.failOnLimits = false; + break; + case Ignore: + this.failOnLimits = false; + this.maxOutstandingElementCount = null; + this.maxOutstandingRequestBytes = null; + this.outstandingElementCount = null; + this.outstandingByteCount = null; + return; + default: + throw new IllegalArgumentException( + "Unknown LimitBehaviour: " + settings.getLimitExceededBehavior()); + } this.maxOutstandingElementCount = settings.getMaxOutstandingElementCount(); this.maxOutstandingRequestBytes = settings.getMaxOutstandingRequestBytes(); outstandingElementCount = maxOutstandingElementCount != null ? new Semaphore(maxOutstandingElementCount) : null; outstandingByteCount = maxOutstandingRequestBytes != null ? new Semaphore(maxOutstandingRequestBytes) : null; - this.failOnLimits = failOnFlowControlLimits; } public void reserve(int elements, int bytes) throws FlowControlException { diff --git a/src/main/java/com/google/api/gax/grpc/BundlerFactory.java b/src/main/java/com/google/api/gax/grpc/BundlerFactory.java index 7b7d26983..00212cfd1 100644 --- a/src/main/java/com/google/api/gax/grpc/BundlerFactory.java +++ b/src/main/java/com/google/api/gax/grpc/BundlerFactory.java @@ -29,11 +29,16 @@ */ package com.google.api.gax.grpc; +import com.google.api.gax.bundling.BundlingFlowController; +import com.google.api.gax.bundling.BundlingSettings; import com.google.api.gax.bundling.BundlingThreshold; import com.google.api.gax.bundling.ElementCounter; import com.google.api.gax.bundling.NumericThreshold; import com.google.api.gax.bundling.ThresholdBundler; import com.google.api.gax.bundling.ThresholdBundlingForwarder; +import com.google.api.gax.core.FlowControlSettings; +import com.google.api.gax.core.FlowController; +import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.common.collect.ImmutableList; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -50,6 +55,7 @@ public final class BundlerFactory implements AutoCloseable private final Map>> forwarders = new ConcurrentHashMap<>(); private final BundlingDescriptor bundlingDescriptor; + private final FlowController flowController; private final BundlingSettings bundlingSettings; private final Object lock = new Object(); @@ -58,6 +64,13 @@ public BundlerFactory( BundlingSettings bundlingSettings) { this.bundlingDescriptor = bundlingDescriptor; this.bundlingSettings = bundlingSettings; + this.flowController = + new FlowController( + bundlingSettings.getFlowControlSettings() != null + ? bundlingSettings.getFlowControlSettings() + : FlowControlSettings.newBuilder() + .setLimitExceededBehavior(LimitExceededBehavior.Ignore) + .build()); } /** @@ -97,12 +110,31 @@ private ThresholdBundlingForwarder> createF ThresholdBundler.>newBuilder() .setThresholds(getThresholds(bundlingSettings)) .setMaxDelay(bundlingSettings.getDelayThreshold()) + .setFlowController(createBundlingFlowController()) .build(); BundleExecutor processor = new BundleExecutor<>(bundlingDescriptor, partitionKey); return new ThresholdBundlingForwarder<>(bundler, processor); } + private BundlingFlowController> + createBundlingFlowController() { + return new BundlingFlowController>( + flowController, + new ElementCounter>() { + @Override + public long count(BundlingContext bundlablePublish) { + return bundlingDescriptor.countElements(bundlablePublish.getRequest()); + } + }, + new ElementCounter>() { + @Override + public long count(BundlingContext bundlablePublish) { + return bundlingDescriptor.countBytes(bundlablePublish.getRequest()); + } + }); + } + @Override public void close() { synchronized (lock) { diff --git a/src/main/java/com/google/api/gax/grpc/BundlingCallSettings.java b/src/main/java/com/google/api/gax/grpc/BundlingCallSettings.java index 92f754660..17df7e208 100644 --- a/src/main/java/com/google/api/gax/grpc/BundlingCallSettings.java +++ b/src/main/java/com/google/api/gax/grpc/BundlingCallSettings.java @@ -29,6 +29,7 @@ */ package com.google.api.gax.grpc; +import com.google.api.gax.bundling.BundlingSettings; import com.google.api.gax.core.RetrySettings; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; diff --git a/src/main/java/com/google/api/gax/grpc/BundlingCallable.java b/src/main/java/com/google/api/gax/grpc/BundlingCallable.java index ebfd4253e..cb7e3b35f 100644 --- a/src/main/java/com/google/api/gax/grpc/BundlingCallable.java +++ b/src/main/java/com/google/api/gax/grpc/BundlingCallable.java @@ -30,6 +30,8 @@ package com.google.api.gax.grpc; import com.google.api.gax.bundling.ThresholdBundlingForwarder; +import com.google.api.gax.core.FlowController.FlowControlException; +import com.google.api.gax.core.FlowController.FlowControlRuntimeException; import com.google.api.gax.core.RpcFuture; import com.google.common.base.Preconditions; @@ -66,8 +68,12 @@ public RpcFuture futureCall(RequestT request, CallContext context) { String partitionKey = bundlingDescriptor.getBundlePartitionKey(request); ThresholdBundlingForwarder> forwarder = bundlerFactory.getForwarder(partitionKey); - forwarder.addToNextBundle(bundlableMessage); - return result; + try { + forwarder.addToNextBundle(bundlableMessage); + return result; + } catch (FlowControlException e) { + throw FlowControlRuntimeException.fromFlowControlException(e); + } } else { return callable.futureCall(request, context); } diff --git a/src/main/java/com/google/api/gax/grpc/UnaryCallable.java b/src/main/java/com/google/api/gax/grpc/UnaryCallable.java index f98b23dc3..4ca760cdf 100644 --- a/src/main/java/com/google/api/gax/grpc/UnaryCallable.java +++ b/src/main/java/com/google/api/gax/grpc/UnaryCallable.java @@ -181,8 +181,8 @@ public static UnaryCallable BundlingFlowController getDisabledBundlingFlowController() { + return new BundlingFlowController<>( + getDisabledFlowController(), + new ElementCounter() { + @Override + public long count(T t) { + return 1; + } + }, + new ElementCounter() { + @Override + public long count(T t) { + return 1; + } + }); + } + + private static BundlingFlowController getIntegerBundlingFlowController( + Integer elementCount, Integer byteCount, LimitExceededBehavior limitExceededBehaviour) { + return new BundlingFlowController<>( + new FlowController( + FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(elementCount) + .setMaxOutstandingRequestBytes(byteCount) + .setLimitExceededBehavior(limitExceededBehaviour) + .build()), + new ElementCounter() { + @Override + public long count(Integer t) { + return 1; + } + }, + new ElementCounter() { + @Override + public long count(Integer t) { + return t; + } + }); + } + + @Rule public ExpectedException thrown = ExpectedException.none(); + @Test public void testEmptyAddAndDrain() { ThresholdBundler bundler = ThresholdBundler.newBuilder() .setThresholds(BundlingThresholds.of(5)) + .setFlowController(ThresholdBundlerTest.getDisabledBundlingFlowController()) .build(); List resultBundle = new ArrayList<>(); Truth.assertThat(bundler.isEmpty()).isTrue(); @@ -54,10 +109,11 @@ public void testEmptyAddAndDrain() { } @Test - public void testAddAndDrain() { + public void testAddAndDrain() throws FlowControlException { ThresholdBundler bundler = ThresholdBundler.newBuilder() .setThresholds(BundlingThresholds.of(5)) + .setFlowController(ThresholdBundlerTest.getDisabledBundlingFlowController()) .build(); bundler.add(14); Truth.assertThat(bundler.isEmpty()).isFalse(); @@ -79,6 +135,7 @@ public void testBundling() throws Exception { ThresholdBundler bundler = ThresholdBundler.newBuilder() .setThresholds(BundlingThresholds.of(2)) + .setFlowController(ThresholdBundlerTest.getDisabledBundlingFlowController()) .build(); AccumulatingBundleReceiver receiver = new AccumulatingBundleReceiver(); ThresholdBundlingForwarder forwarder = @@ -110,7 +167,10 @@ public void testBundling() throws Exception { @Test public void testBundlingWithDelay() throws Exception { ThresholdBundler bundler = - ThresholdBundler.newBuilder().setMaxDelay(Duration.millis(100)).build(); + ThresholdBundler.newBuilder() + .setMaxDelay(Duration.millis(100)) + .setFlowController(ThresholdBundlerTest.getDisabledBundlingFlowController()) + .build(); AccumulatingBundleReceiver receiver = new AccumulatingBundleReceiver(); ThresholdBundlingForwarder forwarder = new ThresholdBundlingForwarder(bundler, receiver); @@ -137,6 +197,7 @@ public void testFlush() throws Exception { ThresholdBundler bundler = ThresholdBundler.newBuilder() .setThresholds(BundlingThresholds.of(2)) + .setFlowController(ThresholdBundlerTest.getDisabledBundlingFlowController()) .build(); AccumulatingBundleReceiver receiver = new AccumulatingBundleReceiver(); ThresholdBundlingForwarder forwarder = @@ -166,14 +227,75 @@ public void testFlush() throws Exception { Truth.assertThat(receiver.getBundles()).isEqualTo(expected); } - private BundlingThreshold createValueThreshold(long threshold) { - return new NumericThreshold( - threshold, - new ElementCounter() { - @Override - public long count(Integer value) { - return value; - } - }); + @Test + public void testExceptionWithNullFlowController() { + thrown.expect(NullPointerException.class); + ThresholdBundler.newBuilder().build(); + } + + @Test + public void testBundlingWithFlowControl() throws Exception { + ThresholdBundler bundler = + ThresholdBundler.newBuilder() + .setThresholds(BundlingThresholds.of(2)) + .setFlowController( + getIntegerBundlingFlowController(3, null, LimitExceededBehavior.Block)) + .build(); + AccumulatingBundleReceiver receiver = new AccumulatingBundleReceiver(); + ThresholdBundlingForwarder forwarder = + new ThresholdBundlingForwarder(bundler, receiver); + + try { + forwarder.start(); + bundler.add(3); + bundler.add(5); + bundler.add(7); + bundler.add(9); // We expect to block here until the first bundle is handled + bundler.add(11); + + } finally { + forwarder.close(); + } + + List> expected = + Arrays.asList(Arrays.asList(3, 5), Arrays.asList(7, 9), Arrays.asList(11)); + Truth.assertThat(receiver.getBundles()).isEqualTo(expected); + } + + @Test + public void testBundlingFlowControlExceptionRecovery() throws Exception { + ThresholdBundler bundler = + ThresholdBundler.newBuilder() + .setThresholds(BundlingThresholds.of(2)) + .setFlowController( + getIntegerBundlingFlowController(3, null, LimitExceededBehavior.ThrowException)) + .build(); + AccumulatingBundleReceiver receiver = new AccumulatingBundleReceiver(); + ThresholdBundlingForwarder forwarder = + new ThresholdBundlingForwarder(bundler, receiver); + + try { + // Note: do not start the forwarder here, otherwise we have a race condition in the test + // between whether bundler.add(9) executes before the first bundle is processed. + bundler.add(3); + bundler.add(5); + bundler.add(7); + try { + bundler.add(9); + } catch (FlowControlException e) { + } + forwarder.start(); + // Give time for the forwarder thread to catch the bundle + Thread.sleep(100); + bundler.add(11); + bundler.add(13); + + } finally { + forwarder.close(); + } + + List> expected = + Arrays.asList(Arrays.asList(3, 5), Arrays.asList(7, 11), Arrays.asList(13)); + Truth.assertThat(receiver.getBundles()).isEqualTo(expected); } } diff --git a/src/test/java/com/google/api/gax/grpc/FlowControllerTest.java b/src/test/java/com/google/api/gax/core/FlowControllerTest.java similarity index 83% rename from src/test/java/com/google/api/gax/grpc/FlowControllerTest.java rename to src/test/java/com/google/api/gax/core/FlowControllerTest.java index 8b263a46d..64d18d533 100644 --- a/src/test/java/com/google/api/gax/grpc/FlowControllerTest.java +++ b/src/test/java/com/google/api/gax/core/FlowControllerTest.java @@ -27,11 +27,12 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -package com.google.api.gax.grpc; +package com.google.api.gax.core; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.common.util.concurrent.SettableFuture; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -50,8 +51,8 @@ public void testReserveRelease_ok() throws Exception { FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(10) .setMaxOutstandingRequestBytes(10) - .build(), - false); + .setLimitExceededBehavior(LimitExceededBehavior.Block) + .build()); flowController.reserve(1, 1); flowController.release(1, 1); @@ -64,8 +65,8 @@ public void testInvalidArguments() throws Exception { FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(10) .setMaxOutstandingRequestBytes(10) - .build(), - false); + .setLimitExceededBehavior(LimitExceededBehavior.Block) + .build()); flowController.reserve(1, 0); try { @@ -95,8 +96,22 @@ public void testReserveRelease_noLimits_ok() throws Exception { FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(null) .setMaxOutstandingRequestBytes(null) - .build(), - false); + .setLimitExceededBehavior(LimitExceededBehavior.Block) + .build()); + + flowController.reserve(1, 1); + flowController.release(1, 1); + } + + @Test + public void testReserveRelease_ignore_ok() throws Exception { + FlowController flowController = + new FlowController( + FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(1) + .setMaxOutstandingRequestBytes(1) + .setLimitExceededBehavior(LimitExceededBehavior.Ignore) + .build()); flowController.reserve(1, 1); flowController.release(1, 1); @@ -109,8 +124,8 @@ public void testReserveRelease_blockedByElementCount() throws Exception { FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(10) .setMaxOutstandingRequestBytes(100) - .build(), - false); + .setLimitExceededBehavior(LimitExceededBehavior.Block) + .build()); testBlockingReserveRelease(flowController, 10, 10); } @@ -122,8 +137,8 @@ public void testReserveRelease_blockedByElementCount_noBytesLimit() throws Excep FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(10) .setMaxOutstandingRequestBytes(null) - .build(), - false); + .setLimitExceededBehavior(LimitExceededBehavior.Block) + .build()); testBlockingReserveRelease(flowController, 10, 10); } @@ -135,8 +150,8 @@ public void testReserveRelease_blockedByNumberOfBytes() throws Exception { FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(100) .setMaxOutstandingRequestBytes(10) - .build(), - false); + .setLimitExceededBehavior(LimitExceededBehavior.Block) + .build()); testBlockingReserveRelease(flowController, 10, 10); } @@ -148,8 +163,8 @@ public void testReserveRelease_blockedByNumberOfBytes_noElementCountLimit() thro FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(null) .setMaxOutstandingRequestBytes(10) - .build(), - false); + .setLimitExceededBehavior(LimitExceededBehavior.Block) + .build()); testBlockingReserveRelease(flowController, 10, 10); } @@ -189,8 +204,8 @@ public void testReserveRelease_rejectedByElementCount() throws Exception { FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(10) .setMaxOutstandingRequestBytes(100) - .build(), - true); + .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) + .build()); testRejectedReserveRelease( flowController, 10, 10, FlowController.MaxOutstandingElementCountReachedException.class); @@ -203,8 +218,8 @@ public void testReserveRelease_rejectedByElementCount_noBytesLimit() throws Exce FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(10) .setMaxOutstandingRequestBytes(null) - .build(), - true); + .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) + .build()); testRejectedReserveRelease( flowController, 10, 10, FlowController.MaxOutstandingElementCountReachedException.class); @@ -217,8 +232,8 @@ public void testReserveRelease_rejectedByNumberOfBytes() throws Exception { FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(100) .setMaxOutstandingRequestBytes(10) - .build(), - true); + .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) + .build()); testRejectedReserveRelease( flowController, 10, 10, FlowController.MaxOutstandingRequestBytesReachedException.class); @@ -231,8 +246,8 @@ public void testReserveRelease_rejectedByNumberOfBytes_noElementCountLimit() thr FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(null) .setMaxOutstandingRequestBytes(10) - .build(), - true); + .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) + .build()); testRejectedReserveRelease( flowController, 10, 10, FlowController.MaxOutstandingRequestBytesReachedException.class); diff --git a/src/test/java/com/google/api/gax/grpc/SettingsTest.java b/src/test/java/com/google/api/gax/grpc/SettingsTest.java index 5213739c8..90b1ecc67 100644 --- a/src/test/java/com/google/api/gax/grpc/SettingsTest.java +++ b/src/test/java/com/google/api/gax/grpc/SettingsTest.java @@ -29,6 +29,7 @@ */ package com.google.api.gax.grpc; +import com.google.api.gax.bundling.BundlingSettings; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.core.GoogleCredentialsProvider; diff --git a/src/test/java/com/google/api/gax/grpc/UnaryCallableTest.java b/src/test/java/com/google/api/gax/grpc/UnaryCallableTest.java index 70cd2d4ce..fd8c81a24 100644 --- a/src/test/java/com/google/api/gax/grpc/UnaryCallableTest.java +++ b/src/test/java/com/google/api/gax/grpc/UnaryCallableTest.java @@ -29,6 +29,7 @@ */ package com.google.api.gax.grpc; +import com.google.api.gax.bundling.BundlingSettings; import com.google.api.gax.core.FixedSizeCollection; import com.google.api.gax.core.Page; import com.google.api.gax.core.RetrySettings;