Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Add bundling flow control #213

Merged

Conversation

michaelbausor
Copy link
Contributor

This is WIP to make sure I am on the right track adding FlowController support to bundling.

One question I had is: at what point (if any) do we want to convert a FlowControlException into an ApiException? Note that if we don't do this, we will end up adding a checked exception (FlowControlException) into the signature of FutureCallable.

Copy link
Member

@garrettjonesgoogle garrettjonesgoogle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should add a new checked exception into the signature of FutureCallable.

@@ -0,0 +1,65 @@
/*
* Copyright 2016, Google Inc. All rights reserved.

This comment was marked as spam.

This comment was marked as spam.

Copy link
Contributor

@pongad pongad left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the sentiment of not adding more checked exceptions. Maybe RuntimeException with @throw doc?

outputCollection.addAll(outBundle.getData());
List<E> data = outBundle.getData();
outputCollection.addAll(data);
if (flowController != null) {

This comment was marked as spam.

This comment was marked as spam.

@@ -103,6 +103,12 @@
/** Returns the Boolean object to indicate if the bundling is enabled. Default to true */
public abstract Boolean getIsEnabled();

public abstract Boolean getIsFlowControlEnabled();

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

@@ -155,6 +161,12 @@ public Builder setRequestByteThreshold(Integer requestByteThreshold) {
*/
public abstract Builder setIsEnabled(Boolean enabled);

public abstract Builder setIsFlowControlEnabled(Boolean enabled);

public abstract Builder setFailOnFlowControlLimits(Boolean failOnFlowControlLimits);

This comment was marked as spam.

This comment was marked as spam.

package com.google.api.gax.bundling;

import com.google.api.gax.grpc.FlowController;
import com.google.api.gax.grpc.FlowController.FlowControlException;

This comment was marked as spam.

This comment was marked as spam.

Copy link
Contributor Author

@michaelbausor michaelbausor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PTAL. More work to come before complete.

@@ -0,0 +1,65 @@
/*
* Copyright 2016, Google Inc. All rights reserved.

This comment was marked as spam.

package com.google.api.gax.bundling;

import com.google.api.gax.grpc.FlowController;
import com.google.api.gax.grpc.FlowController.FlowControlException;

This comment was marked as spam.

outputCollection.addAll(outBundle.getData());
List<E> data = outBundle.getData();
outputCollection.addAll(data);
if (flowController != null) {

This comment was marked as spam.

@@ -155,6 +161,12 @@ public Builder setRequestByteThreshold(Integer requestByteThreshold) {
*/
public abstract Builder setIsEnabled(Boolean enabled);

public abstract Builder setIsFlowControlEnabled(Boolean enabled);

public abstract Builder setFailOnFlowControlLimits(Boolean failOnFlowControlLimits);

This comment was marked as spam.

@@ -103,6 +103,12 @@
/** Returns the Boolean object to indicate if the bundling is enabled. Default to true */
public abstract Boolean getIsEnabled();

public abstract Boolean getIsFlowControlEnabled();

This comment was marked as spam.

- Move FlowController into core
- Move BundlingSettings in bundling
- Address other PR feedback
@michaelbausor
Copy link
Contributor Author

Do you have a preference for (a) a new FlowControlRuntimeException type? Or (b) use ApiException, with status code UNKNOWN. My initial thought was to use ApiException, but it seems that we are not using ApiException except in cases where it is the remote call that has failed.

Copy link
Member

@garrettjonesgoogle garrettjonesgoogle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who is "you" here? As for me, I think I'd to have the bundling code throw FlowControlRuntimeException, and limit the usage of ApiException to the callable classes. I think the question of whether FlowControlRuntimeException is wrapped in ApiException when thrown from the callable classes depends on whether users can take specific action to recover from FlowControlRuntimeException or not.

@@ -103,6 +104,10 @@
/** Returns the Boolean object to indicate if the bundling is enabled. Default to true */
public abstract Boolean getIsEnabled();

/** Get the flow control settings to use. */
@Nullable
public abstract FlowControlSettings getFlowControlSettings();

This comment was marked as spam.

* Control whether the FlowController will fail with an exception or block the thread when flow
* control limits are exceeded.
*/
public abstract boolean getShouldFailOnFlowControlLimits();

This comment was marked as spam.

* Build the ThresholdBundler.
*/
/** Set the flow controller for the ThresholdBundler. */
public Builder<E> setFlowControler(BundlingFlowController<E> flowController) {

This comment was marked as spam.

@michaelbausor
Copy link
Contributor Author

"You" is @garrettjonesgoogle and @pongad

Updated to address feedback and add tests and runtime exception, PTAL

@@ -129,11 +129,14 @@ private Builder() {
*
* @throws FlowControlException
*/
public void add(E e) throws FlowControlException {
public synchronized void add(E e) throws FlowControlException {

This comment was marked as spam.

This comment was marked as spam.

flowController.reserve(e);
// We need to reserve resources from flowController outside the lock, but we also need to
// prevent concurrent calls to add(E e) from all reserving flowController resources and then
// waiting to acquire the lock, so this method is marked as synchronized.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

return new AutoValue_FlowControlSettings.Builder().setIsEnabled(true);
return new AutoValue_FlowControlSettings.Builder()
.setIsEnabled(true)
.setExceedLimitBehavior(LimitExceededBehavior.ThrowException);

This comment was marked as spam.

This comment was marked as spam.

Copy link
Contributor Author

@michaelbausor michaelbausor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PTAL

flowController.reserve(e);
// We need to reserve resources from flowController outside the lock, but we also need to
// prevent concurrent calls to add(E e) from all reserving flowController resources and then
// waiting to acquire the lock, so this method is marked as synchronized.

This comment was marked as spam.

@@ -129,11 +129,14 @@ private Builder() {
*
* @throws FlowControlException
*/
public void add(E e) throws FlowControlException {
public synchronized void add(E e) throws FlowControlException {

This comment was marked as spam.

return new AutoValue_FlowControlSettings.Builder().setIsEnabled(true);
return new AutoValue_FlowControlSettings.Builder()
.setIsEnabled(true)
.setExceedLimitBehavior(LimitExceededBehavior.ThrowException);

This comment was marked as spam.

@codecov-io
Copy link

codecov-io commented Feb 28, 2017

Codecov Report

Merging #213 into master will increase coverage by 0.44%.
The diff coverage is 89.15%.

@@             Coverage Diff              @@
##             master     #213      +/-   ##
============================================
+ Coverage      69.9%   70.35%   +0.44%     
- Complexity      475      484       +9     
============================================
  Files            66       67       +1     
  Lines          2479     2540      +61     
  Branches        262      268       +6     
============================================
+ Hits           1733     1787      +54     
- Misses          649      655       +6     
- Partials         97       98       +1
Impacted Files Coverage Δ Complexity Δ
.../com/google/api/gax/grpc/BundlingCallSettings.java 78.78% <ø> (ø) 3 <0> (ø)
...in/java/com/google/api/gax/grpc/UnaryCallable.java 90% <ø> (ø) 17 <0> (ø)
...e/api/gax/bundling/ThresholdBundlingForwarder.java 87.5% <ø> (+3.12%) 4 <0> (ø)
...oogle/api/gax/bundling/BundlingFlowController.java 100% <100%> (ø) 3 <3> (?)
...a/com/google/api/gax/core/FlowControlSettings.java 81.25% <100%> (ø) 3 <1> (?)
.../com/google/api/gax/bundling/BundlingSettings.java 72.22% <100%> (ø) 3 <1> (?)
.../com/google/api/gax/bundling/ThresholdBundler.java 96.72% <100%> (+0.39%) 28 <4> (+1)
...java/com/google/api/gax/grpc/BundlingCallable.java 88.88% <50%> (-11.12%) 3 <0> (ø)
...n/java/com/google/api/gax/core/FlowController.java 77.96% <68.42%> (ø) 17 <17> (?)
...n/java/com/google/api/gax/grpc/BundlerFactory.java 83.05% <90%> (+1.41%) 11 <1> (+1)
... and 3 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update f049262...d02a888. Read the comment docs.

currentOpenBundle.start();
signalBundleIsReady = true;
}
flowController.reserve(e);

This comment was marked as spam.

This comment was marked as spam.

@@ -49,12 +49,23 @@ public static FlowControlSettings getDefaultInstance() {
@Nullable
public abstract Integer getMaxOutstandingRequestBytes();

/** Is flow control enabled. Defaults to true. */
public abstract boolean getIsEnabled();

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

@Nullable private final Semaphore outstandingElementCount;
@Nullable private final Semaphore outstandingByteCount;
private final boolean flowControlEnabled;

This comment was marked as spam.

This comment was marked as spam.

return new AutoValue_FlowControlSettings.Builder();
return new AutoValue_FlowControlSettings.Builder()
.setIsEnabled(true)
.setLimitExceededBehavior(LimitExceededBehavior.ThrowException);

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

* <p>The expected behavior for each of these values is: ThrowException: the FlowController will
* throw a {@link FlowControlException} if any of the limits are exceeded. Block: the reserve()
* method of FlowController will block until the quote is available to be reserved. Ignore: all
* flow control limits will be ignored; the FlowController is disabled.

This comment was marked as spam.

new FlowController(
bundlingSettings.getFlowControlSettings() != null
? bundlingSettings.getFlowControlSettings()
: FlowControlSettings.newBuilder().setIsEnabled(false).build());

This comment was marked as spam.

This comment was marked as spam.

* limits are exceeded.
* <li>Block: the reserve() method of FlowController will block until the quote is available to be
* reserved.
* <li>Ignore: all flow control limits will be ignored; the FlowController is disabled.
*/

This comment was marked as spam.

This comment was marked as spam.

@michaelbausor
Copy link
Contributor Author

PTAL

@michaelbausor michaelbausor changed the title WIP: add bundling flow control Add bundling flow control Mar 1, 2017
@garrettjonesgoogle
Copy link
Member

LGTM - @pongad ?

@michaelbausor michaelbausor merged commit ec2e656 into googleapis:master Mar 1, 2017
@michaelbausor michaelbausor deleted the bundling-flow-control branch March 1, 2017 16:14
michaelbausor added a commit to michaelbausor/gax-java that referenced this pull request Mar 3, 2017
michaelbausor added a commit that referenced this pull request Mar 3, 2017
michaelbausor added a commit to michaelbausor/gax-java that referenced this pull request Mar 3, 2017
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants