Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Add bundling flow control #213

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;

/** Wraps a {@link FlowController} for use by Bundling. */
public class BundlingFlowController<T> {

private final FlowController flowController;
Expand Down
58 changes: 29 additions & 29 deletions src/main/java/com/google/api/gax/bundling/BundlingSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,54 +38,53 @@
/**
* Represents the bundling settings to use for an API method that is capable of bundling.
*
* <p>
* Warning: With the wrong settings, it is possible to cause long periods of dead waiting time.
* <p>Warning: With the wrong settings, it is possible to cause long periods of dead waiting time.
*
* <p>
* When bundling is turned on for an API method, a call to that method will result in the request
* <p>When bundling is turned on for an API method, a call to that method will result in the request
* being queued up with other requests. When any of the set thresholds are reached, the queued up
* requests are packaged together in a bundle and set to the service as a single RPC. When the
* response comes back, it is split apart into individual responses according to the individual
* input requests.
*
* <p>
* There are several supported thresholds:
* <p>There are several supported thresholds:
*
* <p>
*
* <ul>
* <li><b>Delay Threshold</b>: Counting from the time that the first message is queued, once this
* delay has passed, then send the bundle.
* <li><b>Message Count Threshold</b>: Once this many messages are queued, send all of the messages
* in a single call, even if the delay threshold hasn't elapsed yet.
* <li><b>Request Byte Threshold</b>: Once the number of bytes in the bundled request reaches this
* threshold, send all of the messages in a single call, even if neither the delay or message count
* thresholds have been exceeded yet.
* <li><b>Delay Threshold</b>: Counting from the time that the first message is queued, once this
* delay has passed, then send the bundle.
* <li><b>Message Count Threshold</b>: Once this many messages are queued, send all of the
* messages in a single call, even if the delay threshold hasn't elapsed yet.
* <li><b>Request Byte Threshold</b>: Once the number of bytes in the bundled request reaches this
* threshold, send all of the messages in a single call, even if neither the delay or message
* count thresholds have been exceeded yet.
* </ul>
*
* <p>
* These thresholds are treated as triggers, not as limits. Thus, if a request is made with 2x the
* message count threshold, it will not be split apart (unless one of the limits listed further down
* is crossed); only one bundle will be sent. Each threshold is an independent trigger and doesn't
* have any knowledge of the other thresholds.
* <p>These thresholds are treated as triggers, not as limits. Thus, if a request is made with 2x
* the message count threshold, it will not be split apart (unless one of the limits listed further
* down is crossed); only one bundle will be sent. Each threshold is an independent trigger and
* doesn't have any knowledge of the other thresholds.
*
* <p>
* Two of the values above also have limits:
* <p>Two of the values above also have limits:
*
* <p>
*
* <ul>
* <li><b>Message Count Limit</b>: The limit of the number of messages that the server will accept
* in a single request.
* <li><b>Request Byte Limit</b>: The limit of the byte size of a request that the server will
* accept.
* <li><b>Message Count Limit</b>: The limit of the number of messages that the server will accept
* in a single request.
* <li><b>Request Byte Limit</b>: The limit of the byte size of a request that the server will
* accept.
* </ul>
*
* <p>
* For these values, individual requests that surpass the limit are rejected, and the bundling logic
* will not bundle together requests in the resulting bundle will surpass the limit. Thus, a bundle
* can be sent that is actually under the threshold if the next request would put the combined
* request over the limit.
* <p>For these values, individual requests that surpass the limit are rejected, and the bundling
* logic will not bundle together requests in the resulting bundle will surpass the limit. Thus, a
* bundle can be sent that is actually under the threshold if the next request would put the
* combined request over the limit.
*
* <p>Bundling also supports FlowControl. This can be used to prevent the bundling implementation
* from accumulating messages without limit, resulting eventually in an OutOfMemory exception. This
* can occur if messages are created and added to bundling faster than they can be processed. The
* flow control behavior is controlled using FlowControlSettings.
*/
@AutoValue
public abstract class BundlingSettings {
Expand Down Expand Up @@ -161,6 +160,7 @@ public Builder setRequestByteThreshold(Integer requestByteThreshold) {
*/
public abstract Builder setIsEnabled(Boolean enabled);

/** Set the flow control settings to be used. */
public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);

abstract BundlingSettings autoBuild();
Expand Down
46 changes: 19 additions & 27 deletions src/main/java/com/google/api/gax/bundling/ThresholdBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public final class ThresholdBundler<E> {
private final BundlingFlowController<E> flowController;

private final Lock lock = new ReentrantLock();
private final Lock addMethodLock = new ReentrantLock();
private final Condition bundleCondition = lock.newCondition();
private Bundle currentOpenBundle;
private List<Bundle> closedBundles = new ArrayList<>();
Expand Down Expand Up @@ -131,38 +130,31 @@ public static <T> Builder<T> newBuilder() {
* @throws FlowControlException
*/
public void add(E e) throws FlowControlException {
final Lock addMethodLock = this.addMethodLock;
final Lock lock = this.lock;
addMethodLock.lock();
// We need to reserve resources from flowController outside the lock, so that they can be
// released by drainNextBundleTo().
flowController.reserve(e);
lock.lock();
try {
flowController.reserve(e);
// We need to reserve resources from flowController outside the lock, but we also need to
// prevent concurrent calls to add(E e) from all reserving flowController resources and then
// waiting to acquire the lock, so we use the additional addMethodLock.
lock.lock();
try {
boolean signalBundleIsReady = false;
if (currentOpenBundle == null) {
currentOpenBundle = new Bundle(thresholdPrototypes, maxDelay);
currentOpenBundle.start();
signalBundleIsReady = true;
}
boolean signalBundleIsReady = false;
if (currentOpenBundle == null) {
currentOpenBundle = new Bundle(thresholdPrototypes, maxDelay);
currentOpenBundle.start();
signalBundleIsReady = true;
}

currentOpenBundle.add(e);
if (currentOpenBundle.isAnyThresholdReached()) {
signalBundleIsReady = true;
closedBundles.add(currentOpenBundle);
currentOpenBundle = null;
}
currentOpenBundle.add(e);
if (currentOpenBundle.isAnyThresholdReached()) {
signalBundleIsReady = true;
closedBundles.add(currentOpenBundle);
currentOpenBundle = null;
}

if (signalBundleIsReady) {
bundleCondition.signalAll();
}
} finally {
lock.unlock();
if (signalBundleIsReady) {
bundleCondition.signalAll();
}
} finally {
addMethodLock.unlock();
lock.unlock();
}
}

Expand Down
18 changes: 9 additions & 9 deletions src/main/java/com/google/api/gax/core/FlowControlSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
*/
package com.google.api.gax.core;

import com.google.api.gax.core.FlowController.FlowControlException;
import com.google.api.gax.core.FlowController.LimitExceededBehavior;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
Expand All @@ -49,12 +50,14 @@ public static FlowControlSettings getDefaultInstance() {
@Nullable
public abstract Integer getMaxOutstandingRequestBytes();

/** Is flow control enabled. Defaults to true. */
public abstract boolean getIsEnabled();

/**
* The behavior of FlowController when the specified limits are exceeded. Defaults to
* ThrowException.
* The behavior of {@link FlowController} when the specified limits are exceeded. Defaults to
* Block.
*
* <p>The expected behavior for each of these values is: ThrowException: the FlowController will
* throw a {@link FlowControlException} if any of the limits are exceeded. Block: the reserve()
* method of FlowController will block until the quote is available to be reserved. Ignore: all
* flow control limits will be ignored; the FlowController is disabled.

This comment was marked as spam.

*/

This comment was marked as spam.

This comment was marked as spam.

public abstract LimitExceededBehavior getLimitExceededBehavior();

Expand All @@ -64,8 +67,7 @@ public Builder toBuilder() {

public static Builder newBuilder() {
return new AutoValue_FlowControlSettings.Builder()
.setIsEnabled(true)
.setLimitExceededBehavior(LimitExceededBehavior.ThrowException);
.setLimitExceededBehavior(LimitExceededBehavior.Block);
}

@AutoValue.Builder
Expand All @@ -74,8 +76,6 @@ public abstract static class Builder {

public abstract Builder setMaxOutstandingRequestBytes(Integer value);

public abstract Builder setIsEnabled(boolean value);

public abstract Builder setLimitExceededBehavior(LimitExceededBehavior value);

abstract FlowControlSettings autoBuild();
Expand Down
36 changes: 22 additions & 14 deletions src/main/java/com/google/api/gax/core/FlowController.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public abstract static class FlowControlException extends Exception {
private FlowControlException() {}
}

/**
* Runtime exception that can be used in place of FlowControlException when an unchecked exception
* is required.
*/
public static class FlowControlRuntimeException extends RuntimeException {
private FlowControlRuntimeException(FlowControlException e) {
super(e);
Expand Down Expand Up @@ -96,43 +100,50 @@ public String toString() {
}
}

/**
* Enumeration of behaviors that FlowController can use in case the flow control limits are
* exceeded.
*/
public enum LimitExceededBehavior {
ThrowException,
Block,
Ignore,
}

@Nullable private final Semaphore outstandingElementCount;
@Nullable private final Semaphore outstandingByteCount;
private final boolean flowControlEnabled;
private final boolean failOnLimits;
@Nullable private final Integer maxOutstandingElementCount;
@Nullable private final Integer maxOutstandingRequestBytes;

public FlowController(FlowControlSettings settings) {
this.maxOutstandingElementCount = settings.getMaxOutstandingElementCount();
this.maxOutstandingRequestBytes = settings.getMaxOutstandingRequestBytes();
outstandingElementCount =
maxOutstandingElementCount != null ? new Semaphore(maxOutstandingElementCount) : null;
outstandingByteCount =
maxOutstandingRequestBytes != null ? new Semaphore(maxOutstandingRequestBytes) : null;
this.flowControlEnabled = settings.getIsEnabled();
switch (settings.getLimitExceededBehavior()) {
case ThrowException:
this.failOnLimits = true;
break;
case Block:
this.failOnLimits = false;
break;
case Ignore:
this.failOnLimits = false;
this.maxOutstandingElementCount = null;
this.maxOutstandingRequestBytes = null;
this.outstandingElementCount = null;
this.outstandingByteCount = null;
return;
default:
throw new IllegalArgumentException(
"Unknown LimitBehaviour: " + settings.getLimitExceededBehavior());
}
this.maxOutstandingElementCount = settings.getMaxOutstandingElementCount();
this.maxOutstandingRequestBytes = settings.getMaxOutstandingRequestBytes();
outstandingElementCount =
maxOutstandingElementCount != null ? new Semaphore(maxOutstandingElementCount) : null;
outstandingByteCount =
maxOutstandingRequestBytes != null ? new Semaphore(maxOutstandingRequestBytes) : null;
}

public void reserve(int elements, int bytes) throws FlowControlException {
if (!flowControlEnabled) {
return;
}
Preconditions.checkArgument(elements > 0);

if (outstandingElementCount != null) {
Expand All @@ -156,9 +167,6 @@ public void reserve(int elements, int bytes) throws FlowControlException {
}

public void release(int elements, int bytes) {
if (!flowControlEnabled) {
return;
}
Preconditions.checkArgument(elements > 0);

if (outstandingElementCount != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;
package com.google.api.gax.core;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.api.gax.core.FlowControlSettings;
import com.google.api.gax.core.FlowController;
import com.google.api.gax.core.FlowController.LimitExceededBehavior;
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -104,6 +102,20 @@ public void testReserveRelease_noLimits_ok() throws Exception {
flowController.reserve(1, 1);
flowController.release(1, 1);
}

@Test
public void testReserveRelease_ignore_ok() throws Exception {
FlowController flowController =
new FlowController(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1)
.setMaxOutstandingRequestBytes(1)
.setLimitExceededBehavior(LimitExceededBehavior.Ignore)
.build());

flowController.reserve(1, 1);
flowController.release(1, 1);
}

@Test
public void testReserveRelease_blockedByElementCount() throws Exception {
Expand Down