mobile: Remove Executor from EnvoyHTTPCallbacks API #32776

4 changes: 2 additions & 2 deletions mobile/docs/root/api/grpc.rst
@@ -42,7 +42,7 @@ Start and interact with a gRPC stream in **Kotlin**::
}
.setOnError { ... }
.setOnCancel { ... }
- .start(Executors.newSingleThreadExecutor())
+ .start()
.sendHeaders(headers, false)
.sendMessage(...)
...
@@ -180,7 +180,7 @@ stream.
.newGRPCStreamPrototype()
...
val stream = prototype
- .start(Executors.newSingleThreadExecutor())
+ .start()
.sendHeaders(...)
.sendMessage(...)

6 changes: 3 additions & 3 deletions mobile/docs/root/api/http.rst
@@ -33,7 +33,7 @@ Start and interact with an HTTP stream in **Kotlin**::
}
.setOnError { ... }
.setOnCancel { ... }
- .start(Executors.newSingleThreadExecutor())
+ .start()
.sendHeaders(...)
.sendData(...)

@@ -166,7 +166,7 @@ Doing so returns a ``Stream`` which allows the sender to interact with the strea
.newStreamPrototype()
...
val stream = prototype
- .start(Executors.newSingleThreadExecutor())
+ .start()
.sendHeaders(...)
.sendData(...)

@@ -215,7 +215,7 @@ For example:
.build()
val stream = streamClient
.newStreamPrototype()
- .start(Executors.newSingleThreadExecutor())
+ .start()

// Headers-only
stream.sendHeaders(requestHeaders, true)
3 changes: 1 addition & 2 deletions mobile/examples/java/hello_world/MainActivity.java
@@ -22,7 +22,6 @@
import io.envoyproxy.envoymobile.shared.Success;
import kotlin.Unit;

- import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.Arrays;
import java.util.HashSet;
@@ -131,7 +130,7 @@ private void makeRequest() {
recyclerView.post(() -> viewAdapter.add(new Failure(message)));
return Unit.INSTANCE;
})
- .start(Executors.newSingleThreadExecutor())
+ .start()
.sendHeaders(requestHeaders, true);

clear_text = !clear_text;
3 changes: 1 addition & 2 deletions mobile/examples/kotlin/hello_world/MainActivity.kt
@@ -18,7 +18,6 @@ import io.envoyproxy.envoymobile.shared.Failure
import io.envoyproxy.envoymobile.shared.ResponseRecyclerViewAdapter
import io.envoyproxy.envoymobile.shared.Success
import java.io.IOException
- import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

private const val REQUEST_HANDLER_THREAD_NAME = "hello_envoy_kt"
@@ -141,7 +140,7 @@ class MainActivity : Activity() {
Log.d("MainActivity", message)
recyclerView.post { viewAdapter.add(Failure(message)) }
}
- .start(Executors.newSingleThreadExecutor())
+ .start()
.sendHeaders(requestHeaders, true)
}

@@ -37,10 +37,7 @@ void passHeader(byte[] key, byte[] value, boolean start) {
public Object onResponseHeaders(long headerCount, boolean endStream, long[] streamIntel) {
assert bridgeUtility.validateCount(headerCount);
final Map<String, List<String>> headers = bridgeUtility.retrieveHeaders();

- callbacks.getExecutor().execute(
-     () -> callbacks.onHeaders(headers, endStream, new EnvoyStreamIntelImpl(streamIntel)));

Contributor:

which thread is onResponseHeaders called from? is it the Envoy worker thread?

same question for other functions below like onResponseData

Member Author:

It's whichever thread calls the callback, which is probably the worker thread? The idea behind the original design is to not block the Envoy main thread, which makes sense. However, this design imposes the requirement that every callback must be executed on a separate thread from within the Envoy Mobile layer, which may not always be desirable. This change moves that responsibility to the EnvoyHTTPCallbacks implementer. For example, Cronvoy, as an implementer of EnvoyHTTPCallbacks, can decide when to execute a callback on a separate thread. Notably, the Cronvoy implementation had a DirectExecutor (removed in this PR), which is basically a no-op Executor, because Cronvoy does not want every callback to be executed on a separate thread.

Contributor:

So the Runnable task created in CronvoyUrlRequest.onData (https://github.com/envoyproxy/envoy/pull/32776/files#diff-8f04efffbc03f667d43c68783a3480f3b0b31cc61e136a07c778e5c177aca969R876) gets executed in the thread supplied by the Executor?

Member Author:

Before this PR, there were two Executors used in CronvoyUrlRequest:

1. The method below came from EnvoyHTTPCallbacks; its DirectExecutor implementation basically does nothing.

@Override
public Executor getExecutor() {
  return DIRECT_EXECUTOR;
}

2. The code in CronvoyUrlRequest.onData uses execute, which runs on mUserExecutor.

This PR removes EnvoyHTTPCallbacks::getExecutor. It won't matter for Cronvoy, because its EnvoyHTTPCallbacks::getExecutor returns a no-op Executor anyway, and Cronvoy still uses mUserExecutor, which is a user-supplied Executor. Without EnvoyHTTPCallbacks::getExecutor(), Envoy Mobile no longer needs to copy the ByteBuffer before passing it to the user-defined onResponseData callback in JvmCallbackContext.

From the Envoy Mobile standpoint, we now only have Envoy::Buffer::Instance --> envoy_data, and the data passed into the onResponseData callback is backed by envoy_data directly, without any copy. The implementer of EnvoyHTTPCallbacks::onData can still make a copy if needed (as is the case with Cronvoy).
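
To make that concrete, here is a minimal sketch (the class and method names are hypothetical, not the actual Cronvoy code) of what an EnvoyHTTPCallbacks implementer could do when the data has to outlive the now-synchronous up call, e.g. to hand it to its own executor:

import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical helper: copy the (possibly direct) ByteBuffer only when the
// callback body is re-dispatched onto another thread, because the buffer is
// only valid for the duration of the up call.
final class CallbackDataDispatch {
  static void dispatchCopy(ByteBuffer data, Executor executor, Consumer<ByteBuffer> consumer) {
    ByteBuffer copy = ByteBuffer.allocate(data.remaining()); // heap copy owned by the implementer
    copy.put(data);
    copy.flip(); // make the copy readable from position 0
    executor.execute(() -> consumer.accept(copy));
  }
}

An implementer that consumes the bytes synchronously can skip the copy entirely, which is the point of this change.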

Contributor:

So if we want to remove the enforcement that all up calls run on the non-network thread, I'd think we'd want to ensure that all current callers (unless there are exceptions) still run them on the non-network thread. I don't see any changes to the non-Cronvoy up calls. Are we already running those all on other threads, or would this be a major regression for all non-Cronvoy E-M?

Largely I think having an invariant to not run upcalls on the network thread is a good call. Is there any other way to avoid the copy?
/wait-any

Member Author:

> I don't see any changes to the non-Cronvoy up calls.

Yeah, for Cronvoy, it uses its own Executor to run on the non-network thread:

Runnable task = new Runnable() {
  @Override
  public void run() {
    checkCallingThread();
    try {
      ByteBuffer userBuffer = mUserCurrentReadBuffer;
      mUserCurrentReadBuffer = null; // Avoid the reference to a potentially large buffer.
      int dataRead = data.remaining();
      userBuffer.put(data); // NPE ==> BUG, BufferOverflowException ==> User not behaving.
      if (dataRead > 0 || !endStream) {
        mWaitingOnRead.set(true);
        mCallback.onReadCompleted(CronvoyUrlRequest.this, mUrlResponseInfo, userBuffer);
      }
    } catch (Throwable t) {
      onCallbackException(t);
    }
  }
};
execute(task);

> I don't see any changes to the non-Cronvoy up calls. Are we already running those all on other threads, or would this be a major regression for all non-Cronvoy E-M?

The idea is to let users control whether or not they want to run the callbacks on the non-network thread; IOW, we don't want the library (the Java/Kotlin library in this case) to enforce that. The current implementation of onResponseData copies the ByteBuffer before passing it to the Executor even though the Executor implementation may be a direct executor (no-op), which is the case with the Cronvoy implementation. From the Java/Kotlin library standpoint, there's no way to know whether a no-op Executor is in use, so the safest approach is to always create a copy.

Before this PR, Envoy Mobile would create 2 copies:

  1. JvmCallbackContext::onResponseData
  2. CronvoyUrlRequest::onData

After this PR, Cronvoy will only do one copy.

I agree that this change may inconvenience users a little, since they now have to decide whether or not to run certain callbacks on non-network threads, but at the same time the library will no longer always need to pay the penalty of creating a copy.

Another option that I can think of is changing the EnvoyHTTPCallbacks::getExecutor to something like

Optional<Executor> getExecutor

With this change, if the Executor is empty, we don't need to create a copy. For Cronvoy, we will set it to empty. Let me know what you think.
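
Roughly, the dispatch in the bridge could then branch like this (illustrative names only, not the actual JvmCallbackContext code):

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.Executor;

// Illustrative sketch of the proposed Optional<Executor> behavior: copy and
// re-dispatch when an Executor is present; call back synchronously, with no
// copy, when it is empty.
final class OptionalExecutorDispatch {
  interface DataCallback { void onData(ByteBuffer data, boolean endStream); }

  static void dispatch(Optional<Executor> executor, DataCallback callback, ByteBuffer data,
                       boolean endStream) {
    if (executor.isPresent()) {
      ByteBuffer copy = ByteBuffer.allocate(data.remaining()); // data won't outlive the up call
      copy.put(data);
      copy.flip();
      executor.get().execute(() -> callback.onData(copy, endStream));
    } else {
      callback.onData(data, endStream); // runs on the calling (network) thread
    }
  }
}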

Contributor:

"The idea is to let the users control that behavior whether or not they want to run the callbacks on the non-network thread." Hm, I'm not sure about this. While I can imagine that for some trivial applications this might be fine, I'm very concerned that this would be a massive footgun. I think Cronet runs callbacks on a different thread, IIRC, so it would seem like we should somehow be able to get the performance we need while preserving this invariant.

Member Author:

Changing it to Optional<Executor> getExecutor is probably the best of both worlds? We can still default it to Optional.of(Executors.newSingleThreadExecutor()), but for Cronvoy, we will set it to Optional.empty(). When the Executor is empty, we will not need to do a copy (or the copy can be deferred to the implementer), but when the Executor is not empty, we will do a copy. What do you think?

Contributor:

I'm usually wary of general APIs that take optional values, because most users of an API will truly treat them as optional and not look closely at them, but this is a major decision that is being left to the user, so I think it's better not to make it optional.

Why does Cronvoy need to create its own thread in CronvoyUrlRequest if it's already passing in an Executor to have the onResponseData function run on that thread?

To me, from the user's point of view, the ideal API would be:

onResponseData(lambda_to_run, executor_to_run_on)

That way, the invoker doesn't need to create its own threads to run on; it just passes the executor and the lambda, and the implementation handles running the lambda on the executor with the response data.
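
Concretely, the shape I have in mind is something like this (hypothetical names, just to illustrate):

import java.nio.ByteBuffer;
import java.util.concurrent.Executor;

// Hypothetical shape of a per-stream registration where the caller supplies
// both the handler and the executor, and the library owns the dispatch (and
// any copy that the dispatch requires).
interface ResponseDataHandler {
  void handle(ByteBuffer data, boolean endStream);
}

final class StreamSketch {
  private final ResponseDataHandler handler;
  private final Executor executorToRunOn;

  StreamSketch(ResponseDataHandler handler, Executor executorToRunOn) {
    this.handler = handler;
    this.executorToRunOn = executorToRunOn;
  }

  // Called by the library when a data frame arrives; the library copies the
  // buffer before hopping threads because the original is short-lived.
  void deliver(ByteBuffer data, boolean endStream) {
    ByteBuffer copy = ByteBuffer.allocate(data.remaining());
    copy.put(data);
    copy.flip();
    executorToRunOn.execute(() -> handler.handle(copy, endStream));
  }
}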

I might be misunderstanding something.

As a side note: I agree the copying sucks so it would be nice to get rid of it, but I think the main repercussion of the copies would be memory pressure on the app. I think the effect on CPU usage and latency would be minimal.

Member Author (@fredyw, Mar 11, 2024):

> Why does Cronvoy need to create its own thread in CronvoyUrlRequest if it's already passing in an Executor to have the onResponseData function run on that thread?

For the Cronvoy API, it wants control of the Executor per request, which is not something the current API supports. That's why Cronvoy implements a no-op Executor for the EnvoyHTTPCallbacks API.

> To me, from the user's point of view, the ideal API would be:
> onResponseData(lambda_to_run, executor_to_run_on)

This can simply be implemented as below, without cluttering the callback API by passing the Executor to each callback:

client.newStreamPrototype()
  .setOnResponseHeader { .... }
  .start(Optional.of(createFancyExecutor()))

By introducing an Optional<Executor>, the library does not always need to create a copy, because it can tell whether a copy is needed.

FTR, the default Executor in the existing code is a primitive Executors.newSingleThreadExecutor, which is fine for a toy project but might not be ideal for a real workload.
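
For example, a slightly more production-minded executor could look like this (createFancyExecutor above is hypothetical; this is just one possible implementation):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// One hypothetical implementation of createFancyExecutor(): a small, named,
// daemon thread pool shared by callbacks instead of a fresh unbounded
// single-thread executor per stream.
final class CallbackExecutors {
  private static final AtomicInteger COUNTER = new AtomicInteger();

  static ExecutorService createFancyExecutor() {
    return Executors.newFixedThreadPool(2, runnable -> {
      Thread thread = new Thread(runnable, "envoy-callback-" + COUNTER.incrementAndGet());
      thread.setDaemon(true);
      return thread;
    });
  }
}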

+ callbacks.onHeaders(headers, endStream, new EnvoyStreamIntelImpl(streamIntel));
return null;
}

@@ -54,10 +51,7 @@ public Object onResponseHeaders(long headerCount, boolean endStream, long[] stre
public Object onResponseTrailers(long trailerCount, long[] streamIntel) {
assert bridgeUtility.validateCount(trailerCount);
final Map<String, List<String>> trailers = bridgeUtility.retrieveHeaders();

- callbacks.getExecutor().execute(
-     () -> callbacks.onTrailers(trailers, new EnvoyStreamIntelImpl(streamIntel)));
+ callbacks.onTrailers(trailers, new EnvoyStreamIntelImpl(streamIntel));
return null;
}

@@ -70,11 +64,7 @@ public Object onResponseTrailers(long trailerCount, long[] streamIntel) {
* @return Object, not used for response callbacks.
*/
public Object onResponseData(ByteBuffer data, boolean endStream, long[] streamIntel) {
- // Create a copy of the `data` because the `data` uses direct `ByteBuffer` and the `data` will
- // be destroyed after calling this callback.
- ByteBuffer copiedData = ByteBuffers.copy(data);
- callbacks.getExecutor().execute(
-     () -> callbacks.onData(copiedData, endStream, new EnvoyStreamIntelImpl(streamIntel)));
+ callbacks.onData(data, endStream, new EnvoyStreamIntelImpl(streamIntel));
return null;
}

@@ -91,13 +81,9 @@ public Object onResponseData(ByteBuffer data, boolean endStream, long[] streamIn
*/
public Object onError(int errorCode, byte[] message, int attemptCount, long[] streamIntel,
long[] finalStreamIntel) {
- callbacks.getExecutor().execute(() -> {
-   String errorMessage = new String(message);
-   callbacks.onError(errorCode, errorMessage, attemptCount,
-       new EnvoyStreamIntelImpl(streamIntel),
-       new EnvoyFinalStreamIntelImpl(finalStreamIntel));
- });
+ String errorMessage = new String(message);
+ callbacks.onError(errorCode, errorMessage, attemptCount, new EnvoyStreamIntelImpl(streamIntel),
+     new EnvoyFinalStreamIntelImpl(finalStreamIntel));
return null;
}

@@ -109,12 +95,9 @@ public Object onError(int errorCode, byte[] message, int attemptCount, long[] st
* @return Object, not used for response callbacks.
*/
public Object onCancel(long[] streamIntel, long[] finalStreamIntel) {
- callbacks.getExecutor().execute(() -> {
-   // This call is atomically gated at the call-site and will only happen once.
-   callbacks.onCancel(new EnvoyStreamIntelImpl(streamIntel),
-       new EnvoyFinalStreamIntelImpl(finalStreamIntel));
- });
+ // This call is atomically gated at the call-site and will only happen once.
+ callbacks.onCancel(new EnvoyStreamIntelImpl(streamIntel),
+     new EnvoyFinalStreamIntelImpl(finalStreamIntel));
return null;
}

@@ -125,11 +108,8 @@ public Object onCancel(long[] streamIntel, long[] finalStreamIntel) {
* @return Object, not used for response callbacks.
*/
public Object onSendWindowAvailable(long[] streamIntel) {
- callbacks.getExecutor().execute(() -> {
-   // This call is atomically gated at the call-site and will only happen once.
-   callbacks.onSendWindowAvailable(new EnvoyStreamIntelImpl(streamIntel));
- });
+ // This call is atomically gated at the call-site and will only happen once.
+ callbacks.onSendWindowAvailable(new EnvoyStreamIntelImpl(streamIntel));
return null;
}
/**
@@ -140,12 +120,9 @@ public Object onSendWindowAvailable(long[] streamIntel) {
* @return Object, not used for response callbacks.
*/
public Object onComplete(long[] streamIntel, long[] finalStreamIntel) {
- callbacks.getExecutor().execute(() -> {
-   // This call is atomically gated at the call-site and will only happen once.
-   callbacks.onComplete(new EnvoyStreamIntelImpl(streamIntel),
-       new EnvoyFinalStreamIntelImpl(finalStreamIntel));
- });
+ // This call is atomically gated at the call-site and will only happen once.
+ callbacks.onComplete(new EnvoyStreamIntelImpl(streamIntel),
+     new EnvoyFinalStreamIntelImpl(finalStreamIntel));
return null;
}
}
@@ -66,11 +66,8 @@ public Object onRequestHeaders(long headerCount, boolean endStream, long[] strea
* @return Object[], pair of HTTP filter status and optional modified data.
*/
public Object onRequestData(ByteBuffer data, boolean endStream, long[] streamIntel) {
- // Create a copy of the `data` because the `data` uses direct `ByteBuffer` and the `data` will
- // be destroyed after calling this callback.
- ByteBuffer copiedData = ByteBuffers.copy(data);
return toJniFilterDataStatus(
-     filter.onRequestData(copiedData, endStream, new EnvoyStreamIntelImpl(streamIntel)));
Contributor:

question: we weren't running this in a separate thread from the looks of it, so why did we need copiedData?

Member Author:

Yeah, I did this initially to be consistent with the callbacks API, but now that we have documented the ByteBuffer behavior, we can simply remove this logic.
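
As an illustration of that documented contract, a filter that needs the body bytes beyond the duration of a call would keep its own copy, e.g. with a helper like this (hypothetical, not part of the API):

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical accumulator a filter implementation could keep if it buffers
// body bytes across onRequestData/onResponseData calls: the incoming buffer
// is only valid during the call, so the bytes are copied into filter-owned
// storage before returning.
final class BodyAccumulator {
  private final ByteArrayOutputStream buffered = new ByteArrayOutputStream();

  void append(ByteBuffer data) {
    byte[] chunk = new byte[data.remaining()];
    data.get(chunk); // copies out of the (possibly direct) buffer
    buffered.write(chunk, 0, chunk.length);
  }

  byte[] snapshot() {
    return buffered.toByteArray();
  }
}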

+     filter.onRequestData(data, endStream, new EnvoyStreamIntelImpl(streamIntel)));
}

/**
@@ -111,11 +108,8 @@ public Object onResponseHeaders(long headerCount, boolean endStream, long[] stre
* @return Object[], pair of HTTP filter status and optional modified data.
*/
public Object onResponseData(ByteBuffer data, boolean endStream, long[] streamIntel) {
- // Create a copy of the `data` because the `data` uses direct `ByteBuffer` and the `data` will
- // be destroyed after calling this callback.
- ByteBuffer copiedData = ByteBuffers.copy(data);
return toJniFilterDataStatus(
-     filter.onResponseData(copiedData, endStream, new EnvoyStreamIntelImpl(streamIntel)));
+     filter.onResponseData(data, endStream, new EnvoyStreamIntelImpl(streamIntel)));
}

/**
@@ -144,9 +138,6 @@ public Object onResponseTrailers(long trailerCount, long[] streamIntel) {
*/
public Object onResumeRequest(long headerCount, ByteBuffer data, long trailerCount,
boolean endStream, long[] streamIntel) {
- // Create a copy of the `data` because the `data` uses direct `ByteBuffer` and the `data` will
- // be destroyed after calling this callback.
- ByteBuffer copiedData = ByteBuffers.copy(data);
// Headers are optional in this call, and a negative length indicates omission.
Map<String, List<String>> headers = null;
if (headerCount >= 0) {
@@ -159,7 +150,7 @@ public Object onResumeRequest(long headerCount, ByteBuffer data, long trailerCou
assert trailerUtility.validateCount(trailerCount);
trailers = trailerUtility.retrieveHeaders();
}
- return toJniFilterResumeStatus(filter.onResumeRequest(headers, copiedData, trailers, endStream,
+ return toJniFilterResumeStatus(filter.onResumeRequest(headers, data, trailers, endStream,
new EnvoyStreamIntelImpl(streamIntel)));
}

@@ -175,9 +166,6 @@ public Object onResumeRequest(long headerCount, ByteBuffer data, long trailerCou
*/
public Object onResumeResponse(long headerCount, ByteBuffer data, long trailerCount,
boolean endStream, long[] streamIntel) {
- // Create a copy of the `data` because the `data` uses direct `ByteBuffer` and the `data` will
- // be destroyed after calling this callback.
- ByteBuffer copiedData = ByteBuffers.copy(data);
// Headers are optional in this call, and a negative length indicates omission.
Map<String, List<String>> headers = null;
if (headerCount >= 0) {
@@ -190,7 +178,7 @@ public Object onResumeResponse(long headerCount, ByteBuffer data, long trailerCo
assert trailerUtility.validateCount(trailerCount);
trailers = trailerUtility.retrieveHeaders();
}
- return toJniFilterResumeStatus(filter.onResumeResponse(headers, copiedData, trailers, endStream,
+ return toJniFilterResumeStatus(filter.onResumeResponse(headers, data, trailers, endStream,
new EnvoyStreamIntelImpl(streamIntel)));
}

@@ -1,6 +1,5 @@
package io.envoyproxy.envoymobile.engine;

- import io.envoyproxy.envoymobile.engine.JvmFilterContext;
import io.envoyproxy.envoymobile.engine.types.EnvoyHTTPFilterFactory;

/**
@@ -6,9 +6,6 @@
import java.util.Map;

public interface EnvoyHTTPCallbacks {

- Executor getExecutor();

/**
* Called when all headers get received on the async HTTP stream.
*
@@ -24,6 +21,8 @@ void onHeaders(Map<String, List<String>> headers, boolean endStream,
* callback can be invoked multiple times if the data gets streamed.
*
* @param data, the buffer of the data received.
+ *              The `data` will be destroyed upon completing this callback. Create a copy
+ *              of the `data` if the `data` is going to outlive this callback.
* @param endStream, whether the data is the last data frame.
* @param streamIntel, contains internal HTTP stream metrics, context, and other details.
*/
@@ -20,6 +20,8 @@ Object[] onRequestHeaders(Map<String, List<String>> headers, boolean endStream,
* callback can be invoked multiple times.
*
* @param data, the buffer of the data received.
+ *              The `data` will be destroyed upon completing this callback. Create a copy
+ *              of the `data` if the `data` is going to outlive this callback.
* @param endStream, whether the data is the last data frame.
* @param streamIntel, contains internal HTTP stream metrics, context, and other details.
*/
@@ -47,7 +49,9 @@ Object[] onResponseHeaders(Map<String, List<String>> headers, boolean endStream,
* Called when a data frame is received on the HTTP stream. This
* callback can be invoked multiple times.
*
- * @param data, the buffer of the data received.
+ * @param data, the buffer of the data received. The `data` will be destroyed upon
+ *              completing this callback. Create a copy of the `data` if the `data` is
+ *              going to outlive this callback.
* @param endStream, whether the data is the last data frame.
* @param streamIntel, contains internal HTTP stream metrics, context, and other details.
*/
@@ -69,10 +73,12 @@ Object[] onResponseHeaders(Map<String, List<String>> headers, boolean endStream,
void setRequestFilterCallbacks(EnvoyHTTPFilterCallbacks callbacks);

/**
- * Called when request filter iteration has been asynchronsouly resumed via callback.
+ * Called when request filter iteration has been asynchronously resumed via callback.
*
* @param headers, pending headers that have not yet been forwarded along the filter chain.
* @param data, pending data that has not yet been forwarded along the filter chain.
+ *              The `data` will be destroyed upon completing this callback. Create a copy
+ *              of the `data` if the `data` is going to outlive this callback.
* @param trailers, pending trailers that have not yet been forwarded along the filter chain.
* @param streamIntel, contains internal HTTP stream metrics, context, and other details.
*/
@@ -88,10 +94,12 @@ Object[] onResumeRequest(Map<String, List<String>> headers, ByteBuffer data,
void setResponseFilterCallbacks(EnvoyHTTPFilterCallbacks callbacks);

/**
- * Called when response filter iteration has been asynchronsouly resumed via callback.
+ * Called when response filter iteration has been asynchronously resumed via callback.
*
* @param headers, pending headers that have not yet been forwarded along the filter chain.
* @param data, pending data that has not yet been forwarded along the filter chain.
+ *              The `data` will be destroyed upon completing this callback. Create a copy
+ *              of the `data` if the `data` is going to outlive this callback.
* @param trailers, pending trailers that have not yet been forwarded along the filter chain.
* @param streamIntel, contains internal HTTP stream metrics, context, and other details.
*/
@@ -118,7 +118,6 @@ public final class CronvoyBidirectionalStream
private static final String X_ENVOY = "x-envoy";
private static final String X_ENVOY_SELECTED_TRANSPORT = "x-envoy-upstream-alpn";
private static final String USER_AGENT = "User-Agent";
- private static final Executor DIRECT_EXECUTOR = new DirectExecutor();

private final CronvoyUrlRequestContext mRequestContext;
private final Executor mExecutor;
@@ -932,11 +931,6 @@ private static boolean isValidHeaderName(String header) {
return headers;
}

- @Override
- public Executor getExecutor() {
-   return DIRECT_EXECUTOR;
- }

@Override
public void onSendWindowAvailable(EnvoyStreamIntel streamIntel) {
switch (mState.nextAction(Event.ON_SEND_WINDOW_AVAILABLE)) {
@@ -1108,11 +1102,4 @@ private static class ReadBuffer {
this.mInitialLimit = mByteBuffer.limit();
}
}

- private static class DirectExecutor implements Executor {
-   @Override
-   public void execute(Runnable runnable) {
-     runnable.run();
-   }
- }
}