Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Add bundling flow control #213

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2017, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.bundling;

import com.google.api.gax.core.FlowController;
import com.google.api.gax.core.FlowController.FlowControlException;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;

/** Wraps a {@link FlowController} for use by Bundling. */
public class BundlingFlowController<T> {

private final FlowController flowController;
private final ElementCounter<T> elementCounter;
private final ElementCounter<T> byteCounter;

public BundlingFlowController(
FlowController flowController,
ElementCounter<T> elementCounter,
ElementCounter<T> byteCounter) {
this.flowController = flowController;
this.elementCounter = elementCounter;
this.byteCounter = byteCounter;
}

public void reserve(T bundle) throws FlowControlException {
Preconditions.checkNotNull(bundle);
int elements = Ints.checkedCast(elementCounter.count(bundle));
int bytes = Ints.checkedCast(byteCounter.count(bundle));
flowController.reserve(elements, bytes);
}

public void release(T bundle) {
Preconditions.checkNotNull(bundle);
int elements = Ints.checkedCast(elementCounter.count(bundle));
int bytes = Ints.checkedCast(byteCounter.count(bundle));
flowController.release(elements, bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;
package com.google.api.gax.bundling;

import com.google.api.gax.core.FlowControlSettings;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -85,6 +86,12 @@
* will not bundle together requests in the resulting bundle will surpass the limit. Thus, a bundle
* can be sent that is actually under the threshold if the next request would put the combined
* request over the limit.
*
* <p>
* Bundling also supports FlowControl. This can be used to prevent the bundling implementation from
* accumulating messages without limit, resulting eventually in an OutOfMemory exception. This can
* occur if messages are created and added to bundling faster than they can be processed. The flow
* control behavior is controlled using FlowControlSettings.
*/
@AutoValue
public abstract class BundlingSettings {
Expand All @@ -103,9 +110,14 @@ public abstract class BundlingSettings {
/** Returns the Boolean object to indicate if the bundling is enabled. Default to true */
public abstract Boolean getIsEnabled();

/** Get the flow control settings to use. */
public abstract FlowControlSettings getFlowControlSettings();

/** Get a new builder. */
public static Builder newBuilder() {
return new AutoValue_BundlingSettings.Builder().setIsEnabled(true);
return new AutoValue_BundlingSettings.Builder()
.setIsEnabled(true)
.setFlowControlSettings(FlowControlSettings.getDefaultInstance());
}

/** Get a builder with the same values as this object. */
Expand Down Expand Up @@ -155,6 +167,9 @@ public Builder setRequestByteThreshold(Integer requestByteThreshold) {
*/
public abstract Builder setIsEnabled(Boolean enabled);

/** Set the flow control settings to be used. */
public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);

abstract BundlingSettings autoBuild();

/** Build the BundlingSettings object. */
Expand Down
34 changes: 27 additions & 7 deletions src/main/java/com/google/api/gax/bundling/ThresholdBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
*/
package com.google.api.gax.bundling;

import com.google.api.gax.core.FlowController.FlowControlException;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
Expand All @@ -50,15 +51,20 @@ public final class ThresholdBundler<E> {

private ImmutableList<BundlingThreshold<E>> thresholdPrototypes;
private final Duration maxDelay;
private final BundlingFlowController<E> flowController;

private final Lock lock = new ReentrantLock();
private final Condition bundleCondition = lock.newCondition();
private Bundle currentOpenBundle;
private List<Bundle> closedBundles = new ArrayList<>();

private ThresholdBundler(ImmutableList<BundlingThreshold<E>> thresholds, Duration maxDelay) {
private ThresholdBundler(
ImmutableList<BundlingThreshold<E>> thresholds,
Duration maxDelay,
BundlingFlowController<E> flowController) {
this.thresholdPrototypes = copyResetThresholds(Preconditions.checkNotNull(thresholds));
this.maxDelay = maxDelay;
this.flowController = Preconditions.checkNotNull(flowController);
this.currentOpenBundle = null;
}

Expand All @@ -68,6 +74,7 @@ private ThresholdBundler(ImmutableList<BundlingThreshold<E>> thresholds, Duratio
public static final class Builder<E> {
private List<BundlingThreshold<E>> thresholds;
private Duration maxDelay;
private BundlingFlowController<E> flowController;

private Builder() {
thresholds = Lists.newArrayList();
Expand Down Expand Up @@ -97,11 +104,15 @@ public Builder<E> addThreshold(BundlingThreshold<E> threshold) {
return this;
}

/**
* Build the ThresholdBundler.
*/
/** Set the flow controller for the ThresholdBundler. */
public Builder<E> setFlowController(BundlingFlowController<E> flowController) {
this.flowController = flowController;
return this;
}

/** Build the ThresholdBundler. */
public ThresholdBundler<E> build() {
return new ThresholdBundler<E>(ImmutableList.copyOf(thresholds), maxDelay);
return new ThresholdBundler<E>(ImmutableList.copyOf(thresholds), maxDelay, flowController);
}
}

Expand All @@ -115,9 +126,14 @@ public static <T> Builder<T> newBuilder() {
/**
* Adds an element to the bundler. If the element causes the collection to go past any of the
* thresholds, the bundle will be made available to consumers.
*
* @throws FlowControlException
*/
public void add(E e) {
public void add(E e) throws FlowControlException {
final Lock lock = this.lock;
// We need to reserve resources from flowController outside the lock, so that they can be
// released by drainNextBundleTo().
flowController.reserve(e);
lock.lock();
try {
boolean signalBundleIsReady = false;
Expand Down Expand Up @@ -179,7 +195,11 @@ public int drainNextBundleTo(Collection<? super E> outputCollection) {
}

if (outBundle != null) {
outputCollection.addAll(outBundle.getData());
List<E> data = outBundle.getData();
for (E e : data) {
flowController.release(e);
}
outputCollection.addAll(data);
return outputCollection.size();
} else {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
*/
package com.google.api.gax.bundling;

import com.google.api.gax.core.FlowController.FlowControlException;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -65,8 +66,10 @@ public void start() {
/**
* First validates that the receiver can receive the given item (based on the inherent
* characteristics of the item), and then hands it off to the bundler.
*
* @throws FlowControlException
*/
public void addToNextBundle(T item) {
public void addToNextBundle(T item) throws FlowControlException {
bundleReceiver.validateItem(item);
bundler.add(item);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;
package com.google.api.gax.core;

import com.google.api.gax.core.FlowController.FlowControlException;
import com.google.api.gax.core.FlowController.LimitExceededBehavior;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;

/** Settings for {@link FlowController}. */
Expand All @@ -49,12 +50,30 @@ public static FlowControlSettings getDefaultInstance() {
@Nullable
public abstract Integer getMaxOutstandingRequestBytes();

/**
* The behavior of {@link FlowController} when the specified limits are exceeded. Defaults to
* Block.
*
* <p>
* The expected behavior for each of these values is:
*
* <ul>
* <li>ThrowException: the FlowController will throw a {@link FlowControlException} if any of the
* limits are exceeded.
* <li>Block: the reserve() method of FlowController will block until the quote is available to be
* reserved.
* <li>Ignore: all flow control limits will be ignored; the FlowController is disabled.
* </ul>
*/
public abstract LimitExceededBehavior getLimitExceededBehavior();

public Builder toBuilder() {
return new AutoValue_FlowControlSettings.Builder(this);
}

public static Builder newBuilder() {
return new AutoValue_FlowControlSettings.Builder();
return new AutoValue_FlowControlSettings.Builder()
.setLimitExceededBehavior(LimitExceededBehavior.Block);
}

@AutoValue.Builder
Expand All @@ -63,6 +82,8 @@ public abstract static class Builder {

public abstract Builder setMaxOutstandingRequestBytes(Integer value);

public abstract Builder setLimitExceededBehavior(LimitExceededBehavior value);

abstract FlowControlSettings autoBuild();

public FlowControlSettings build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;
package com.google.api.gax.core;

import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;
Expand All @@ -41,6 +40,20 @@ public abstract static class FlowControlException extends Exception {
private FlowControlException() {}
}

/**
* Runtime exception that can be used in place of FlowControlException when an unchecked exception
* is required.
*/
public static class FlowControlRuntimeException extends RuntimeException {
private FlowControlRuntimeException(FlowControlException e) {
super(e);
}

public static FlowControlRuntimeException fromFlowControlException(FlowControlException e) {
return new FlowControlRuntimeException(e);
}
}

/**
* Exception thrown when client-side flow control is enforced based on the maximum number of
* outstanding in-memory elements.
Expand Down Expand Up @@ -87,20 +100,47 @@ public String toString() {
}
}

/**
* Enumeration of behaviors that FlowController can use in case the flow control limits are
* exceeded.
*/
public enum LimitExceededBehavior {
ThrowException,
Block,
Ignore,
}

@Nullable private final Semaphore outstandingElementCount;
@Nullable private final Semaphore outstandingByteCount;
private final boolean failOnLimits;
@Nullable private final Integer maxOutstandingElementCount;
@Nullable private final Integer maxOutstandingRequestBytes;

public FlowController(FlowControlSettings settings, boolean failOnFlowControlLimits) {
public FlowController(FlowControlSettings settings) {
switch (settings.getLimitExceededBehavior()) {
case ThrowException:
this.failOnLimits = true;
break;
case Block:
this.failOnLimits = false;
break;
case Ignore:
this.failOnLimits = false;
this.maxOutstandingElementCount = null;
this.maxOutstandingRequestBytes = null;
this.outstandingElementCount = null;
this.outstandingByteCount = null;
return;
default:
throw new IllegalArgumentException(
"Unknown LimitBehaviour: " + settings.getLimitExceededBehavior());
}
this.maxOutstandingElementCount = settings.getMaxOutstandingElementCount();
this.maxOutstandingRequestBytes = settings.getMaxOutstandingRequestBytes();
outstandingElementCount =
maxOutstandingElementCount != null ? new Semaphore(maxOutstandingElementCount) : null;
outstandingByteCount =
maxOutstandingRequestBytes != null ? new Semaphore(maxOutstandingRequestBytes) : null;
this.failOnLimits = failOnFlowControlLimits;
}

public void reserve(int elements, int bytes) throws FlowControlException {
Expand Down
Loading