From b0121284e7e7cf055ff9d61d64a30824c5bc98a2 Mon Sep 17 00:00:00 2001 From: Fredy Wijaya Date: Thu, 7 Mar 2024 17:52:46 +0000 Subject: [PATCH 1/8] mobile: Remove Executor from EnvoyHTTPCallbacks Signed-off-by: Fredy Wijaya --- .../engine/types/EnvoyHTTPCallbacks.java | 2 ++ .../envoymobile/engine/types/EnvoyHTTPFilter.java | 14 +++++++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/mobile/library/java/io/envoyproxy/envoymobile/engine/types/EnvoyHTTPCallbacks.java b/mobile/library/java/io/envoyproxy/envoymobile/engine/types/EnvoyHTTPCallbacks.java index 890615aea700..f6ee438abf66 100644 --- a/mobile/library/java/io/envoyproxy/envoymobile/engine/types/EnvoyHTTPCallbacks.java +++ b/mobile/library/java/io/envoyproxy/envoymobile/engine/types/EnvoyHTTPCallbacks.java @@ -24,6 +24,8 @@ void onHeaders(Map> headers, boolean endStream, * callback can be invoked multiple times if the data gets streamed. * * @param data, the buffer of the data received. + * The `data` will be destroyed upon completing this callback. Create a copy + * of the `data` if the `data` is going to outlive this callback lifetime. * @param endStream, whether the data is the last data frame. * @param streamIntel, contains internal HTTP stream metrics, context, and other details. */ diff --git a/mobile/library/java/io/envoyproxy/envoymobile/engine/types/EnvoyHTTPFilter.java b/mobile/library/java/io/envoyproxy/envoymobile/engine/types/EnvoyHTTPFilter.java index 365cb8ec9ca3..67607dbef79e 100644 --- a/mobile/library/java/io/envoyproxy/envoymobile/engine/types/EnvoyHTTPFilter.java +++ b/mobile/library/java/io/envoyproxy/envoymobile/engine/types/EnvoyHTTPFilter.java @@ -20,6 +20,8 @@ Object[] onRequestHeaders(Map> headers, boolean endStream, * callback can be invoked multiple times. * * @param data, the buffer of the data received. + * The `data` will be destroyed upon completing this callback. Create a copy + * of the `data` if the `data` is going to outlive this callback lifetime. * @param endStream, whether the data is the last data frame. * @param streamIntel, contains internal HTTP stream metrics, context, and other details. */ @@ -47,7 +49,9 @@ Object[] onResponseHeaders(Map> headers, boolean endStream, * Called when a data frame is received on the HTTP stream. This * callback can be invoked multiple times. * - * @param data, the buffer of the data received. + * @param data, the buffer of the data received. The `data` will be destroyed upon + * completing this callback. Create a copy of the `data` if the `data` is + * going to outlive this callback. * @param endStream, whether the data is the last data frame. * @param streamIntel, contains internal HTTP stream metrics, context, and other details. */ @@ -69,10 +73,12 @@ Object[] onResponseHeaders(Map> headers, boolean endStream, void setRequestFilterCallbacks(EnvoyHTTPFilterCallbacks callbacks); /** - * Called when request filter iteration has been asynchronsouly resumed via callback. + * Called when request filter iteration has been asynchronously resumed via callback. * * @param headers, pending headers that have not yet been forwarded along the filter chain. * @param data, pending data that has not yet been forwarded along the filter chain. + * The `data` will be destroyed upon completing this callback. Create a copy + * of the `data` if the `data` is going to outlive this callback. * @param trailers, pending trailers that have not yet been forwarded along the filter chain. * @param streamIntel, contains internal HTTP stream metrics, context, and other details. */ @@ -88,10 +94,12 @@ Object[] onResumeRequest(Map> headers, ByteBuffer data, void setResponseFilterCallbacks(EnvoyHTTPFilterCallbacks callbacks); /** - * Called when response filter iteration has been asynchronsouly resumed via callback. + * Called when response filter iteration has been asynchronously resumed via callback. * * @param headers, pending headers that have not yet been forwarded along the filter chain. * @param data, pending data that has not yet been forwarded along the filter chain. + * The `data` will be destroyed upon completing this callback. Create a copy + * of the `data` if the `data` is going to outlive this callback. * @param trailers, pending trailers that have not yet been forwarded along the filter chain. * @param streamIntel, contains internal HTTP stream metrics, context, and other details. */ From 8f1a250c78efa8475afff988d378ee60ee619783 Mon Sep 17 00:00:00 2001 From: Fredy Wijaya Date: Thu, 7 Mar 2024 19:29:59 +0000 Subject: [PATCH 2/8] mobile: Remove Executor from the Java & Kotlin API Signed-off-by: Fredy Wijaya --- mobile/docs/root/api/grpc.rst | 4 +- .../engine/JvmCallbackContext.java | 51 ++----- .../envoymobile/engine/JvmFilterContext.java | 20 +-- .../engine/JvmFilterFactoryContext.java | 1 - .../engine/types/EnvoyHTTPCallbacks.java | 5 +- .../engine/types/EnvoyHTTPFilter.java | 2 +- .../net/impl/CronvoyBidirectionalStream.java | 13 -- .../chromium/net/impl/CronvoyUrlRequest.java | 40 ++---- .../envoyproxy/envoymobile/StreamCallbacks.kt | 11 +- .../envoyproxy/envoymobile/StreamPrototype.kt | 12 +- .../envoymobile/grpc/GRPCStreamPrototype.kt | 7 +- .../AndroidEngineSocketTagTest.java | 98 +++++++------ .../AndroidEnvoyExplicitFlowTest.java | 134 +++++++++--------- .../integration/AndroidEnvoyFlowTest.java | 2 +- .../engine/testing/QuicTestServerTest.java | 5 +- .../AndroidEnvoyExplicitH2FlowTest.java | 55 ++++--- .../net/testing/Http2TestServerTest.java | 7 +- .../test/kotlin/apps/baseline/MainActivity.kt | 3 +- .../kotlin/apps/experimental/MainActivity.kt | 3 +- .../integration/CancelGRPCStreamTest.kt | 3 +- .../kotlin/integration/CancelStreamTest.kt | 3 +- .../FilterThrowingExceptionTest.kt | 3 +- .../integration/StreamIdleTimeoutTest.kt | 3 +- ...oIntentPerformHTTPRequestUsingProxyTest.kt | 3 +- ...ntentPerformHTTPSRequestBadHostnameTest.kt | 3 +- ...tPerformHTTPSRequestUsingAsyncProxyTest.kt | 3 +- ...IntentPerformHTTPSRequestUsingProxyTest.kt | 3 +- ...oxyPollPerformHTTPRequestUsingProxyTest.kt | 3 +- ...formHTTPRequestWithoutUsingPACProxyTest.kt | 3 +- .../envoyproxy/envoymobile/GRPCStreamTest.kt | 23 ++- .../envoymobile/mocks/MockStreamPrototype.kt | 5 +- 31 files changed, 215 insertions(+), 316 deletions(-) diff --git a/mobile/docs/root/api/grpc.rst b/mobile/docs/root/api/grpc.rst index 0fd043b3832b..4e0ea72436d2 100644 --- a/mobile/docs/root/api/grpc.rst +++ b/mobile/docs/root/api/grpc.rst @@ -42,7 +42,7 @@ Start and interact with a gRPC stream in **Kotlin**:: } .setOnError { ... } .setOnCancel { ... } - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(headers, false) .sendMessage(...) ... @@ -180,7 +180,7 @@ stream. .newGRPCStreamPrototype() ... val stream = prototype - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(...) .sendMessage(...) diff --git a/mobile/library/java/io/envoyproxy/envoymobile/engine/JvmCallbackContext.java b/mobile/library/java/io/envoyproxy/envoymobile/engine/JvmCallbackContext.java index cdfafb01f1ae..265d4d194e54 100644 --- a/mobile/library/java/io/envoyproxy/envoymobile/engine/JvmCallbackContext.java +++ b/mobile/library/java/io/envoyproxy/envoymobile/engine/JvmCallbackContext.java @@ -37,10 +37,7 @@ void passHeader(byte[] key, byte[] value, boolean start) { public Object onResponseHeaders(long headerCount, boolean endStream, long[] streamIntel) { assert bridgeUtility.validateCount(headerCount); final Map> headers = bridgeUtility.retrieveHeaders(); - - callbacks.getExecutor().execute( - () -> callbacks.onHeaders(headers, endStream, new EnvoyStreamIntelImpl(streamIntel))); - + callbacks.onHeaders(headers, endStream, new EnvoyStreamIntelImpl(streamIntel)); return null; } @@ -54,10 +51,7 @@ public Object onResponseHeaders(long headerCount, boolean endStream, long[] stre public Object onResponseTrailers(long trailerCount, long[] streamIntel) { assert bridgeUtility.validateCount(trailerCount); final Map> trailers = bridgeUtility.retrieveHeaders(); - - callbacks.getExecutor().execute( - () -> callbacks.onTrailers(trailers, new EnvoyStreamIntelImpl(streamIntel))); - + callbacks.onTrailers(trailers, new EnvoyStreamIntelImpl(streamIntel)); return null; } @@ -70,11 +64,7 @@ public Object onResponseTrailers(long trailerCount, long[] streamIntel) { * @return Object, not used for response callbacks. */ public Object onResponseData(ByteBuffer data, boolean endStream, long[] streamIntel) { - // Create a copy of the `data` because the `data` uses direct `ByteBuffer` and the `data` will - // be destroyed after calling this callback. - ByteBuffer copiedData = ByteBuffers.copy(data); - callbacks.getExecutor().execute( - () -> callbacks.onData(copiedData, endStream, new EnvoyStreamIntelImpl(streamIntel))); + callbacks.onData(data, endStream, new EnvoyStreamIntelImpl(streamIntel)); return null; } @@ -91,13 +81,9 @@ public Object onResponseData(ByteBuffer data, boolean endStream, long[] streamIn */ public Object onError(int errorCode, byte[] message, int attemptCount, long[] streamIntel, long[] finalStreamIntel) { - callbacks.getExecutor().execute(() -> { - String errorMessage = new String(message); - callbacks.onError(errorCode, errorMessage, attemptCount, - new EnvoyStreamIntelImpl(streamIntel), - new EnvoyFinalStreamIntelImpl(finalStreamIntel)); - }); - + String errorMessage = new String(message); + callbacks.onError(errorCode, errorMessage, attemptCount, new EnvoyStreamIntelImpl(streamIntel), + new EnvoyFinalStreamIntelImpl(finalStreamIntel)); return null; } @@ -109,12 +95,9 @@ public Object onError(int errorCode, byte[] message, int attemptCount, long[] st * @return Object, not used for response callbacks. */ public Object onCancel(long[] streamIntel, long[] finalStreamIntel) { - callbacks.getExecutor().execute(() -> { - // This call is atomically gated at the call-site and will only happen once. - callbacks.onCancel(new EnvoyStreamIntelImpl(streamIntel), - new EnvoyFinalStreamIntelImpl(finalStreamIntel)); - }); - + // This call is atomically gated at the call-site and will only happen once. + callbacks.onCancel(new EnvoyStreamIntelImpl(streamIntel), + new EnvoyFinalStreamIntelImpl(finalStreamIntel)); return null; } @@ -125,11 +108,8 @@ public Object onCancel(long[] streamIntel, long[] finalStreamIntel) { * @return Object, not used for response callbacks. */ public Object onSendWindowAvailable(long[] streamIntel) { - callbacks.getExecutor().execute(() -> { - // This call is atomically gated at the call-site and will only happen once. - callbacks.onSendWindowAvailable(new EnvoyStreamIntelImpl(streamIntel)); - }); - + // This call is atomically gated at the call-site and will only happen once. + callbacks.onSendWindowAvailable(new EnvoyStreamIntelImpl(streamIntel)); return null; } /** @@ -140,12 +120,9 @@ public Object onSendWindowAvailable(long[] streamIntel) { * @return Object, not used for response callbacks. */ public Object onComplete(long[] streamIntel, long[] finalStreamIntel) { - callbacks.getExecutor().execute(() -> { - // This call is atomically gated at the call-site and will only happen once. - callbacks.onComplete(new EnvoyStreamIntelImpl(streamIntel), - new EnvoyFinalStreamIntelImpl(finalStreamIntel)); - }); - + // This call is atomically gated at the call-site and will only happen once. + callbacks.onComplete(new EnvoyStreamIntelImpl(streamIntel), + new EnvoyFinalStreamIntelImpl(finalStreamIntel)); return null; } } diff --git a/mobile/library/java/io/envoyproxy/envoymobile/engine/JvmFilterContext.java b/mobile/library/java/io/envoyproxy/envoymobile/engine/JvmFilterContext.java index 251de4cb070e..f9984fdca637 100644 --- a/mobile/library/java/io/envoyproxy/envoymobile/engine/JvmFilterContext.java +++ b/mobile/library/java/io/envoyproxy/envoymobile/engine/JvmFilterContext.java @@ -66,11 +66,8 @@ public Object onRequestHeaders(long headerCount, boolean endStream, long[] strea * @return Object[], pair of HTTP filter status and optional modified data. */ public Object onRequestData(ByteBuffer data, boolean endStream, long[] streamIntel) { - // Create a copy of the `data` because the `data` uses direct `ByteBuffer` and the `data` will - // be destroyed after calling this callback. - ByteBuffer copiedData = ByteBuffers.copy(data); return toJniFilterDataStatus( - filter.onRequestData(copiedData, endStream, new EnvoyStreamIntelImpl(streamIntel))); + filter.onRequestData(data, endStream, new EnvoyStreamIntelImpl(streamIntel))); } /** @@ -111,11 +108,8 @@ public Object onResponseHeaders(long headerCount, boolean endStream, long[] stre * @return Object[], pair of HTTP filter status and optional modified data. */ public Object onResponseData(ByteBuffer data, boolean endStream, long[] streamIntel) { - // Create a copy of the `data` because the `data` uses direct `ByteBuffer` and the `data` will - // be destroyed after calling this callback. - ByteBuffer copiedData = ByteBuffers.copy(data); return toJniFilterDataStatus( - filter.onResponseData(copiedData, endStream, new EnvoyStreamIntelImpl(streamIntel))); + filter.onResponseData(data, endStream, new EnvoyStreamIntelImpl(streamIntel))); } /** @@ -144,9 +138,6 @@ public Object onResponseTrailers(long trailerCount, long[] streamIntel) { */ public Object onResumeRequest(long headerCount, ByteBuffer data, long trailerCount, boolean endStream, long[] streamIntel) { - // Create a copy of the `data` because the `data` uses direct `ByteBuffer` and the `data` will - // be destroyed after calling this callback. - ByteBuffer copiedData = ByteBuffers.copy(data); // Headers are optional in this call, and a negative length indicates omission. Map> headers = null; if (headerCount >= 0) { @@ -159,7 +150,7 @@ public Object onResumeRequest(long headerCount, ByteBuffer data, long trailerCou assert trailerUtility.validateCount(trailerCount); trailers = trailerUtility.retrieveHeaders(); } - return toJniFilterResumeStatus(filter.onResumeRequest(headers, copiedData, trailers, endStream, + return toJniFilterResumeStatus(filter.onResumeRequest(headers, data, trailers, endStream, new EnvoyStreamIntelImpl(streamIntel))); } @@ -175,9 +166,6 @@ public Object onResumeRequest(long headerCount, ByteBuffer data, long trailerCou */ public Object onResumeResponse(long headerCount, ByteBuffer data, long trailerCount, boolean endStream, long[] streamIntel) { - // Create a copy of the `data` because the `data` uses direct `ByteBuffer` and the `data` will - // be destroyed after calling this callback. - ByteBuffer copiedData = ByteBuffers.copy(data); // Headers are optional in this call, and a negative length indicates omission. Map> headers = null; if (headerCount >= 0) { @@ -190,7 +178,7 @@ public Object onResumeResponse(long headerCount, ByteBuffer data, long trailerCo assert trailerUtility.validateCount(trailerCount); trailers = trailerUtility.retrieveHeaders(); } - return toJniFilterResumeStatus(filter.onResumeResponse(headers, copiedData, trailers, endStream, + return toJniFilterResumeStatus(filter.onResumeResponse(headers, data, trailers, endStream, new EnvoyStreamIntelImpl(streamIntel))); } diff --git a/mobile/library/java/io/envoyproxy/envoymobile/engine/JvmFilterFactoryContext.java b/mobile/library/java/io/envoyproxy/envoymobile/engine/JvmFilterFactoryContext.java index 03a600429b7d..6e81c1102ba3 100644 --- a/mobile/library/java/io/envoyproxy/envoymobile/engine/JvmFilterFactoryContext.java +++ b/mobile/library/java/io/envoyproxy/envoymobile/engine/JvmFilterFactoryContext.java @@ -1,6 +1,5 @@ package io.envoyproxy.envoymobile.engine; -import io.envoyproxy.envoymobile.engine.JvmFilterContext; import io.envoyproxy.envoymobile.engine.types.EnvoyHTTPFilterFactory; /** diff --git a/mobile/library/java/io/envoyproxy/envoymobile/engine/types/EnvoyHTTPCallbacks.java b/mobile/library/java/io/envoyproxy/envoymobile/engine/types/EnvoyHTTPCallbacks.java index f6ee438abf66..8e7295d18413 100644 --- a/mobile/library/java/io/envoyproxy/envoymobile/engine/types/EnvoyHTTPCallbacks.java +++ b/mobile/library/java/io/envoyproxy/envoymobile/engine/types/EnvoyHTTPCallbacks.java @@ -6,9 +6,6 @@ import java.util.Map; public interface EnvoyHTTPCallbacks { - - Executor getExecutor(); - /** * Called when all headers get received on the async HTTP stream. * @@ -25,7 +22,7 @@ void onHeaders(Map> headers, boolean endStream, * * @param data, the buffer of the data received. * The `data` will be destroyed upon completing this callback. Create a copy - * of the `data` if the `data` is going to outlive this callback lifetime. + * of the `data` if the `data` is going to outlive this callback. * @param endStream, whether the data is the last data frame. * @param streamIntel, contains internal HTTP stream metrics, context, and other details. */ diff --git a/mobile/library/java/io/envoyproxy/envoymobile/engine/types/EnvoyHTTPFilter.java b/mobile/library/java/io/envoyproxy/envoymobile/engine/types/EnvoyHTTPFilter.java index 67607dbef79e..01fc5f3ebe5c 100644 --- a/mobile/library/java/io/envoyproxy/envoymobile/engine/types/EnvoyHTTPFilter.java +++ b/mobile/library/java/io/envoyproxy/envoymobile/engine/types/EnvoyHTTPFilter.java @@ -21,7 +21,7 @@ Object[] onRequestHeaders(Map> headers, boolean endStream, * * @param data, the buffer of the data received. * The `data` will be destroyed upon completing this callback. Create a copy - * of the `data` if the `data` is going to outlive this callback lifetime. + * of the `data` if the `data` is going to outlive this callback. * @param endStream, whether the data is the last data frame. * @param streamIntel, contains internal HTTP stream metrics, context, and other details. */ diff --git a/mobile/library/java/org/chromium/net/impl/CronvoyBidirectionalStream.java b/mobile/library/java/org/chromium/net/impl/CronvoyBidirectionalStream.java index 19870e3b31e8..b2e07dc26a74 100644 --- a/mobile/library/java/org/chromium/net/impl/CronvoyBidirectionalStream.java +++ b/mobile/library/java/org/chromium/net/impl/CronvoyBidirectionalStream.java @@ -118,7 +118,6 @@ public final class CronvoyBidirectionalStream private static final String X_ENVOY = "x-envoy"; private static final String X_ENVOY_SELECTED_TRANSPORT = "x-envoy-upstream-alpn"; private static final String USER_AGENT = "User-Agent"; - private static final Executor DIRECT_EXECUTOR = new DirectExecutor(); private final CronvoyUrlRequestContext mRequestContext; private final Executor mExecutor; @@ -932,11 +931,6 @@ private static boolean isValidHeaderName(String header) { return headers; } - @Override - public Executor getExecutor() { - return DIRECT_EXECUTOR; - } - @Override public void onSendWindowAvailable(EnvoyStreamIntel streamIntel) { switch (mState.nextAction(Event.ON_SEND_WINDOW_AVAILABLE)) { @@ -1108,11 +1102,4 @@ private static class ReadBuffer { this.mInitialLimit = mByteBuffer.limit(); } } - - private static class DirectExecutor implements Executor { - @Override - public void execute(Runnable runnable) { - runnable.run(); - } - } } diff --git a/mobile/library/java/org/chromium/net/impl/CronvoyUrlRequest.java b/mobile/library/java/org/chromium/net/impl/CronvoyUrlRequest.java index a68b3ccd8ab3..27b28d98a022 100644 --- a/mobile/library/java/org/chromium/net/impl/CronvoyUrlRequest.java +++ b/mobile/library/java/org/chromium/net/impl/CronvoyUrlRequest.java @@ -103,7 +103,6 @@ public final class CronvoyUrlRequest extends CronvoyUrlRequestBase { private static final String TAG = CronvoyUrlRequest.class.getSimpleName(); private static final String USER_AGENT = "User-Agent"; private static final String CONTENT_TYPE = "Content-Type"; - private static final Executor DIRECT_EXECUTOR = new DirectExecutor(); private final String mUserAgent; private final HeadersList mRequestHeaders = new HeadersList(); @@ -766,23 +765,11 @@ private static int determineNextState(boolean endStream, @State int original, private static class HeadersList extends ArrayList> {} - private static class DirectExecutor implements Executor { - @Override - public void execute(Runnable runnable) { - runnable.run(); - } - } - private class CronvoyHttpCallbacks implements EnvoyHTTPCallbacks { private final AtomicInteger mCancelState = new AtomicInteger(CancelState.READY); private volatile boolean mEndStream = false; // Accessed by different Threads - @Override - public Executor getExecutor() { - return DIRECT_EXECUTOR; - } - @Override public void onHeaders(Map> headers, boolean endStream, EnvoyStreamIntel streamIntel) { @@ -880,22 +867,19 @@ public void onData(ByteBuffer data, boolean endStream, EnvoyStreamIntel streamIn return; } - Runnable task = new Runnable() { - @Override - public void run() { - checkCallingThread(); - try { - ByteBuffer userBuffer = mUserCurrentReadBuffer; - mUserCurrentReadBuffer = null; // Avoid the reference to a potentially large buffer. - int dataRead = data.remaining(); - userBuffer.put(data); // NPE ==> BUG, BufferOverflowException ==> User not behaving. - if (dataRead > 0 || !endStream) { - mWaitingOnRead.set(true); - mCallback.onReadCompleted(CronvoyUrlRequest.this, mUrlResponseInfo, userBuffer); - } - } catch (Throwable t) { - onCallbackException(t); + ByteBuffer userBuffer = mUserCurrentReadBuffer; + mUserCurrentReadBuffer = null; // Avoid the reference to a potentially large buffer. + int dataRead = data.remaining(); + userBuffer.put(data); // NPE ==> BUG, BufferOverflowException ==> User not behaving. + Runnable task = () -> { + checkCallingThread(); + try { + if (dataRead > 0 || !endStream) { + mWaitingOnRead.set(true); + mCallback.onReadCompleted(CronvoyUrlRequest.this, mUrlResponseInfo, userBuffer); } + } catch (Throwable t) { + onCallbackException(t); } }; execute(task); diff --git a/mobile/library/kotlin/io/envoyproxy/envoymobile/StreamCallbacks.kt b/mobile/library/kotlin/io/envoyproxy/envoymobile/StreamCallbacks.kt index e5668eb28b49..68f8b3daf124 100644 --- a/mobile/library/kotlin/io/envoyproxy/envoymobile/StreamCallbacks.kt +++ b/mobile/library/kotlin/io/envoyproxy/envoymobile/StreamCallbacks.kt @@ -4,7 +4,6 @@ import io.envoyproxy.envoymobile.engine.types.EnvoyFinalStreamIntel import io.envoyproxy.envoymobile.engine.types.EnvoyHTTPCallbacks import io.envoyproxy.envoymobile.engine.types.EnvoyStreamIntel import java.nio.ByteBuffer -import java.util.concurrent.Executor /** * A collection of platform-level callbacks that are specified by consumers who wish to interact @@ -28,14 +27,8 @@ internal class StreamCallbacks { * Class responsible for bridging between the platform-level `StreamCallbacks` and the engine's * `EnvoyHTTPCallbacks`. */ -internal class EnvoyHTTPCallbacksAdapter( - private val executor: Executor, - private val callbacks: StreamCallbacks -) : EnvoyHTTPCallbacks { - override fun getExecutor(): Executor { - return executor - } - +internal class EnvoyHTTPCallbacksAdapter(private val callbacks: StreamCallbacks) : + EnvoyHTTPCallbacks { override fun onHeaders( headers: Map>, endStream: Boolean, diff --git a/mobile/library/kotlin/io/envoyproxy/envoymobile/StreamPrototype.kt b/mobile/library/kotlin/io/envoyproxy/envoymobile/StreamPrototype.kt index 887c21cede41..770416181a74 100644 --- a/mobile/library/kotlin/io/envoyproxy/envoymobile/StreamPrototype.kt +++ b/mobile/library/kotlin/io/envoyproxy/envoymobile/StreamPrototype.kt @@ -2,8 +2,6 @@ package io.envoyproxy.envoymobile import io.envoyproxy.envoymobile.engine.EnvoyEngine import java.nio.ByteBuffer -import java.util.concurrent.Executor -import java.util.concurrent.Executors /** * A type representing a stream that has not yet been started. @@ -21,11 +19,10 @@ open class StreamPrototype(private val engine: EnvoyEngine) { /** * Start a new stream. * - * @param executor Executor on which to receive callback events. * @return The new stream. */ - open fun start(executor: Executor = Executors.newSingleThreadExecutor()): Stream { - val engineStream = engine.startStream(createCallbacks(executor), explicitFlowControl) + open fun start(): Stream { + val engineStream = engine.startStream(createCallbacks(), explicitFlowControl) return Stream(engineStream, useByteBufferPosition) } @@ -154,10 +151,9 @@ open class StreamPrototype(private val engine: EnvoyEngine) { /** * Create engine callbacks using the provided queue. * - * @param executor Executor on which to receive callback events. * @return A new set of engine callbacks. */ - internal fun createCallbacks(executor: Executor): EnvoyHTTPCallbacksAdapter { - return EnvoyHTTPCallbacksAdapter(executor, callbacks) + internal fun createCallbacks(): EnvoyHTTPCallbacksAdapter { + return EnvoyHTTPCallbacksAdapter(callbacks) } } diff --git a/mobile/library/kotlin/io/envoyproxy/envoymobile/grpc/GRPCStreamPrototype.kt b/mobile/library/kotlin/io/envoyproxy/envoymobile/grpc/GRPCStreamPrototype.kt index cb8f94390304..cb74c7b85347 100644 --- a/mobile/library/kotlin/io/envoyproxy/envoymobile/grpc/GRPCStreamPrototype.kt +++ b/mobile/library/kotlin/io/envoyproxy/envoymobile/grpc/GRPCStreamPrototype.kt @@ -3,8 +3,6 @@ package io.envoyproxy.envoymobile import java.io.ByteArrayOutputStream import java.nio.ByteBuffer import java.nio.ByteOrder -import java.util.concurrent.Executor -import java.util.concurrent.Executors /** * A type representing a gRPC stream that has not yet been started. @@ -16,11 +14,10 @@ class GRPCStreamPrototype(private val underlyingStream: StreamPrototype) { /** * Start a new gRPC stream. * - * @param executor Executor on which to receive callback events. * @return The new gRPC stream. */ - fun start(executor: Executor = Executors.newSingleThreadExecutor()): GRPCStream { - val stream = underlyingStream.start(executor) + fun start(): GRPCStream { + val stream = underlyingStream.start() return GRPCStream(stream) } diff --git a/mobile/test/java/integration/AndroidEngineSocketTagTest.java b/mobile/test/java/integration/AndroidEngineSocketTagTest.java index e5f7a2f8d3c9..eb5b76245c83 100644 --- a/mobile/test/java/integration/AndroidEngineSocketTagTest.java +++ b/mobile/test/java/integration/AndroidEngineSocketTagTest.java @@ -106,56 +106,54 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception { final AtomicReference response = new AtomicReference<>(new Response()); final AtomicReference streamRef = new AtomicReference<>(); - Stream stream = - engine.streamClient() - .newStreamPrototype() - .setOnResponseHeaders((responseHeaders, endStream, streamIntel) -> { - response.get().setHeaders(responseHeaders); - if (requestScenario.cancelOnResponseHeaders) { - streamRef.get().cancel(); // Should be a noop when endStream == true - } else { - if (requestScenario.waitOnReadData) { - try { - Thread.sleep(100 + (int)(Math.random() * 50)); - } catch (InterruptedException e) { - // Don't care - } - } - streamRef.get().readData(requestScenario.responseBufferSize); - } - return null; - }) - .setOnResponseData((data, endStream, streamIntel) -> { - response.get().addBody(data); - if (!endStream) { - if (requestScenario.waitOnReadData) { - try { - Thread.sleep(100 + (int)(Math.random() * 50)); - } catch (InterruptedException e) { - // Don't care - } - } - streamRef.get().readData(requestScenario.responseBufferSize); - } - return null; - }) - .setOnError((error, finalStreamIntel) -> { - response.get().setEnvoyError(error); - latch.countDown(); - return null; - }) - .setOnCancel((finalStreamIntel) -> { - response.get().setCancelled(); - latch.countDown(); - return null; - }) - .setOnComplete((finalStreamIntel) -> { - latch.countDown(); - return null; - }) - .setExplicitFlowControl(true) - .start(requestScenario.useDirectExecutor ? Runnable::run - : Executors.newSingleThreadExecutor()); + Stream stream = engine.streamClient() + .newStreamPrototype() + .setOnResponseHeaders((responseHeaders, endStream, streamIntel) -> { + response.get().setHeaders(responseHeaders); + if (requestScenario.cancelOnResponseHeaders) { + streamRef.get().cancel(); // Should be a noop when endStream == true + } else { + if (requestScenario.waitOnReadData) { + try { + Thread.sleep(100 + (int)(Math.random() * 50)); + } catch (InterruptedException e) { + // Don't care + } + } + streamRef.get().readData(requestScenario.responseBufferSize); + } + return null; + }) + .setOnResponseData((data, endStream, streamIntel) -> { + response.get().addBody(data); + if (!endStream) { + if (requestScenario.waitOnReadData) { + try { + Thread.sleep(100 + (int)(Math.random() * 50)); + } catch (InterruptedException e) { + // Don't care + } + } + streamRef.get().readData(requestScenario.responseBufferSize); + } + return null; + }) + .setOnError((error, finalStreamIntel) -> { + response.get().setEnvoyError(error); + latch.countDown(); + return null; + }) + .setOnCancel((finalStreamIntel) -> { + response.get().setCancelled(); + latch.countDown(); + return null; + }) + .setOnComplete((finalStreamIntel) -> { + latch.countDown(); + return null; + }) + .setExplicitFlowControl(true) + .start(); streamRef.set(stream); // Set before sending headers to avoid race conditions. stream.sendHeaders(requestScenario.getHeaders(), !requestScenario.hasBody()); latch.await(); diff --git a/mobile/test/java/integration/AndroidEnvoyExplicitFlowTest.java b/mobile/test/java/integration/AndroidEnvoyExplicitFlowTest.java index fa0e0e8a29bb..6796a9b0afce 100644 --- a/mobile/test/java/integration/AndroidEnvoyExplicitFlowTest.java +++ b/mobile/test/java/integration/AndroidEnvoyExplicitFlowTest.java @@ -461,74 +461,72 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception { final AtomicReference streamRef = new AtomicReference<>(); final Iterator chunkIterator = requestScenario.getBodyChunks().iterator(); - Stream stream = - engine.streamClient() - .newStreamPrototype() - .setOnSendWindowAvailable(ignored -> { - onSendWindowAvailable(requestScenario, streamRef.get(), chunkIterator, - response.get()); - return null; - }) - .setOnResponseHeaders((responseHeaders, endStream, streamIntel) -> { - response.get().setHeaders(responseHeaders); - response.get().addStreamIntel(streamIntel); - if (requestScenario.cancelOnResponseHeaders) { - streamRef.get().cancel(); // Should be a noop when endStream == true - } else { - if (requestScenario.waitOnReadData) { - try { - Thread.sleep(100 + (int)(Math.random() * 50)); - } catch (InterruptedException e) { - // Don't care - } - } - streamRef.get().readData(requestScenario.responseBufferSize); - } - return null; - }) - .setOnResponseData((data, endStream, streamIntel) -> { - response.get().addBody(data); - response.get().addStreamIntel(streamIntel); - if (!endStream) { - if (requestScenario.waitOnReadData) { - try { - Thread.sleep(100 + (int)(Math.random() * 50)); - } catch (InterruptedException e) { - // Don't care - } - } - streamRef.get().readData(requestScenario.responseBufferSize); - } - return null; - }) - .setOnResponseTrailers((trailers, streamIntel) -> { - response.get().setTrailers(trailers); - response.get().addStreamIntel(streamIntel); - return null; - }) - .setOnError((error, finalStreamIntel) -> { - response.get().setEnvoyError(error); - response.get().addStreamIntel(finalStreamIntel); - response.get().setFinalStreamIntel(finalStreamIntel); - latch.countDown(); - return null; - }) - .setOnCancel((finalStreamIntel) -> { - response.get().setCancelled(); - response.get().addStreamIntel(finalStreamIntel); - response.get().setFinalStreamIntel(finalStreamIntel); - latch.countDown(); - return null; - }) - .setOnComplete((finalStreamIntel) -> { - response.get().addStreamIntel(finalStreamIntel); - response.get().setFinalStreamIntel(finalStreamIntel); - latch.countDown(); - return null; - }) - .setExplicitFlowControl(true) - .start(requestScenario.useDirectExecutor ? Runnable::run - : Executors.newSingleThreadExecutor()); + Stream stream = engine.streamClient() + .newStreamPrototype() + .setOnSendWindowAvailable(ignored -> { + onSendWindowAvailable(requestScenario, streamRef.get(), chunkIterator, + response.get()); + return null; + }) + .setOnResponseHeaders((responseHeaders, endStream, streamIntel) -> { + response.get().setHeaders(responseHeaders); + response.get().addStreamIntel(streamIntel); + if (requestScenario.cancelOnResponseHeaders) { + streamRef.get().cancel(); // Should be a noop when endStream == true + } else { + if (requestScenario.waitOnReadData) { + try { + Thread.sleep(100 + (int)(Math.random() * 50)); + } catch (InterruptedException e) { + // Don't care + } + } + streamRef.get().readData(requestScenario.responseBufferSize); + } + return null; + }) + .setOnResponseData((data, endStream, streamIntel) -> { + response.get().addBody(data); + response.get().addStreamIntel(streamIntel); + if (!endStream) { + if (requestScenario.waitOnReadData) { + try { + Thread.sleep(100 + (int)(Math.random() * 50)); + } catch (InterruptedException e) { + // Don't care + } + } + streamRef.get().readData(requestScenario.responseBufferSize); + } + return null; + }) + .setOnResponseTrailers((trailers, streamIntel) -> { + response.get().setTrailers(trailers); + response.get().addStreamIntel(streamIntel); + return null; + }) + .setOnError((error, finalStreamIntel) -> { + response.get().setEnvoyError(error); + response.get().addStreamIntel(finalStreamIntel); + response.get().setFinalStreamIntel(finalStreamIntel); + latch.countDown(); + return null; + }) + .setOnCancel((finalStreamIntel) -> { + response.get().setCancelled(); + response.get().addStreamIntel(finalStreamIntel); + response.get().setFinalStreamIntel(finalStreamIntel); + latch.countDown(); + return null; + }) + .setOnComplete((finalStreamIntel) -> { + response.get().addStreamIntel(finalStreamIntel); + response.get().setFinalStreamIntel(finalStreamIntel); + latch.countDown(); + return null; + }) + .setExplicitFlowControl(true) + .start(); streamRef.set(stream); // Set before sending headers to avoid race conditions. stream.sendHeaders(requestScenario.getHeaders(), !requestScenario.hasBody()); if (requestScenario.hasBody()) { diff --git a/mobile/test/java/integration/AndroidEnvoyFlowTest.java b/mobile/test/java/integration/AndroidEnvoyFlowTest.java index fbdbcd446cbc..959f431cc02f 100644 --- a/mobile/test/java/integration/AndroidEnvoyFlowTest.java +++ b/mobile/test/java/integration/AndroidEnvoyFlowTest.java @@ -313,7 +313,7 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception { latch.countDown(); return null; }) - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestScenario.getHeaders(), !requestScenario.hasBody()); if (requestScenario.cancelBeforeSendingRequestBody) { stream.cancel(); diff --git a/mobile/test/java/io/envoyproxy/envoymobile/engine/testing/QuicTestServerTest.java b/mobile/test/java/io/envoyproxy/envoymobile/engine/testing/QuicTestServerTest.java index 43c2e025e4c9..0260ee951653 100644 --- a/mobile/test/java/io/envoyproxy/envoymobile/engine/testing/QuicTestServerTest.java +++ b/mobile/test/java/io/envoyproxy/envoymobile/engine/testing/QuicTestServerTest.java @@ -16,6 +16,7 @@ import io.envoyproxy.envoymobile.ResponseTrailers; import io.envoyproxy.envoymobile.Stream; import io.envoyproxy.envoymobile.engine.AndroidJniLibrary; +import io.envoyproxy.envoymobile.engine.ByteBuffers; import io.envoyproxy.envoymobile.engine.JniLibrary; import io.envoyproxy.envoymobile.engine.testing.RequestScenario; import io.envoyproxy.envoymobile.engine.testing.Response; @@ -156,7 +157,7 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception { return null; }) .setOnResponseData((data, endStream, ignored) -> { - response.get().addBody(data); + response.get().addBody(ByteBuffers.copy(data)); return null; }) .setOnResponseTrailers((trailers, ignored) -> { @@ -177,7 +178,7 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception { latch.countDown(); return null; }) - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestScenario.getHeaders(), !requestScenario.hasBody()); requestScenario.getBodyChunks().forEach(stream::sendData); requestScenario.getClosingBodyChunk().ifPresent(stream::close); diff --git a/mobile/test/java/org/chromium/net/testing/AndroidEnvoyExplicitH2FlowTest.java b/mobile/test/java/org/chromium/net/testing/AndroidEnvoyExplicitH2FlowTest.java index ad2f2447f81d..17c72993509e 100644 --- a/mobile/test/java/org/chromium/net/testing/AndroidEnvoyExplicitH2FlowTest.java +++ b/mobile/test/java/org/chromium/net/testing/AndroidEnvoyExplicitH2FlowTest.java @@ -73,34 +73,33 @@ public void continuousWrite_withCancelOnResponseHeaders() throws Exception { // Loop 100,000 times which should be long enough to wait for the server's // response headers to arrive. final int numWrites = 100000; - stream.set( - engine.streamClient() - .newStreamPrototype() - .setExplicitFlowControl(true) - .setOnSendWindowAvailable((streamIntel -> { - ByteBuffer bf = ByteBuffer.allocateDirect(1); - bf.put((byte)'a'); - if (bufferSent.incrementAndGet() == numWrites) { - stream.get().close(bf); - } else { - stream.get().sendData(bf); - } - return null; - })) - .setOnResponseHeaders((responseHeaders, endStream, ignored) -> { - // This was getting executed, even in the initial test, but only - // after all the data was sent. With the fix, this should happen - // before all the data is sent which is checked in the assert - // below. - stream.get().cancel(); - return null; - }) - .setOnCancel((ignored) -> { - latch.countDown(); - return null; - }) - .start(Runnable::run) // direct executor - all the logic runs on the EM Network Thread. - .sendHeaders(requestHeaders, false)); + stream.set(engine.streamClient() + .newStreamPrototype() + .setExplicitFlowControl(true) + .setOnSendWindowAvailable((streamIntel -> { + ByteBuffer bf = ByteBuffer.allocateDirect(1); + bf.put((byte)'a'); + if (bufferSent.incrementAndGet() == numWrites) { + stream.get().close(bf); + } else { + stream.get().sendData(bf); + } + return null; + })) + .setOnResponseHeaders((responseHeaders, endStream, ignored) -> { + // This was getting executed, even in the initial test, but only + // after all the data was sent. With the fix, this should happen + // before all the data is sent which is checked in the assert + // below. + stream.get().cancel(); + return null; + }) + .setOnCancel((ignored) -> { + latch.countDown(); + return null; + }) + .start() // direct executor - all the logic runs on the EM Network Thread. + .sendHeaders(requestHeaders, false)); ByteBuffer bf = ByteBuffer.allocateDirect(1); bf.put((byte)'a'); stream.get().sendData(bf); diff --git a/mobile/test/java/org/chromium/net/testing/Http2TestServerTest.java b/mobile/test/java/org/chromium/net/testing/Http2TestServerTest.java index 82391ddc6210..930cdde6c567 100644 --- a/mobile/test/java/org/chromium/net/testing/Http2TestServerTest.java +++ b/mobile/test/java/org/chromium/net/testing/Http2TestServerTest.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; @@ -126,7 +125,7 @@ public void testGetRequestWithPlatformCertValidatorFail() throws Exception { return null; }) .setOnCancel((ignored) -> { throw new AssertionError("Unexpected OnCancel called."); }) - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestScenario.getHeaders(), false); latch.await(); @@ -163,7 +162,7 @@ public void testSubjectAltNameErrorWithPlatformCertValidator() throws Exception return null; }) .setOnCancel((ignored) -> { throw new AssertionError("Unexpected OnCancel called."); }) - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestScenario.getHeaders(), false); latch.await(); @@ -214,7 +213,7 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception { latch.countDown(); return null; }) - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestScenario.getHeaders(), /* hasRequestBody= */ false); latch.await(); diff --git a/mobile/test/kotlin/apps/baseline/MainActivity.kt b/mobile/test/kotlin/apps/baseline/MainActivity.kt index 2a1037ec1844..ed7c929614d2 100644 --- a/mobile/test/kotlin/apps/baseline/MainActivity.kt +++ b/mobile/test/kotlin/apps/baseline/MainActivity.kt @@ -18,7 +18,6 @@ import io.envoyproxy.envoymobile.shared.Failure import io.envoyproxy.envoymobile.shared.ResponseRecyclerViewAdapter import io.envoyproxy.envoymobile.shared.Success import java.io.IOException -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit private const val REQUEST_HANDLER_THREAD_NAME = "hello_envoy_kt" @@ -140,7 +139,7 @@ class MainActivity : Activity() { Log.d("MainActivity", message) recyclerView.post { viewAdapter.add(Failure(message)) } } - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestHeaders, true) } diff --git a/mobile/test/kotlin/apps/experimental/MainActivity.kt b/mobile/test/kotlin/apps/experimental/MainActivity.kt index dd936f0843db..ee3eb37cb2e4 100644 --- a/mobile/test/kotlin/apps/experimental/MainActivity.kt +++ b/mobile/test/kotlin/apps/experimental/MainActivity.kt @@ -20,7 +20,6 @@ import io.envoyproxy.envoymobile.shared.Failure import io.envoyproxy.envoymobile.shared.ResponseRecyclerViewAdapter import io.envoyproxy.envoymobile.shared.Success import java.io.IOException -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit private const val REQUEST_HANDLER_THREAD_NAME = "hello_envoy_kt" @@ -153,7 +152,7 @@ class MainActivity : Activity() { Log.d("MainActivity", message) recyclerView.post { viewAdapter.add(Failure(message)) } } - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestHeaders, true) } diff --git a/mobile/test/kotlin/integration/CancelGRPCStreamTest.kt b/mobile/test/kotlin/integration/CancelGRPCStreamTest.kt index 35615791e2c6..7dee38eb75ec 100644 --- a/mobile/test/kotlin/integration/CancelGRPCStreamTest.kt +++ b/mobile/test/kotlin/integration/CancelGRPCStreamTest.kt @@ -17,7 +17,6 @@ import io.envoyproxy.envoymobile.StreamIntel import io.envoyproxy.envoymobile.engine.JniLibrary import java.nio.ByteBuffer import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import org.junit.Test @@ -92,7 +91,7 @@ class CancelGRPCStreamTest { client .newGRPCStreamPrototype() .setOnCancel { _ -> onCancelCallbackExpectation.countDown() } - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestHeaders, false) .cancel() diff --git a/mobile/test/kotlin/integration/CancelStreamTest.kt b/mobile/test/kotlin/integration/CancelStreamTest.kt index 24a1b5ec6995..ec9bb82149b2 100644 --- a/mobile/test/kotlin/integration/CancelStreamTest.kt +++ b/mobile/test/kotlin/integration/CancelStreamTest.kt @@ -17,7 +17,6 @@ import io.envoyproxy.envoymobile.StreamIntel import io.envoyproxy.envoymobile.engine.JniLibrary import java.nio.ByteBuffer import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import org.junit.Test @@ -91,7 +90,7 @@ class CancelStreamTest { client .newStreamPrototype() .setOnCancel { _ -> runExpectation.countDown() } - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestHeaders, false) .cancel() diff --git a/mobile/test/kotlin/integration/FilterThrowingExceptionTest.kt b/mobile/test/kotlin/integration/FilterThrowingExceptionTest.kt index 8456fdafeb4e..a141d0addcf4 100644 --- a/mobile/test/kotlin/integration/FilterThrowingExceptionTest.kt +++ b/mobile/test/kotlin/integration/FilterThrowingExceptionTest.kt @@ -22,7 +22,6 @@ import io.envoyproxy.envoymobile.StreamIntel import io.envoyproxy.envoymobile.engine.JniLibrary import java.nio.ByteBuffer import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import org.junit.Test import org.junit.runner.RunWith @@ -134,7 +133,7 @@ class FilterThrowingExceptionTest { assertThat(status).isEqualTo(200) onRespondeHeadersLatch.countDown() } - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestHeaders, true) onRespondeHeadersLatch.await(15, TimeUnit.SECONDS) diff --git a/mobile/test/kotlin/integration/StreamIdleTimeoutTest.kt b/mobile/test/kotlin/integration/StreamIdleTimeoutTest.kt index 865da5f197ca..44a8cff79f46 100644 --- a/mobile/test/kotlin/integration/StreamIdleTimeoutTest.kt +++ b/mobile/test/kotlin/integration/StreamIdleTimeoutTest.kt @@ -17,7 +17,6 @@ import io.envoyproxy.envoymobile.StreamIntel import io.envoyproxy.envoymobile.engine.JniLibrary import java.nio.ByteBuffer import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import org.junit.Assert.fail import org.junit.Test @@ -100,7 +99,7 @@ class StreamIdleTimeoutTest { assertThat(error.errorCode).isEqualTo(4) callbackExpectation.countDown() } - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestHeaders, false) filterExpectation.await(10, TimeUnit.SECONDS) diff --git a/mobile/test/kotlin/integration/proxying/ProxyInfoIntentPerformHTTPRequestUsingProxyTest.kt b/mobile/test/kotlin/integration/proxying/ProxyInfoIntentPerformHTTPRequestUsingProxyTest.kt index 05dcb60e665f..4b1a6676e282 100644 --- a/mobile/test/kotlin/integration/proxying/ProxyInfoIntentPerformHTTPRequestUsingProxyTest.kt +++ b/mobile/test/kotlin/integration/proxying/ProxyInfoIntentPerformHTTPRequestUsingProxyTest.kt @@ -14,7 +14,6 @@ import io.envoyproxy.envoymobile.RequestMethod import io.envoyproxy.envoymobile.engine.JniLibrary import io.envoyproxy.envoymobile.engine.testing.TestJni import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import org.junit.Test import org.junit.runner.RunWith @@ -86,7 +85,7 @@ class PerformHTTPRequestUsingProxy { assertThat(responseHeaders.value("x-proxy-response")).isEqualTo(listOf("true")) onRespondeHeadersLatch.countDown() } - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestHeaders, true) onRespondeHeadersLatch.await(15, TimeUnit.SECONDS) diff --git a/mobile/test/kotlin/integration/proxying/ProxyInfoIntentPerformHTTPSRequestBadHostnameTest.kt b/mobile/test/kotlin/integration/proxying/ProxyInfoIntentPerformHTTPSRequestBadHostnameTest.kt index 7182356cebe6..8e7e09a1dbd5 100644 --- a/mobile/test/kotlin/integration/proxying/ProxyInfoIntentPerformHTTPSRequestBadHostnameTest.kt +++ b/mobile/test/kotlin/integration/proxying/ProxyInfoIntentPerformHTTPSRequestBadHostnameTest.kt @@ -14,7 +14,6 @@ import io.envoyproxy.envoymobile.RequestMethod import io.envoyproxy.envoymobile.engine.JniLibrary import io.envoyproxy.envoymobile.engine.testing.TestJni import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import org.junit.Test import org.junit.runner.RunWith @@ -81,7 +80,7 @@ class PerformHTTPSRequestBadHostname { .streamClient() .newStreamPrototype() .setOnError { _, _ -> onErrorLatch.countDown() } - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestHeaders, true) onErrorLatch.await(15, TimeUnit.SECONDS) diff --git a/mobile/test/kotlin/integration/proxying/ProxyInfoIntentPerformHTTPSRequestUsingAsyncProxyTest.kt b/mobile/test/kotlin/integration/proxying/ProxyInfoIntentPerformHTTPSRequestUsingAsyncProxyTest.kt index 672879b70d54..af67a23482bc 100644 --- a/mobile/test/kotlin/integration/proxying/ProxyInfoIntentPerformHTTPSRequestUsingAsyncProxyTest.kt +++ b/mobile/test/kotlin/integration/proxying/ProxyInfoIntentPerformHTTPSRequestUsingAsyncProxyTest.kt @@ -14,7 +14,6 @@ import io.envoyproxy.envoymobile.RequestMethod import io.envoyproxy.envoymobile.engine.JniLibrary import io.envoyproxy.envoymobile.engine.testing.TestJni import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import org.junit.Test import org.junit.runner.RunWith @@ -86,7 +85,7 @@ class PerformHTTPSRequestUsingAsyncProxyTest { assertThat(responseHeaders.value("x-response-header-that-should-be-stripped")).isNull() onRespondeHeadersLatch.countDown() } - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestHeaders, true) onRespondeHeadersLatch.await(15, TimeUnit.SECONDS) diff --git a/mobile/test/kotlin/integration/proxying/ProxyInfoIntentPerformHTTPSRequestUsingProxyTest.kt b/mobile/test/kotlin/integration/proxying/ProxyInfoIntentPerformHTTPSRequestUsingProxyTest.kt index 626a4ee49864..2e03d6ea5faa 100644 --- a/mobile/test/kotlin/integration/proxying/ProxyInfoIntentPerformHTTPSRequestUsingProxyTest.kt +++ b/mobile/test/kotlin/integration/proxying/ProxyInfoIntentPerformHTTPSRequestUsingProxyTest.kt @@ -14,7 +14,6 @@ import io.envoyproxy.envoymobile.RequestMethod import io.envoyproxy.envoymobile.engine.JniLibrary import io.envoyproxy.envoymobile.engine.testing.TestJni import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import org.junit.Test import org.junit.runner.RunWith @@ -86,7 +85,7 @@ class PerformHTTPSRequestUsingProxy { assertThat(responseHeaders.value("x-response-header-that-should-be-stripped")).isNull() onRespondeHeadersLatch.countDown() } - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestHeaders, true) onRespondeHeadersLatch.await(15, TimeUnit.SECONDS) diff --git a/mobile/test/kotlin/integration/proxying/ProxyPollPerformHTTPRequestUsingProxyTest.kt b/mobile/test/kotlin/integration/proxying/ProxyPollPerformHTTPRequestUsingProxyTest.kt index 08504ef1d13f..5e7b926537f6 100644 --- a/mobile/test/kotlin/integration/proxying/ProxyPollPerformHTTPRequestUsingProxyTest.kt +++ b/mobile/test/kotlin/integration/proxying/ProxyPollPerformHTTPRequestUsingProxyTest.kt @@ -13,7 +13,6 @@ import io.envoyproxy.envoymobile.engine.AndroidJniLibrary import io.envoyproxy.envoymobile.engine.JniLibrary import io.envoyproxy.envoymobile.engine.testing.TestJni import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import org.junit.Test import org.junit.runner.RunWith @@ -84,7 +83,7 @@ class PerformHTTPRequestUsingProxy { assertThat(responseHeaders.value("x-proxy-response")).isEqualTo(listOf("true")) onRespondeHeadersLatch.countDown() } - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestHeaders, true) onRespondeHeadersLatch.await(15, TimeUnit.SECONDS) diff --git a/mobile/test/kotlin/integration/proxying/ProxyPollPerformHTTPRequestWithoutUsingPACProxyTest.kt b/mobile/test/kotlin/integration/proxying/ProxyPollPerformHTTPRequestWithoutUsingPACProxyTest.kt index efeb9e30f11f..c1565032d56b 100644 --- a/mobile/test/kotlin/integration/proxying/ProxyPollPerformHTTPRequestWithoutUsingPACProxyTest.kt +++ b/mobile/test/kotlin/integration/proxying/ProxyPollPerformHTTPRequestWithoutUsingPACProxyTest.kt @@ -13,7 +13,6 @@ import io.envoyproxy.envoymobile.RequestMethod import io.envoyproxy.envoymobile.engine.JniLibrary import io.envoyproxy.envoymobile.engine.testing.TestJni import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import org.junit.Test import org.junit.runner.RunWith @@ -84,7 +83,7 @@ class PerformHTTPRequestUsingProxy { assertThat(responseHeaders.value("x-proxy-response")).isNull() onRespondeHeadersLatch.countDown() } - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestHeaders, true) onRespondeHeadersLatch.await(15, TimeUnit.SECONDS) diff --git a/mobile/test/kotlin/io/envoyproxy/envoymobile/GRPCStreamTest.kt b/mobile/test/kotlin/io/envoyproxy/envoymobile/GRPCStreamTest.kt index f7de3354d45c..4f5b937b90ac 100644 --- a/mobile/test/kotlin/io/envoyproxy/envoymobile/GRPCStreamTest.kt +++ b/mobile/test/kotlin/io/envoyproxy/envoymobile/GRPCStreamTest.kt @@ -7,7 +7,6 @@ import java.io.ByteArrayOutputStream import java.nio.ByteBuffer import java.nio.ByteOrder import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executor import java.util.concurrent.TimeUnit import org.junit.Test @@ -23,7 +22,7 @@ class GRPCStreamTest { stream.onRequestData = { data, _ -> sentData.write(data.array()) } } - GRPCClient(streamClient).newGRPCStreamPrototype().start(Executor {}).sendMessage(message1) + GRPCClient(streamClient).newGRPCStreamPrototype().start().sendMessage(message1) assertThat(sentData.size()).isEqualTo(5 + message1.array().count()) } @@ -35,7 +34,7 @@ class GRPCStreamTest { stream.onRequestData = { data, _ -> sentData.write(data.array()) } } - GRPCClient(streamClient).newGRPCStreamPrototype().start(Executor {}).sendMessage(message1) + GRPCClient(streamClient).newGRPCStreamPrototype().start().sendMessage(message1) assertThat(sentData.toByteArray()[0]).isEqualTo(0) } @@ -47,7 +46,7 @@ class GRPCStreamTest { stream.onRequestData = { data, _ -> sentData.write(data.array()) } } - GRPCClient(streamClient).newGRPCStreamPrototype().start(Executor {}).sendMessage(message1) + GRPCClient(streamClient).newGRPCStreamPrototype().start().sendMessage(message1) val size = ByteBuffer.wrap(sentData.toByteArray().sliceArray(1 until 5)).order(ByteOrder.BIG_ENDIAN).int @@ -61,7 +60,7 @@ class GRPCStreamTest { stream.onRequestData = { data, _ -> sentData.write(data.array()) } } - GRPCClient(streamClient).newGRPCStreamPrototype().start(Executor {}).sendMessage(message1) + GRPCClient(streamClient).newGRPCStreamPrototype().start().sendMessage(message1) assertThat(sentData.toByteArray().sliceArray(5 until sentData.size())) .isEqualTo(message1.array()) @@ -74,7 +73,7 @@ class GRPCStreamTest { stream.onCancel = { countDownLatch.countDown() } } - GRPCClient(streamClient).newGRPCStreamPrototype().start(Executor {}).cancel() + GRPCClient(streamClient).newGRPCStreamPrototype().start().cancel() assertThat(countDownLatch.await(2000, TimeUnit.MILLISECONDS)).isTrue() } @@ -96,7 +95,7 @@ class GRPCStreamTest { assertThat(endStream).isTrue() countDownLatch.countDown() } - .start(Executor {}) + .start() stream?.receiveHeaders(expectedHeaders, true) countDownLatch.await() @@ -117,7 +116,7 @@ class GRPCStreamTest { .isEqualTo(expectedTrailers.caseSensitiveHeaders()) countDownLatch.countDown() } - .start(Executor {}) + .start() stream?.receiveTrailers(expectedTrailers) countDownLatch.await() @@ -135,7 +134,7 @@ class GRPCStreamTest { assertThat(message.array()).isEqualTo(message1.array()) countDownLatch.countDown() } - .start(Executor {}) + .start() val messageLength = message1.array().count() val data = ByteBuffer.allocate(5 + messageLength) @@ -189,7 +188,7 @@ class GRPCStreamTest { } countDownLatch.countDown() } - .start(Executor {}) + .start() stream?.receiveData(firstMessageBuffer, false) stream?.receiveData(secondMessageBufferPart1, false) @@ -209,7 +208,7 @@ class GRPCStreamTest { assertThat(message.array()).hasLength(0) countDownLatch.countDown() } - .start(Executor {}) + .start() val emptyMessage = ByteBuffer.wrap( @@ -265,7 +264,7 @@ class GRPCStreamTest { } countDownLatch.countDown() } - .start(Executor {}) + .start() stream?.receiveData(emptyMessageBuffer, false) stream?.receiveData(secondMessageBuffer, false) diff --git a/mobile/test/kotlin/io/envoyproxy/envoymobile/mocks/MockStreamPrototype.kt b/mobile/test/kotlin/io/envoyproxy/envoymobile/mocks/MockStreamPrototype.kt index 0eb1ee86be5f..d869ffca8b20 100644 --- a/mobile/test/kotlin/io/envoyproxy/envoymobile/mocks/MockStreamPrototype.kt +++ b/mobile/test/kotlin/io/envoyproxy/envoymobile/mocks/MockStreamPrototype.kt @@ -2,7 +2,6 @@ package io.envoyproxy.envoymobile.mocks import io.envoyproxy.envoymobile.Stream import io.envoyproxy.envoymobile.StreamPrototype -import java.util.concurrent.Executor /** * Mock implementation of `StreamPrototype` which is used to produce `MockStream` instances. @@ -11,8 +10,8 @@ import java.util.concurrent.Executor */ class MockStreamPrototype(private val onStart: ((stream: MockStream) -> Unit)?) : StreamPrototype(MockEnvoyEngine()) { - override fun start(executor: Executor): Stream { - val callbacks = createCallbacks(executor) + override fun start(): Stream { + val callbacks = createCallbacks() val stream = MockStream(MockEnvoyHTTPStream(callbacks, false)) onStart?.invoke(stream) return stream From 0c430d8a5d77ded14d75d3d68d3ab4e723bc148c Mon Sep 17 00:00:00 2001 From: Fredy Wijaya Date: Thu, 7 Mar 2024 19:46:55 +0000 Subject: [PATCH 3/8] Fix tests Signed-off-by: Fredy Wijaya --- mobile/test/java/integration/AndroidEngineSocketTagTest.java | 3 ++- mobile/test/kotlin/integration/ReceiveDataTest.kt | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/mobile/test/java/integration/AndroidEngineSocketTagTest.java b/mobile/test/java/integration/AndroidEngineSocketTagTest.java index eb5b76245c83..01817fb8dcf4 100644 --- a/mobile/test/java/integration/AndroidEngineSocketTagTest.java +++ b/mobile/test/java/integration/AndroidEngineSocketTagTest.java @@ -17,6 +17,7 @@ import io.envoyproxy.envoymobile.Stream; import io.envoyproxy.envoymobile.StreamIntel; import io.envoyproxy.envoymobile.engine.AndroidJniLibrary; +import io.envoyproxy.envoymobile.engine.ByteBuffers; import io.envoyproxy.envoymobile.engine.testing.RequestScenario; import io.envoyproxy.envoymobile.engine.testing.Response; import java.net.MalformedURLException; @@ -125,7 +126,7 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception { return null; }) .setOnResponseData((data, endStream, streamIntel) -> { - response.get().addBody(data); + response.get().addBody(ByteBuffers.copy(data)); if (!endStream) { if (requestScenario.waitOnReadData) { try { diff --git a/mobile/test/kotlin/integration/ReceiveDataTest.kt b/mobile/test/kotlin/integration/ReceiveDataTest.kt index 1fcaad60f166..14eb9e11fcb4 100644 --- a/mobile/test/kotlin/integration/ReceiveDataTest.kt +++ b/mobile/test/kotlin/integration/ReceiveDataTest.kt @@ -5,6 +5,7 @@ import io.envoyproxy.envoymobile.EngineBuilder import io.envoyproxy.envoymobile.RequestHeadersBuilder import io.envoyproxy.envoymobile.RequestMethod import io.envoyproxy.envoymobile.Standard +import io.envoyproxy.envoymobile.engine.ByteBuffers import io.envoyproxy.envoymobile.engine.JniLibrary import java.nio.ByteBuffer import java.util.concurrent.CountDownLatch @@ -51,7 +52,7 @@ class ReceiveDataTest { headersExpectation.countDown() } .setOnResponseData { data, _, _ -> - body = data + body = ByteBuffers.copy(data) dataExpectation.countDown() } .setOnError { _, _ -> fail("Unexpected error") } From d7cad588ceeda8b9af2483ea87e2179632709b38 Mon Sep 17 00:00:00 2001 From: Fredy Wijaya Date: Thu, 7 Mar 2024 19:50:32 +0000 Subject: [PATCH 4/8] Add comment Signed-off-by: Fredy Wijaya --- .../library/java/org/chromium/net/impl/CronvoyUrlRequest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mobile/library/java/org/chromium/net/impl/CronvoyUrlRequest.java b/mobile/library/java/org/chromium/net/impl/CronvoyUrlRequest.java index 27b28d98a022..4ee34ddd76dc 100644 --- a/mobile/library/java/org/chromium/net/impl/CronvoyUrlRequest.java +++ b/mobile/library/java/org/chromium/net/impl/CronvoyUrlRequest.java @@ -870,6 +870,8 @@ public void onData(ByteBuffer data, boolean endStream, EnvoyStreamIntel streamIn ByteBuffer userBuffer = mUserCurrentReadBuffer; mUserCurrentReadBuffer = null; // Avoid the reference to a potentially large buffer. int dataRead = data.remaining(); + // Copy the `data` outside the thread before passing into the thread because the `data` + // will be destroyed upon completing this callback. userBuffer.put(data); // NPE ==> BUG, BufferOverflowException ==> User not behaving. Runnable task = () -> { checkCallingThread(); From 305ac95d89afd559c868f8894f1c22a681f123b6 Mon Sep 17 00:00:00 2001 From: Fredy Wijaya Date: Thu, 7 Mar 2024 20:00:41 +0000 Subject: [PATCH 5/8] Fix more tests Signed-off-by: Fredy Wijaya --- .../test/java/integration/AndroidEnvoyExplicitFlowTest.java | 3 ++- mobile/test/java/integration/AndroidEnvoyFlowTest.java | 5 ++++- .../java/org/chromium/net/testing/Http2TestServerTest.java | 4 +++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/mobile/test/java/integration/AndroidEnvoyExplicitFlowTest.java b/mobile/test/java/integration/AndroidEnvoyExplicitFlowTest.java index 6796a9b0afce..32aec6e50caf 100644 --- a/mobile/test/java/integration/AndroidEnvoyExplicitFlowTest.java +++ b/mobile/test/java/integration/AndroidEnvoyExplicitFlowTest.java @@ -17,6 +17,7 @@ import io.envoyproxy.envoymobile.Stream; import io.envoyproxy.envoymobile.StreamIntel; import io.envoyproxy.envoymobile.engine.AndroidJniLibrary; +import io.envoyproxy.envoymobile.engine.ByteBuffers; import io.envoyproxy.envoymobile.engine.testing.RequestScenario; import io.envoyproxy.envoymobile.engine.testing.Response; import java.net.MalformedURLException; @@ -486,7 +487,7 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception { return null; }) .setOnResponseData((data, endStream, streamIntel) -> { - response.get().addBody(data); + response.get().addBody(ByteBuffers.copy(data)); response.get().addStreamIntel(streamIntel); if (!endStream) { if (requestScenario.waitOnReadData) { diff --git a/mobile/test/java/integration/AndroidEnvoyFlowTest.java b/mobile/test/java/integration/AndroidEnvoyFlowTest.java index 959f431cc02f..ed3263cf50f5 100644 --- a/mobile/test/java/integration/AndroidEnvoyFlowTest.java +++ b/mobile/test/java/integration/AndroidEnvoyFlowTest.java @@ -14,6 +14,7 @@ import io.envoyproxy.envoymobile.ResponseTrailers; import io.envoyproxy.envoymobile.Stream; import io.envoyproxy.envoymobile.engine.AndroidJniLibrary; +import io.envoyproxy.envoymobile.engine.ByteBuffers; import io.envoyproxy.envoymobile.engine.testing.RequestScenario; import io.envoyproxy.envoymobile.engine.testing.Response; import java.io.IOException; @@ -31,6 +32,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; + +import io.netty.buffer.ByteBuf; import okhttp3.mockwebserver.Dispatcher; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; @@ -292,7 +295,7 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception { return null; }) .setOnResponseData((data, endStream, ignored) -> { - response.get().addBody(data); + response.get().addBody(ByteBuffers.copy(data)); if (endStream) { latch.countDown(); } diff --git a/mobile/test/java/org/chromium/net/testing/Http2TestServerTest.java b/mobile/test/java/org/chromium/net/testing/Http2TestServerTest.java index 930cdde6c567..dbd53365c914 100644 --- a/mobile/test/java/org/chromium/net/testing/Http2TestServerTest.java +++ b/mobile/test/java/org/chromium/net/testing/Http2TestServerTest.java @@ -8,6 +8,8 @@ import static org.junit.Assert.assertNotNull; import android.content.Context; import androidx.test.core.app.ApplicationProvider; + +import io.envoyproxy.envoymobile.engine.ByteBuffers; import io.envoyproxy.envoymobile.utilities.AndroidNetworkLibrary; import io.envoyproxy.envoymobile.AndroidEngineBuilder; import io.envoyproxy.envoymobile.Engine; @@ -192,7 +194,7 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception { return null; }) .setOnResponseData((data, endStream, ignored) -> { - response.get().addBody(data); + response.get().addBody(ByteBuffers.copy(data)); return null; }) .setOnResponseTrailers((trailers, ignored) -> { From 5efe35fd8500d50d88217525938de5a36dd3e989 Mon Sep 17 00:00:00 2001 From: Fredy Wijaya Date: Thu, 7 Mar 2024 20:08:43 +0000 Subject: [PATCH 6/8] Remove unused import Signed-off-by: Fredy Wijaya --- mobile/test/java/integration/AndroidEnvoyFlowTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/mobile/test/java/integration/AndroidEnvoyFlowTest.java b/mobile/test/java/integration/AndroidEnvoyFlowTest.java index ed3263cf50f5..855b91012262 100644 --- a/mobile/test/java/integration/AndroidEnvoyFlowTest.java +++ b/mobile/test/java/integration/AndroidEnvoyFlowTest.java @@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import io.netty.buffer.ByteBuf; import okhttp3.mockwebserver.Dispatcher; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; From a2e6eb09f1369459425d950d2288373d2911143a Mon Sep 17 00:00:00 2001 From: Fredy Wijaya Date: Thu, 7 Mar 2024 20:38:22 +0000 Subject: [PATCH 7/8] Fix more stuff Signed-off-by: Fredy Wijaya --- mobile/docs/root/api/http.rst | 6 +++--- mobile/examples/java/hello_world/MainActivity.java | 3 +-- mobile/examples/kotlin/hello_world/MainActivity.kt | 3 +-- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/mobile/docs/root/api/http.rst b/mobile/docs/root/api/http.rst index c8dce9ee4673..f745474e779b 100644 --- a/mobile/docs/root/api/http.rst +++ b/mobile/docs/root/api/http.rst @@ -33,7 +33,7 @@ Start and interact with an HTTP stream in **Kotlin**:: } .setOnError { ... } .setOnCancel { ... } - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(...) .sendData(...) @@ -166,7 +166,7 @@ Doing so returns a ``Stream`` which allows the sender to interact with the strea .newStreamPrototype() ... val stream = prototype - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(...) .sendData(...) @@ -215,7 +215,7 @@ For example: .build() val stream = streamClient .newStreamPrototype() - .start(Executors.newSingleThreadExecutor()) + .start() // Headers-only stream.sendHeaders(requestHeaders, true) diff --git a/mobile/examples/java/hello_world/MainActivity.java b/mobile/examples/java/hello_world/MainActivity.java index 53d36ac6f573..4f73219542c9 100644 --- a/mobile/examples/java/hello_world/MainActivity.java +++ b/mobile/examples/java/hello_world/MainActivity.java @@ -22,7 +22,6 @@ import io.envoyproxy.envoymobile.shared.Success; import kotlin.Unit; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.Arrays; import java.util.HashSet; @@ -131,7 +130,7 @@ private void makeRequest() { recyclerView.post(() -> viewAdapter.add(new Failure(message))); return Unit.INSTANCE; }) - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestHeaders, true); clear_text = !clear_text; diff --git a/mobile/examples/kotlin/hello_world/MainActivity.kt b/mobile/examples/kotlin/hello_world/MainActivity.kt index 1a59ccf6916f..f120ca1a61ab 100644 --- a/mobile/examples/kotlin/hello_world/MainActivity.kt +++ b/mobile/examples/kotlin/hello_world/MainActivity.kt @@ -18,7 +18,6 @@ import io.envoyproxy.envoymobile.shared.Failure import io.envoyproxy.envoymobile.shared.ResponseRecyclerViewAdapter import io.envoyproxy.envoymobile.shared.Success import java.io.IOException -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit private const val REQUEST_HANDLER_THREAD_NAME = "hello_envoy_kt" @@ -141,7 +140,7 @@ class MainActivity : Activity() { Log.d("MainActivity", message) recyclerView.post { viewAdapter.add(Failure(message)) } } - .start(Executors.newSingleThreadExecutor()) + .start() .sendHeaders(requestHeaders, true) } From 18b9a44a9d97947210e62b9979bed203712031d1 Mon Sep 17 00:00:00 2001 From: Fredy Wijaya Date: Thu, 7 Mar 2024 22:05:21 +0000 Subject: [PATCH 8/8] Fix flaky test Signed-off-by: Fredy Wijaya --- .../AndroidEnvoyExplicitFlowTest.java | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/mobile/test/java/integration/AndroidEnvoyExplicitFlowTest.java b/mobile/test/java/integration/AndroidEnvoyExplicitFlowTest.java index 32aec6e50caf..9c7705a23e11 100644 --- a/mobile/test/java/integration/AndroidEnvoyExplicitFlowTest.java +++ b/mobile/test/java/integration/AndroidEnvoyExplicitFlowTest.java @@ -264,8 +264,7 @@ public void get_withThrottledBodyResponse_bufferLargerThanResponseBody() throws assertThat(response.getBodyAsString()).isEqualTo("hello, world"); assertThat(response.getEnvoyError()).isNull(); // A "terminating" empty buffer is systematically sent through the setOnResponseData callback. - // See: https://github.com/envoyproxy/envoy-mobile/issues/1393 - assertThat(response.getNbResponseChunks()).isEqualTo(4); // 5 bytes, 5 bytes, 2, and 0 bytes + assertThat(response.getNbResponseChunks()).isEqualTo(3); // 5 bytes, 5 bytes, 2 } @Test @@ -286,8 +285,7 @@ public void get_withThrottledBodyResponse_bufferSmallerThanResponseBody() throws assertThat(response.getBodyAsString()).isEqualTo("hello, world"); assertThat(response.getEnvoyError()).isNull(); // A "terminating" empty buffer is systematically sent through the setOnResponseData callback. - // See: https://github.com/envoyproxy/envoy-mobile/issues/1393 - assertThat(response.getNbResponseChunks()).isEqualTo(6); // 3&2 bytes, 3&2 bytes, 2, and 0 bytes + assertThat(response.getNbResponseChunks()).isEqualTo(5); // 3&2 bytes, 3&2 bytes, 2 } @Test @@ -487,17 +485,21 @@ private Response sendRequest(RequestScenario requestScenario) throws Exception { return null; }) .setOnResponseData((data, endStream, streamIntel) -> { - response.get().addBody(ByteBuffers.copy(data)); - response.get().addStreamIntel(streamIntel); - if (!endStream) { - if (requestScenario.waitOnReadData) { - try { - Thread.sleep(100 + (int)(Math.random() * 50)); - } catch (InterruptedException e) { - // Don't care + // Ignore when the data is empty. + // See: https://github.com/envoyproxy/envoy-mobile/issues/1393 + if (data.hasRemaining()) { + response.get().addBody(ByteBuffers.copy(data)); + response.get().addStreamIntel(streamIntel); + if (!endStream) { + if (requestScenario.waitOnReadData) { + try { + Thread.sleep(100 + (int)(Math.random() * 50)); + } catch (InterruptedException e) { + // Don't care + } } + streamRef.get().readData(requestScenario.responseBufferSize); } - streamRef.get().readData(requestScenario.responseBufferSize); } return null; })