Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Add bundling flow control #213

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2017, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.bundling;

import com.google.api.gax.core.FlowController;
import com.google.api.gax.core.FlowController.FlowControlException;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;

public class BundlingFlowController<T> {

private final FlowController flowController;
private final ElementCounter<T> elementCounter;
private final ElementCounter<T> byteCounter;

public BundlingFlowController(
FlowController flowController,
ElementCounter<T> elementCounter,
ElementCounter<T> byteCounter) {
this.flowController = flowController;
this.elementCounter = elementCounter;
this.byteCounter = byteCounter;
}

public void reserve(T bundle) throws FlowControlException {
Preconditions.checkNotNull(bundle);
int elements = Ints.checkedCast(elementCounter.count(bundle));
int bytes = Ints.checkedCast(byteCounter.count(bundle));
flowController.reserve(elements, bytes);
}

public void release(T bundle) {
Preconditions.checkNotNull(bundle);
int elements = Ints.checkedCast(elementCounter.count(bundle));
int bytes = Ints.checkedCast(byteCounter.count(bundle));
flowController.release(elements, bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;
package com.google.api.gax.bundling;

import com.google.api.gax.core.FlowControlSettings;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -103,9 +104,14 @@ public abstract class BundlingSettings {
/** Returns the Boolean object to indicate if the bundling is enabled. Default to true */
public abstract Boolean getIsEnabled();

/** Get the flow control settings to use. */
public abstract FlowControlSettings getFlowControlSettings();

This comment was marked as spam.


/** Get a new builder. */
public static Builder newBuilder() {
return new AutoValue_BundlingSettings.Builder().setIsEnabled(true);
return new AutoValue_BundlingSettings.Builder()
.setIsEnabled(true)
.setFlowControlSettings(FlowControlSettings.getDefaultInstance());
}

/** Get a builder with the same values as this object. */
Expand Down Expand Up @@ -155,6 +161,8 @@ public Builder setRequestByteThreshold(Integer requestByteThreshold) {
*/
public abstract Builder setIsEnabled(Boolean enabled);

public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);

abstract BundlingSettings autoBuild();

/** Build the BundlingSettings object. */
Expand Down
35 changes: 28 additions & 7 deletions src/main/java/com/google/api/gax/bundling/ThresholdBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
*/
package com.google.api.gax.bundling;

import com.google.api.gax.core.FlowController.FlowControlException;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
Expand All @@ -50,15 +51,20 @@ public final class ThresholdBundler<E> {

private ImmutableList<BundlingThreshold<E>> thresholdPrototypes;
private final Duration maxDelay;
private final BundlingFlowController<E> flowController;

private final Lock lock = new ReentrantLock();
private final Condition bundleCondition = lock.newCondition();
private Bundle currentOpenBundle;
private List<Bundle> closedBundles = new ArrayList<>();

private ThresholdBundler(ImmutableList<BundlingThreshold<E>> thresholds, Duration maxDelay) {
private ThresholdBundler(
ImmutableList<BundlingThreshold<E>> thresholds,
Duration maxDelay,
BundlingFlowController<E> flowController) {
this.thresholdPrototypes = copyResetThresholds(Preconditions.checkNotNull(thresholds));
this.maxDelay = maxDelay;
this.flowController = Preconditions.checkNotNull(flowController);
this.currentOpenBundle = null;
}

Expand All @@ -68,6 +74,7 @@ private ThresholdBundler(ImmutableList<BundlingThreshold<E>> thresholds, Duratio
public static final class Builder<E> {
private List<BundlingThreshold<E>> thresholds;
private Duration maxDelay;
private BundlingFlowController<E> flowController;

private Builder() {
thresholds = Lists.newArrayList();
Expand Down Expand Up @@ -97,11 +104,15 @@ public Builder<E> addThreshold(BundlingThreshold<E> threshold) {
return this;
}

/**
* Build the ThresholdBundler.
*/
/** Set the flow controller for the ThresholdBundler. */
public Builder<E> setFlowController(BundlingFlowController<E> flowController) {
this.flowController = flowController;
return this;
}

/** Build the ThresholdBundler. */
public ThresholdBundler<E> build() {
return new ThresholdBundler<E>(ImmutableList.copyOf(thresholds), maxDelay);
return new ThresholdBundler<E>(ImmutableList.copyOf(thresholds), maxDelay, flowController);
}
}

Expand All @@ -115,9 +126,15 @@ public static <T> Builder<T> newBuilder() {
/**
* Adds an element to the bundler. If the element causes the collection to go past any of the
* thresholds, the bundle will be made available to consumers.
*
* @throws FlowControlException
*/
public void add(E e) {
public synchronized void add(E e) throws FlowControlException {

This comment was marked as spam.

This comment was marked as spam.

final Lock lock = this.lock;
flowController.reserve(e);
// We need to reserve resources from flowController outside the lock, but we also need to
// prevent concurrent calls to add(E e) from all reserving flowController resources and then
// waiting to acquire the lock, so this method is marked as synchronized.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

lock.lock();
try {
boolean signalBundleIsReady = false;
Expand Down Expand Up @@ -179,7 +196,11 @@ public int drainNextBundleTo(Collection<? super E> outputCollection) {
}

if (outBundle != null) {
outputCollection.addAll(outBundle.getData());
List<E> data = outBundle.getData();
for (E e : data) {
flowController.release(e);
}
outputCollection.addAll(data);
return outputCollection.size();
} else {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
*/
package com.google.api.gax.bundling;

import com.google.api.gax.core.FlowController.FlowControlException;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -65,8 +66,10 @@ public void start() {
/**
* First validates that the receiver can receive the given item (based on the inherent
* characteristics of the item), and then hands it off to the bundler.
*
* @throws FlowControlException
*/
public void addToNextBundle(T item) {
public void addToNextBundle(T item) throws FlowControlException {
bundleReceiver.validateItem(item);
bundler.add(item);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;
package com.google.api.gax.core;

import com.google.api.gax.core.FlowController.LimitExceededBehavior;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;

/** Settings for {@link FlowController}. */
Expand All @@ -49,12 +49,23 @@ public static FlowControlSettings getDefaultInstance() {
@Nullable
public abstract Integer getMaxOutstandingRequestBytes();

/** Is flow control enabled. Defaults to true. */
public abstract boolean getIsEnabled();

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.


/**
* The behavior of FlowController when the specified limits are exceeded. Defaults to
* ThrowException.
*/

This comment was marked as spam.

This comment was marked as spam.

public abstract LimitExceededBehavior getExceedLimitBehavior();

public Builder toBuilder() {
return new AutoValue_FlowControlSettings.Builder(this);
}

public static Builder newBuilder() {
return new AutoValue_FlowControlSettings.Builder();
return new AutoValue_FlowControlSettings.Builder()
.setIsEnabled(true)
.setExceedLimitBehavior(LimitExceededBehavior.ThrowException);

This comment was marked as spam.

This comment was marked as spam.

}

@AutoValue.Builder
Expand All @@ -63,6 +74,10 @@ public abstract static class Builder {

public abstract Builder setMaxOutstandingRequestBytes(Integer value);

public abstract Builder setIsEnabled(boolean value);

public abstract Builder setExceedLimitBehavior(LimitExceededBehavior value);

abstract FlowControlSettings autoBuild();

public FlowControlSettings build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;
package com.google.api.gax.core;

import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;
Expand All @@ -41,6 +40,16 @@ public abstract static class FlowControlException extends Exception {
private FlowControlException() {}
}

public static class FlowControlRuntimeException extends RuntimeException {
private FlowControlRuntimeException(FlowControlException e) {
super(e);
}

public static FlowControlRuntimeException fromFlowControlException(FlowControlException e) {
return new FlowControlRuntimeException(e);
}
}

/**
* Exception thrown when client-side flow control is enforced based on the maximum number of
* outstanding in-memory elements.
Expand Down Expand Up @@ -87,23 +96,43 @@ public String toString() {
}
}

public enum LimitExceededBehavior {
ThrowException,
Block,
}

@Nullable private final Semaphore outstandingElementCount;
@Nullable private final Semaphore outstandingByteCount;
private final boolean flowControlEnabled;

This comment was marked as spam.

This comment was marked as spam.

private final boolean failOnLimits;
@Nullable private final Integer maxOutstandingElementCount;
@Nullable private final Integer maxOutstandingRequestBytes;

public FlowController(FlowControlSettings settings, boolean failOnFlowControlLimits) {
public FlowController(FlowControlSettings settings) {
this.maxOutstandingElementCount = settings.getMaxOutstandingElementCount();
this.maxOutstandingRequestBytes = settings.getMaxOutstandingRequestBytes();
outstandingElementCount =
maxOutstandingElementCount != null ? new Semaphore(maxOutstandingElementCount) : null;
outstandingByteCount =
maxOutstandingRequestBytes != null ? new Semaphore(maxOutstandingRequestBytes) : null;
this.failOnLimits = failOnFlowControlLimits;
this.flowControlEnabled = settings.getIsEnabled();
switch (settings.getExceedLimitBehavior()) {
case ThrowException:
this.failOnLimits = true;
break;
case Block:
this.failOnLimits = false;
break;
default:
throw new IllegalArgumentException(
"Unknown LimitBehaviour: " + settings.getExceedLimitBehavior());
}
}

public void reserve(int elements, int bytes) throws FlowControlException {
if (!flowControlEnabled) {
return;
}
Preconditions.checkArgument(elements > 0);

if (outstandingElementCount != null) {
Expand All @@ -127,6 +156,9 @@ public void reserve(int elements, int bytes) throws FlowControlException {
}

public void release(int elements, int bytes) {
if (!flowControlEnabled) {
return;
}
Preconditions.checkArgument(elements > 0);

if (outstandingElementCount != null) {
Expand Down
Loading