Skip to content

Commit

Permalink
Revert "Add bundling flow control (googleapis#213)"
Browse files Browse the repository at this point in the history
This reverts commit ec2e656.
  • Loading branch information
michaelbausor committed Mar 3, 2017
1 parent b246dbe commit ddd8309
Show file tree
Hide file tree
Showing 14 changed files with 56 additions and 399 deletions.

This file was deleted.

34 changes: 7 additions & 27 deletions src/main/java/com/google/api/gax/bundling/ThresholdBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
*/
package com.google.api.gax.bundling;

import com.google.api.gax.core.FlowController.FlowControlException;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
Expand All @@ -51,20 +50,15 @@ public final class ThresholdBundler<E> {

private ImmutableList<BundlingThreshold<E>> thresholdPrototypes;
private final Duration maxDelay;
private final BundlingFlowController<E> flowController;

private final Lock lock = new ReentrantLock();
private final Condition bundleCondition = lock.newCondition();
private Bundle currentOpenBundle;
private List<Bundle> closedBundles = new ArrayList<>();

private ThresholdBundler(
ImmutableList<BundlingThreshold<E>> thresholds,
Duration maxDelay,
BundlingFlowController<E> flowController) {
private ThresholdBundler(ImmutableList<BundlingThreshold<E>> thresholds, Duration maxDelay) {
this.thresholdPrototypes = copyResetThresholds(Preconditions.checkNotNull(thresholds));
this.maxDelay = maxDelay;
this.flowController = Preconditions.checkNotNull(flowController);
this.currentOpenBundle = null;
}

Expand All @@ -74,7 +68,6 @@ private ThresholdBundler(
public static final class Builder<E> {
private List<BundlingThreshold<E>> thresholds;
private Duration maxDelay;
private BundlingFlowController<E> flowController;

private Builder() {
thresholds = Lists.newArrayList();
Expand Down Expand Up @@ -104,15 +97,11 @@ public Builder<E> addThreshold(BundlingThreshold<E> threshold) {
return this;
}

/** Set the flow controller for the ThresholdBundler. */
public Builder<E> setFlowController(BundlingFlowController<E> flowController) {
this.flowController = flowController;
return this;
}

/** Build the ThresholdBundler. */
/**
* Build the ThresholdBundler.
*/
public ThresholdBundler<E> build() {
return new ThresholdBundler<E>(ImmutableList.copyOf(thresholds), maxDelay, flowController);
return new ThresholdBundler<E>(ImmutableList.copyOf(thresholds), maxDelay);
}
}

Expand All @@ -126,14 +115,9 @@ public static <T> Builder<T> newBuilder() {
/**
* Adds an element to the bundler. If the element causes the collection to go past any of the
* thresholds, the bundle will be made available to consumers.
*
* @throws FlowControlException
*/
public void add(E e) throws FlowControlException {
public void add(E e) {
final Lock lock = this.lock;
// We need to reserve resources from flowController outside the lock, so that they can be
// released by drainNextBundleTo().
flowController.reserve(e);
lock.lock();
try {
boolean signalBundleIsReady = false;
Expand Down Expand Up @@ -195,11 +179,7 @@ public int drainNextBundleTo(Collection<? super E> outputCollection) {
}

if (outBundle != null) {
List<E> data = outBundle.getData();
for (E e : data) {
flowController.release(e);
}
outputCollection.addAll(data);
outputCollection.addAll(outBundle.getData());
return outputCollection.size();
} else {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
*/
package com.google.api.gax.bundling;

import com.google.api.gax.core.FlowController.FlowControlException;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -66,10 +65,8 @@ public void start() {
/**
* First validates that the receiver can receive the given item (based on the inherent
* characteristics of the item), and then hands it off to the bundler.
*
* @throws FlowControlException
*/
public void addToNextBundle(T item) throws FlowControlException {
public void addToNextBundle(T item) {
bundleReceiver.validateItem(item);
bundler.add(item);
}
Expand Down
32 changes: 0 additions & 32 deletions src/main/java/com/google/api/gax/grpc/BundlerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,11 @@
*/
package com.google.api.gax.grpc;

import com.google.api.gax.bundling.BundlingFlowController;
import com.google.api.gax.bundling.BundlingSettings;
import com.google.api.gax.bundling.BundlingThreshold;
import com.google.api.gax.bundling.ElementCounter;
import com.google.api.gax.bundling.NumericThreshold;
import com.google.api.gax.bundling.ThresholdBundler;
import com.google.api.gax.bundling.ThresholdBundlingForwarder;
import com.google.api.gax.core.FlowControlSettings;
import com.google.api.gax.core.FlowController;
import com.google.api.gax.core.FlowController.LimitExceededBehavior;
import com.google.common.collect.ImmutableList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -55,7 +50,6 @@ public final class BundlerFactory<RequestT, ResponseT> implements AutoCloseable
private final Map<String, ThresholdBundlingForwarder<BundlingContext<RequestT, ResponseT>>>
forwarders = new ConcurrentHashMap<>();
private final BundlingDescriptor<RequestT, ResponseT> bundlingDescriptor;
private final FlowController flowController;
private final BundlingSettings bundlingSettings;
private final Object lock = new Object();

Expand All @@ -64,13 +58,6 @@ public BundlerFactory(
BundlingSettings bundlingSettings) {
this.bundlingDescriptor = bundlingDescriptor;
this.bundlingSettings = bundlingSettings;
this.flowController =
new FlowController(
bundlingSettings.getFlowControlSettings() != null
? bundlingSettings.getFlowControlSettings()
: FlowControlSettings.newBuilder()
.setLimitExceededBehavior(LimitExceededBehavior.Ignore)
.build());
}

/**
Expand Down Expand Up @@ -110,31 +97,12 @@ private ThresholdBundlingForwarder<BundlingContext<RequestT, ResponseT>> createF
ThresholdBundler.<BundlingContext<RequestT, ResponseT>>newBuilder()
.setThresholds(getThresholds(bundlingSettings))
.setMaxDelay(bundlingSettings.getDelayThreshold())
.setFlowController(createBundlingFlowController())
.build();
BundleExecutor<RequestT, ResponseT> processor =
new BundleExecutor<>(bundlingDescriptor, partitionKey);
return new ThresholdBundlingForwarder<>(bundler, processor);
}

private BundlingFlowController<BundlingContext<RequestT, ResponseT>>
createBundlingFlowController() {
return new BundlingFlowController<BundlingContext<RequestT, ResponseT>>(
flowController,
new ElementCounter<BundlingContext<RequestT, ResponseT>>() {
@Override
public long count(BundlingContext<RequestT, ResponseT> bundlablePublish) {
return bundlingDescriptor.countElements(bundlablePublish.getRequest());
}
},
new ElementCounter<BundlingContext<RequestT, ResponseT>>() {
@Override
public long count(BundlingContext<RequestT, ResponseT> bundlablePublish) {
return bundlingDescriptor.countBytes(bundlablePublish.getRequest());
}
});
}

@Override
public void close() {
synchronized (lock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
*/
package com.google.api.gax.grpc;

import com.google.api.gax.bundling.BundlingSettings;
import com.google.api.gax.core.RetrySettings;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
Expand Down
10 changes: 2 additions & 8 deletions src/main/java/com/google/api/gax/grpc/BundlingCallable.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
package com.google.api.gax.grpc;

import com.google.api.gax.bundling.ThresholdBundlingForwarder;
import com.google.api.gax.core.FlowController.FlowControlException;
import com.google.api.gax.core.FlowController.FlowControlRuntimeException;
import com.google.api.gax.core.RpcFuture;
import com.google.common.base.Preconditions;

Expand Down Expand Up @@ -68,12 +66,8 @@ public RpcFuture<ResponseT> futureCall(RequestT request, CallContext context) {
String partitionKey = bundlingDescriptor.getBundlePartitionKey(request);
ThresholdBundlingForwarder<BundlingContext<RequestT, ResponseT>> forwarder =
bundlerFactory.getForwarder(partitionKey);
try {
forwarder.addToNextBundle(bundlableMessage);
return result;
} catch (FlowControlException e) {
throw FlowControlRuntimeException.fromFlowControlException(e);
}
forwarder.addToNextBundle(bundlableMessage);
return result;
} else {
return callable.futureCall(request, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.bundling;
package com.google.api.gax.grpc;

import com.google.api.gax.core.FlowControlSettings;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -86,12 +85,6 @@
* will not bundle together requests in the resulting bundle will surpass the limit. Thus, a bundle
* can be sent that is actually under the threshold if the next request would put the combined
* request over the limit.
*
* <p>
* Bundling also supports FlowControl. This can be used to prevent the bundling implementation from
* accumulating messages without limit, resulting eventually in an OutOfMemory exception. This can
* occur if messages are created and added to bundling faster than they can be processed. The flow
* control behavior is controlled using FlowControlSettings.
*/
@AutoValue
public abstract class BundlingSettings {
Expand All @@ -110,14 +103,9 @@ public abstract class BundlingSettings {
/** Returns the Boolean object to indicate if the bundling is enabled. Default to true */
public abstract Boolean getIsEnabled();

/** Get the flow control settings to use. */
public abstract FlowControlSettings getFlowControlSettings();

/** Get a new builder. */
public static Builder newBuilder() {
return new AutoValue_BundlingSettings.Builder()
.setIsEnabled(true)
.setFlowControlSettings(FlowControlSettings.getDefaultInstance());
return new AutoValue_BundlingSettings.Builder().setIsEnabled(true);
}

/** Get a builder with the same values as this object. */
Expand Down Expand Up @@ -167,9 +155,6 @@ public Builder setRequestByteThreshold(Integer requestByteThreshold) {
*/
public abstract Builder setIsEnabled(Boolean enabled);

/** Set the flow control settings to be used. */
public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);

abstract BundlingSettings autoBuild();

/** Build the BundlingSettings object. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.core;
package com.google.api.gax.grpc;

import com.google.api.gax.core.FlowController.FlowControlException;
import com.google.api.gax.core.FlowController.LimitExceededBehavior;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;

/** Settings for {@link FlowController}. */
Expand All @@ -50,30 +49,12 @@ public static FlowControlSettings getDefaultInstance() {
@Nullable
public abstract Integer getMaxOutstandingRequestBytes();

/**
* The behavior of {@link FlowController} when the specified limits are exceeded. Defaults to
* Block.
*
* <p>
* The expected behavior for each of these values is:
*
* <ul>
* <li>ThrowException: the FlowController will throw a {@link FlowControlException} if any of the
* limits are exceeded.
* <li>Block: the reserve() method of FlowController will block until the quote is available to be
* reserved.
* <li>Ignore: all flow control limits will be ignored; the FlowController is disabled.
* </ul>
*/
public abstract LimitExceededBehavior getLimitExceededBehavior();

public Builder toBuilder() {
return new AutoValue_FlowControlSettings.Builder(this);
}

public static Builder newBuilder() {
return new AutoValue_FlowControlSettings.Builder()
.setLimitExceededBehavior(LimitExceededBehavior.Block);
return new AutoValue_FlowControlSettings.Builder();
}

@AutoValue.Builder
Expand All @@ -82,8 +63,6 @@ public abstract static class Builder {

public abstract Builder setMaxOutstandingRequestBytes(Integer value);

public abstract Builder setLimitExceededBehavior(LimitExceededBehavior value);

abstract FlowControlSettings autoBuild();

public FlowControlSettings build() {
Expand Down
Loading

0 comments on commit ddd8309

Please sign in to comment.