Skip to content

Commit

Permalink
Add bundling flow control (googleapis#213)
Browse files Browse the repository at this point in the history
- Add FlowController into bundling
- Move FlowController into core
- Move BundlingSettings in bundling
- Add FlowControllerRuntimeException
  • Loading branch information
michaelbausor authored Mar 1, 2017
1 parent f049262 commit ec2e656
Show file tree
Hide file tree
Showing 14 changed files with 399 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2017, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.bundling;

import com.google.api.gax.core.FlowController;
import com.google.api.gax.core.FlowController.FlowControlException;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;

/** Wraps a {@link FlowController} for use by Bundling. */
public class BundlingFlowController<T> {

private final FlowController flowController;
private final ElementCounter<T> elementCounter;
private final ElementCounter<T> byteCounter;

public BundlingFlowController(
FlowController flowController,
ElementCounter<T> elementCounter,
ElementCounter<T> byteCounter) {
this.flowController = flowController;
this.elementCounter = elementCounter;
this.byteCounter = byteCounter;
}

public void reserve(T bundle) throws FlowControlException {
Preconditions.checkNotNull(bundle);
int elements = Ints.checkedCast(elementCounter.count(bundle));
int bytes = Ints.checkedCast(byteCounter.count(bundle));
flowController.reserve(elements, bytes);
}

public void release(T bundle) {
Preconditions.checkNotNull(bundle);
int elements = Ints.checkedCast(elementCounter.count(bundle));
int bytes = Ints.checkedCast(byteCounter.count(bundle));
flowController.release(elements, bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;
package com.google.api.gax.bundling;

import com.google.api.gax.core.FlowControlSettings;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -85,6 +86,12 @@
* will not bundle together requests in the resulting bundle will surpass the limit. Thus, a bundle
* can be sent that is actually under the threshold if the next request would put the combined
* request over the limit.
*
* <p>
* Bundling also supports FlowControl. This can be used to prevent the bundling implementation from
* accumulating messages without limit, resulting eventually in an OutOfMemory exception. This can
* occur if messages are created and added to bundling faster than they can be processed. The flow
* control behavior is controlled using FlowControlSettings.
*/
@AutoValue
public abstract class BundlingSettings {
Expand All @@ -103,9 +110,14 @@ public abstract class BundlingSettings {
/** Returns the Boolean object to indicate if the bundling is enabled. Default to true */
public abstract Boolean getIsEnabled();

/** Get the flow control settings to use. */
public abstract FlowControlSettings getFlowControlSettings();

/** Get a new builder. */
public static Builder newBuilder() {
return new AutoValue_BundlingSettings.Builder().setIsEnabled(true);
return new AutoValue_BundlingSettings.Builder()
.setIsEnabled(true)
.setFlowControlSettings(FlowControlSettings.getDefaultInstance());
}

/** Get a builder with the same values as this object. */
Expand Down Expand Up @@ -155,6 +167,9 @@ public Builder setRequestByteThreshold(Integer requestByteThreshold) {
*/
public abstract Builder setIsEnabled(Boolean enabled);

/** Set the flow control settings to be used. */
public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);

abstract BundlingSettings autoBuild();

/** Build the BundlingSettings object. */
Expand Down
34 changes: 27 additions & 7 deletions src/main/java/com/google/api/gax/bundling/ThresholdBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
*/
package com.google.api.gax.bundling;

import com.google.api.gax.core.FlowController.FlowControlException;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
Expand All @@ -50,15 +51,20 @@ public final class ThresholdBundler<E> {

private ImmutableList<BundlingThreshold<E>> thresholdPrototypes;
private final Duration maxDelay;
private final BundlingFlowController<E> flowController;

private final Lock lock = new ReentrantLock();
private final Condition bundleCondition = lock.newCondition();
private Bundle currentOpenBundle;
private List<Bundle> closedBundles = new ArrayList<>();

private ThresholdBundler(ImmutableList<BundlingThreshold<E>> thresholds, Duration maxDelay) {
private ThresholdBundler(
ImmutableList<BundlingThreshold<E>> thresholds,
Duration maxDelay,
BundlingFlowController<E> flowController) {
this.thresholdPrototypes = copyResetThresholds(Preconditions.checkNotNull(thresholds));
this.maxDelay = maxDelay;
this.flowController = Preconditions.checkNotNull(flowController);
this.currentOpenBundle = null;
}

Expand All @@ -68,6 +74,7 @@ private ThresholdBundler(ImmutableList<BundlingThreshold<E>> thresholds, Duratio
public static final class Builder<E> {
private List<BundlingThreshold<E>> thresholds;
private Duration maxDelay;
private BundlingFlowController<E> flowController;

private Builder() {
thresholds = Lists.newArrayList();
Expand Down Expand Up @@ -97,11 +104,15 @@ public Builder<E> addThreshold(BundlingThreshold<E> threshold) {
return this;
}

/**
* Build the ThresholdBundler.
*/
/** Set the flow controller for the ThresholdBundler. */
public Builder<E> setFlowController(BundlingFlowController<E> flowController) {
this.flowController = flowController;
return this;
}

/** Build the ThresholdBundler. */
public ThresholdBundler<E> build() {
return new ThresholdBundler<E>(ImmutableList.copyOf(thresholds), maxDelay);
return new ThresholdBundler<E>(ImmutableList.copyOf(thresholds), maxDelay, flowController);
}
}

Expand All @@ -115,9 +126,14 @@ public static <T> Builder<T> newBuilder() {
/**
* Adds an element to the bundler. If the element causes the collection to go past any of the
* thresholds, the bundle will be made available to consumers.
*
* @throws FlowControlException
*/
public void add(E e) {
public void add(E e) throws FlowControlException {
final Lock lock = this.lock;
// We need to reserve resources from flowController outside the lock, so that they can be
// released by drainNextBundleTo().
flowController.reserve(e);
lock.lock();
try {
boolean signalBundleIsReady = false;
Expand Down Expand Up @@ -179,7 +195,11 @@ public int drainNextBundleTo(Collection<? super E> outputCollection) {
}

if (outBundle != null) {
outputCollection.addAll(outBundle.getData());
List<E> data = outBundle.getData();
for (E e : data) {
flowController.release(e);
}
outputCollection.addAll(data);
return outputCollection.size();
} else {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
*/
package com.google.api.gax.bundling;

import com.google.api.gax.core.FlowController.FlowControlException;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -65,8 +66,10 @@ public void start() {
/**
* First validates that the receiver can receive the given item (based on the inherent
* characteristics of the item), and then hands it off to the bundler.
*
* @throws FlowControlException
*/
public void addToNextBundle(T item) {
public void addToNextBundle(T item) throws FlowControlException {
bundleReceiver.validateItem(item);
bundler.add(item);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;
package com.google.api.gax.core;

import com.google.api.gax.core.FlowController.FlowControlException;
import com.google.api.gax.core.FlowController.LimitExceededBehavior;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;

/** Settings for {@link FlowController}. */
Expand All @@ -49,12 +50,30 @@ public static FlowControlSettings getDefaultInstance() {
@Nullable
public abstract Integer getMaxOutstandingRequestBytes();

/**
* The behavior of {@link FlowController} when the specified limits are exceeded. Defaults to
* Block.
*
* <p>
* The expected behavior for each of these values is:
*
* <ul>
* <li>ThrowException: the FlowController will throw a {@link FlowControlException} if any of the
* limits are exceeded.
* <li>Block: the reserve() method of FlowController will block until the quote is available to be
* reserved.
* <li>Ignore: all flow control limits will be ignored; the FlowController is disabled.
* </ul>
*/
public abstract LimitExceededBehavior getLimitExceededBehavior();

public Builder toBuilder() {
return new AutoValue_FlowControlSettings.Builder(this);
}

public static Builder newBuilder() {
return new AutoValue_FlowControlSettings.Builder();
return new AutoValue_FlowControlSettings.Builder()
.setLimitExceededBehavior(LimitExceededBehavior.Block);
}

@AutoValue.Builder
Expand All @@ -63,6 +82,8 @@ public abstract static class Builder {

public abstract Builder setMaxOutstandingRequestBytes(Integer value);

public abstract Builder setLimitExceededBehavior(LimitExceededBehavior value);

abstract FlowControlSettings autoBuild();

public FlowControlSettings build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;
package com.google.api.gax.core;

import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;
Expand All @@ -41,6 +40,20 @@ public abstract static class FlowControlException extends Exception {
private FlowControlException() {}
}

/**
* Runtime exception that can be used in place of FlowControlException when an unchecked exception
* is required.
*/
public static class FlowControlRuntimeException extends RuntimeException {
private FlowControlRuntimeException(FlowControlException e) {
super(e);
}

public static FlowControlRuntimeException fromFlowControlException(FlowControlException e) {
return new FlowControlRuntimeException(e);
}
}

/**
* Exception thrown when client-side flow control is enforced based on the maximum number of
* outstanding in-memory elements.
Expand Down Expand Up @@ -87,20 +100,47 @@ public String toString() {
}
}

/**
* Enumeration of behaviors that FlowController can use in case the flow control limits are
* exceeded.
*/
public enum LimitExceededBehavior {
ThrowException,
Block,
Ignore,
}

@Nullable private final Semaphore outstandingElementCount;
@Nullable private final Semaphore outstandingByteCount;
private final boolean failOnLimits;
@Nullable private final Integer maxOutstandingElementCount;
@Nullable private final Integer maxOutstandingRequestBytes;

public FlowController(FlowControlSettings settings, boolean failOnFlowControlLimits) {
public FlowController(FlowControlSettings settings) {
switch (settings.getLimitExceededBehavior()) {
case ThrowException:
this.failOnLimits = true;
break;
case Block:
this.failOnLimits = false;
break;
case Ignore:
this.failOnLimits = false;
this.maxOutstandingElementCount = null;
this.maxOutstandingRequestBytes = null;
this.outstandingElementCount = null;
this.outstandingByteCount = null;
return;
default:
throw new IllegalArgumentException(
"Unknown LimitBehaviour: " + settings.getLimitExceededBehavior());
}
this.maxOutstandingElementCount = settings.getMaxOutstandingElementCount();
this.maxOutstandingRequestBytes = settings.getMaxOutstandingRequestBytes();
outstandingElementCount =
maxOutstandingElementCount != null ? new Semaphore(maxOutstandingElementCount) : null;
outstandingByteCount =
maxOutstandingRequestBytes != null ? new Semaphore(maxOutstandingRequestBytes) : null;
this.failOnLimits = failOnFlowControlLimits;
}

public void reserve(int elements, int bytes) throws FlowControlException {
Expand Down
Loading

0 comments on commit ec2e656

Please sign in to comment.