From 11c5a3dc36fc9acf0f7eadcc5994b1587f6fdc88 Mon Sep 17 00:00:00 2001 From: Thomas Kountis Date: Thu, 1 Feb 2024 14:20:29 -0800 Subject: [PATCH] LoadBalancer wire the tracker on the host layers with the request flow (#2816) Motivation In order for the new load balancer features to work, the health indicators need to be wired as request trackers on the request flow. Modification - Added additional configuration options in the DefaultHttpLoadBalancerFactory to support error classifications. Feature only useful to load-balancers that support it. - When the Context is available and a request-tracker is present in it, the request flow is now enriched with additional logic to track the state and feed it to the request-tracker. Co-authored-by: Bryce Anderson Co-authored-by: Idel Pivnitskiy --- servicetalk-http-netty/build.gradle | 1 + .../netty/DefaultHttpLoadBalancerFactory.java | 268 +++++++++++++++++- .../servicetalk/loadbalancer/DefaultHost.java | 106 +++---- .../servicetalk/loadbalancer/ErrorClass.java | 5 + .../io/servicetalk/loadbalancer/Host.java | 1 - .../RootSwayingLeafRequestTracker.java | 55 ---- 6 files changed, 325 insertions(+), 111 deletions(-) delete mode 100644 servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/RootSwayingLeafRequestTracker.java diff --git a/servicetalk-http-netty/build.gradle b/servicetalk-http-netty/build.gradle index fa0ff23065..f4c77fb034 100644 --- a/servicetalk-http-netty/build.gradle +++ b/servicetalk-http-netty/build.gradle @@ -33,6 +33,7 @@ dependencies { implementation project(":servicetalk-dns-discovery-netty") implementation project(":servicetalk-http-utils") implementation project(":servicetalk-loadbalancer") + implementation project(":servicetalk-loadbalancer-experimental") implementation project(":servicetalk-logging-slf4j-internal") implementation project(":servicetalk-tcp-netty-internal") implementation project(":servicetalk-transport-netty") diff --git 
a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultHttpLoadBalancerFactory.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultHttpLoadBalancerFactory.java index 15679f3cef..3a9db2b576 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultHttpLoadBalancerFactory.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultHttpLoadBalancerFactory.java @@ -18,11 +18,14 @@ import io.servicetalk.client.api.ConnectionFactory; import io.servicetalk.client.api.LoadBalancer; import io.servicetalk.client.api.LoadBalancerFactory; +import io.servicetalk.client.api.ReservableRequestConcurrencyController; import io.servicetalk.client.api.ScoreSupplier; import io.servicetalk.client.api.ServiceDiscovererEvent; import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; +import io.servicetalk.concurrent.api.TerminalSignalConsumer; +import io.servicetalk.context.api.ContextMap; import io.servicetalk.http.api.FilterableStreamingHttpConnection; import io.servicetalk.http.api.FilterableStreamingHttpLoadBalancedConnection; import io.servicetalk.http.api.HttpConnectionContext; @@ -31,14 +34,37 @@ import io.servicetalk.http.api.HttpExecutionStrategy; import io.servicetalk.http.api.HttpLoadBalancerFactory; import io.servicetalk.http.api.HttpRequestMethod; +import io.servicetalk.http.api.HttpResponseMetaData; +import io.servicetalk.http.api.ReservedBlockingHttpConnection; +import io.servicetalk.http.api.ReservedBlockingStreamingHttpConnection; +import io.servicetalk.http.api.ReservedHttpConnection; import io.servicetalk.http.api.StreamingHttpRequest; import io.servicetalk.http.api.StreamingHttpResponse; import io.servicetalk.http.api.StreamingHttpResponseFactory; +import io.servicetalk.http.utils.BeforeFinallyHttpOperator; +import io.servicetalk.loadbalancer.ErrorClass; +import 
io.servicetalk.loadbalancer.RequestTracker; import io.servicetalk.loadbalancer.RoundRobinLoadBalancers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.ConnectException; import java.util.Collection; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Function; +import javax.annotation.Nullable; +import static io.servicetalk.http.api.HttpApiConversions.toReservedBlockingConnection; +import static io.servicetalk.http.api.HttpApiConversions.toReservedBlockingStreamingConnection; +import static io.servicetalk.http.api.HttpApiConversions.toReservedConnection; +import static io.servicetalk.http.api.HttpResponseStatus.StatusClass.SERVER_ERROR_5XX; +import static io.servicetalk.http.api.HttpResponseStatus.TOO_MANY_REQUESTS; +import static io.servicetalk.loadbalancer.ErrorClass.LOCAL_ORIGIN_CONNECT_FAILED; +import static io.servicetalk.loadbalancer.ErrorClass.LOCAL_ORIGIN_REQUEST_FAILED; +import static io.servicetalk.loadbalancer.RequestTracker.REQUEST_TRACKER_KEY; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater; /** * Default implementation of {@link HttpLoadBalancerFactory}. 
@@ -47,13 +73,20 @@ */ public final class DefaultHttpLoadBalancerFactory implements HttpLoadBalancerFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultHttpLoadBalancerFactory.class); private final LoadBalancerFactory rawFactory; + private final Function errorClassFunction; + private final Function peerResponseErrorClassifier; private final HttpExecutionStrategy strategy; DefaultHttpLoadBalancerFactory( final LoadBalancerFactory rawFactory, + final Function errorClassFunction, + final Function peerResponseErrorClassifier, final HttpExecutionStrategy strategy) { this.rawFactory = rawFactory; + this.errorClassFunction = errorClassFunction; + this.peerResponseErrorClassifier = peerResponseErrorClassifier; this.strategy = strategy; } @@ -80,6 +113,36 @@ public FilterableStreamingHttpLoadBalancedConnection toLoadBalancedConnection( return new DefaultFilterableStreamingHttpLoadBalancedConnection(connection); } + @Override + public FilterableStreamingHttpLoadBalancedConnection toLoadBalancedConnection( + final FilterableStreamingHttpConnection connection, + final ReservableRequestConcurrencyController concurrencyController, + @Nullable final ContextMap context) { + + RequestTracker hostHealthIndicator = null; + if (context == null) { + LOGGER.debug("Context is null. In order for " + DefaultHttpLoadBalancerFactory.class.getSimpleName() + + ":toLoadBalancedConnection to get access to the " + RequestTracker.class.getSimpleName() + + ", health-monitor of this connection, the context must not be null."); + } else { + hostHealthIndicator = context.get(REQUEST_TRACKER_KEY); + if (hostHealthIndicator == null) { + LOGGER.debug(REQUEST_TRACKER_KEY.name() + " is not set in context. 
" + + "In order for " + DefaultHttpLoadBalancerFactory.class.getSimpleName() + + ":toLoadBalancedConnection to get access to the " + RequestTracker.class.getSimpleName() + + ", health-monitor of this connection, the context must be properly wired."); + } + } + + if (hostHealthIndicator == null) { + return new HttpLoadBalancerFactory.DefaultFilterableStreamingHttpLoadBalancedConnection(connection, + concurrencyController); + } + + return new DefaultHttpLoadBalancedConnection(connection, concurrencyController, + errorClassFunction, peerResponseErrorClassifier, hostHealthIndicator); + } + @Override public HttpExecutionStrategy requiredOffloads() { return strategy; @@ -94,6 +157,11 @@ public HttpExecutionStrategy requiredOffloads() { public static final class Builder { private final LoadBalancerFactory rawFactory; private final HttpExecutionStrategy strategy; + private final Function errorClassifier = t -> t instanceof ConnectException ? + LOCAL_ORIGIN_CONNECT_FAILED : LOCAL_ORIGIN_REQUEST_FAILED; + private final Function peerResponseErrorClassifier = resp -> + (resp.status().statusClass() == SERVER_ERROR_5XX || TOO_MANY_REQUESTS.equals(resp.status())) ? + ErrorClass.EXT_ORIGIN_REQUEST_FAILED : null; private Builder( final LoadBalancerFactory rawFactory, @@ -108,7 +176,8 @@ private Builder( * @return A {@link DefaultHttpLoadBalancerFactory}. */ public DefaultHttpLoadBalancerFactory build() { - return new DefaultHttpLoadBalancerFactory<>(rawFactory, strategy); + return new DefaultHttpLoadBalancerFactory<>(rawFactory, errorClassifier, peerResponseErrorClassifier, + strategy); } /** @@ -153,10 +222,10 @@ private static final class DefaultFilterableStreamingHttpLoadBalancedConnection @Override public int score() { throw new UnsupportedOperationException( - DefaultFilterableStreamingHttpLoadBalancedConnection.class.getName() + - " doesn't support scoring. 
" + ScoreSupplier.class.getName() + - " is only available through " + HttpLoadBalancerFactory.class.getSimpleName() + - " implementations that support scoring."); + DefaultFilterableStreamingHttpLoadBalancedConnection.class.getName() + + " doesn't support scoring. " + ScoreSupplier.class.getName() + + " is only available through " + HttpLoadBalancerFactory.class.getSimpleName() + + " implementations that support scoring."); } @Override @@ -214,4 +283,193 @@ public String toString() { return delegate.toString(); } } + + private static final class DefaultHttpLoadBalancedConnection + implements FilterableStreamingHttpLoadBalancedConnection { + private final FilterableStreamingHttpConnection delegate; + private final ReservableRequestConcurrencyController concurrencyController; + private final Function errorClassFunction; + private final Function peerResponseErrorClassifier; + @Nullable + private final RequestTracker tracker; + + DefaultHttpLoadBalancedConnection(final FilterableStreamingHttpConnection delegate, + final ReservableRequestConcurrencyController concurrencyController, + final Function errorClassFunction, + final Function peerResponseErrorClassifier, + @Nullable final RequestTracker tracker) { + this.delegate = delegate; + this.concurrencyController = concurrencyController; + this.errorClassFunction = errorClassFunction; + this.peerResponseErrorClassifier = peerResponseErrorClassifier; + this.tracker = tracker; + } + + @Override + public int score() { + return 1; + } + + @Override + public ReservedHttpConnection asConnection() { + return toReservedConnection(this, executionContext().executionStrategy()); + } + + @Override + public ReservedBlockingStreamingHttpConnection asBlockingStreamingConnection() { + return toReservedBlockingStreamingConnection(this, executionContext().executionStrategy()); + } + + @Override + public ReservedBlockingHttpConnection asBlockingConnection() { + return toReservedBlockingConnection(this, 
executionContext().executionStrategy()); + } + + @Override + public Completable releaseAsync() { + return concurrencyController.releaseAsync(); + } + + @Override + public Completable closeAsyncGracefully() { + return delegate.closeAsyncGracefully(); + } + + @Override + public Result tryRequest() { + return concurrencyController.tryRequest(); + } + + @Override + public boolean tryReserve() { + return concurrencyController.tryReserve(); + } + + @Override + public void requestFinished() { + concurrencyController.requestFinished(); + } + + @Override + public HttpConnectionContext connectionContext() { + return delegate.connectionContext(); + } + + @Override + public Publisher transportEventStream(final HttpEventKey eventKey) { + return delegate.transportEventStream(eventKey); + } + + @Override + public Single request(final StreamingHttpRequest request) { + if (tracker == null) { + return delegate.request(request).shareContextOnSubscribe(); + } + + return Single.defer(() -> { + final RequestTracker theTracker = new AtMostOnceDeliveryRequestTracker(tracker); + final long startTime = theTracker.beforeStart(); + + return delegate.request(request) + .liftSync(new BeforeFinallyHttpOperator(new TerminalSignalConsumer() { + @Override + public void onComplete() { + theTracker.onSuccess(startTime); + } + + @Override + public void onError(final Throwable throwable) { + theTracker.onError(startTime, errorClassFunction.apply(throwable)); + } + + @Override + public void cancel() { + theTracker.onError(startTime, ErrorClass.CANCELLED); + } + }, /*discardEventsAfterCancel*/ true)) + + // BeforeFinallyHttpOperator conditionally outputs a Single with a failed + // Publisher instead of the real Publisher in case a cancel signal is observed + // before completion of Meta. It also transforms the original Publisher to discard + // signals after cancel. So in order for downstream operators to get a consistent view of the + // data path map() needs to be applied last. 
+ .map(response -> { + final ErrorClass eClass = peerResponseErrorClassifier.apply(response); + if (eClass != null) { + // The onError is triggered before the body is actually consumed. + theTracker.onError(startTime, eClass); + } + return response; + }) + .shareContextOnSubscribe(); + }); + } + + @Override + public HttpExecutionContext executionContext() { + return delegate.executionContext(); + } + + @Override + public StreamingHttpResponseFactory httpResponseFactory() { + return delegate.httpResponseFactory(); + } + + @Override + public Completable onClose() { + return delegate.onClose(); + } + + @Override + public Completable onClosing() { + return delegate.onClosing(); + } + + @Override + public Completable closeAsync() { + return delegate.closeAsync(); + } + + @Override + public StreamingHttpRequest newRequest(final HttpRequestMethod method, final String requestTarget) { + return delegate.newRequest(method, requestTarget); + } + + @Override + public String toString() { + return delegate.toString(); + } + + private static final class AtMostOnceDeliveryRequestTracker implements RequestTracker { + + private static final AtomicIntegerFieldUpdater doneUpdater = + newUpdater(AtMostOnceDeliveryRequestTracker.class, "done"); + + private final RequestTracker original; + private volatile int done; + + private AtMostOnceDeliveryRequestTracker(final RequestTracker original) { + this.original = original; + } + + @Override + public long beforeStart() { + return original.beforeStart(); + } + + @Override + public void onSuccess(final long beforeStartTimeNs) { + if (doneUpdater.compareAndSet(this, 0, 1)) { + original.onSuccess(beforeStartTimeNs); + } + } + + @Override + public void onError(final long beforeStartTimeNs, final ErrorClass errorClass) { + if (doneUpdater.compareAndSet(this, 0, 1)) { + original.onError(beforeStartTimeNs, errorClass); + } + } + } + } } diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java 
b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java index fba6e09197..e17ad79edf 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java @@ -218,59 +218,65 @@ public boolean markExpired() { @Override public Single newConnection( - Predicate selector, final boolean forceNewConnectionAndReserve, @Nullable ContextMap context) { - // We need to put our address latency tracker in the context for consumption. - if (healthIndicator != null) { - if (context == null) { - context = new DefaultContextMap(); + Predicate selector, final boolean forceNewConnectionAndReserve, final @Nullable ContextMap context) { + return Single.defer(() -> { + ContextMap actualContext = context; + if (actualContext == null) { + actualContext = new DefaultContextMap(); } - context.put(REQUEST_TRACKER_KEY, healthIndicator); - } - // This LB implementation does not automatically provide TransportObserver. Therefore, we pass "null" here. - // Users can apply a ConnectionFactoryFilter if they need to override this "null" value with TransportObserver. - Single establishConnection = connectionFactory.newConnection(address, context, null); - if (healthCheckConfig != null) { - // Schedule health check before returning - establishConnection = establishConnection.beforeOnError(this::markUnhealthy); - } - return establishConnection - .flatMap(newCnx -> { - if (forceNewConnectionAndReserve && !newCnx.tryReserve()) { - return newCnx.closeAsync().concat(failed( - Exceptions.StacklessConnectionRejectedException.newInstance( + + // We need to put our address latency tracker in the context for consumption. + if (healthIndicator != null) { + actualContext.put(REQUEST_TRACKER_KEY, healthIndicator); + } + // This LB implementation does not automatically provide TransportObserver. Therefore, we pass "null" here. 
+ // Users can apply a ConnectionFactoryFilter if they need to override this "null" value with + // TransportObserver. + Single establishConnection = connectionFactory.newConnection(address, actualContext, null); + if (healthCheckConfig != null) { + // Schedule health check before returning + establishConnection = establishConnection.beforeOnError(this::markUnhealthy); + } + return establishConnection + .flatMap(newCnx -> { + if (forceNewConnectionAndReserve && !newCnx.tryReserve()) { + return newCnx.closeAsync().concat(failed( + Exceptions.StacklessConnectionRejectedException.newInstance( + "Newly created connection " + newCnx + " for " + lbDescription + + " could not be reserved.", + DefaultHost.class, "newConnection(...)"))) + .shareContextOnSubscribe(); + } + + // Invoke the selector before adding the connection to the pool, otherwise, connection can be + // used concurrently and hence a new connection can be rejected by the selector. + if (!selector.test(newCnx)) { + // Failure in selection could be the result of connection factory returning cached + // connection, and not having visibility into max-concurrent-requests, or other threads + // already selected the + // connection which uses all the max concurrent request count. + + // If there is caching, propagate the exception and rely upon the retry strategy. + Single failedSingle = failed(Exceptions.StacklessConnectionRejectedException.newInstance( "Newly created connection " + newCnx + " for " + lbDescription - + " could not be reserved.", - RoundRobinLoadBalancer.class, "selectConnection0(...)"))) - .shareContextOnSubscribe(); - } + + " was rejected by the selection filter.", + DefaultHost.class, "newConnection(...)")); - // Invoke the selector before adding the connection to the pool, otherwise, connection can be - // used concurrently and hence a new connection can be rejected by the selector. 
- if (!selector.test(newCnx)) { - // Failure in selection could be the result of connection factory returning cached connection, - // and not having visibility into max-concurrent-requests, or other threads already selected the - // connection which uses all the max concurrent request count. - - // If there is caching Propagate the exception and rely upon retry strategy. - Single failedSingle = failed(Exceptions.StacklessConnectionRejectedException.newInstance( - "Newly created connection " + newCnx + " for " + lbDescription - + " was rejected by the selection filter.", - RoundRobinLoadBalancer.class, "selectConnection0(...)")); - - // Just in case the connection is not closed add it to the host so we don't lose track, - // duplicates will be filtered out. - return (addConnection(newCnx, null) ? - failedSingle : newCnx.closeAsync().concat(failedSingle)).shareContextOnSubscribe(); - } - if (addConnection(newCnx, null)) { - return succeeded(newCnx).shareContextOnSubscribe(); - } - return newCnx.closeAsync().concat( - failed(Exceptions.StacklessConnectionRejectedException.newInstance( - "Failed to add newly created connection " + newCnx + " for " + this, - RoundRobinLoadBalancer.class, "selectConnection0(...)"))) - .shareContextOnSubscribe(); - }); + // Just in case the connection is not closed add it to the host so we don't lose track, + // duplicates will be filtered out. + return (addConnection(newCnx, null) ? 
+ failedSingle : newCnx.closeAsync().concat(failedSingle)).shareContextOnSubscribe(); + } + if (addConnection(newCnx, null)) { + return succeeded(newCnx).shareContextOnSubscribe(); + } + return newCnx.closeAsync().concat( + failed(Exceptions.StacklessConnectionRejectedException.newInstance( + "Failed to add newly created connection " + newCnx + " for " + this, + DefaultHost.class, "newConnection(...)"))) + .shareContextOnSubscribe(); + }).shareContextOnSubscribe(); + }); } private void markHealthy(final HealthCheck originalHealthCheckState) { diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/ErrorClass.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/ErrorClass.java index 61b595d910..e579c2e10a 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/ErrorClass.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/ErrorClass.java @@ -29,6 +29,11 @@ public enum ErrorClass { */ LOCAL_ORIGIN_CONNECT_FAILED(true), + /** + * Failures caused locally: requests that failed due to an exception raised on the local side. + */ + LOCAL_ORIGIN_REQUEST_FAILED(true), + /** * Failures related to locally enforced timeouts waiting for responses from the peer. */ diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/Host.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/Host.java index 9f8a2fad99..a94c1f3c1b 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/Host.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/Host.java @@ -30,7 +30,6 @@ * @param the concrete type of returned connections. */ interface Host extends ListenableAsyncCloseable, ScoreSupplier { - /** * Select an existing connection from the host. 
* @return the selected host, or null if a suitable host couldn't be found. diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/RootSwayingLeafRequestTracker.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/RootSwayingLeafRequestTracker.java deleted file mode 100644 index bff0c79941..0000000000 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/RootSwayingLeafRequestTracker.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright © 2022 Apple Inc. and the ServiceTalk project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.servicetalk.loadbalancer; - -import static java.util.Objects.requireNonNull; - -/** - * A two-level request tracker, namely root and leaf. - * Each tracking interaction influences both levels, but reporting operations will only - * consult the leaf. 
- */ -final class RootSwayingLeafRequestTracker implements RequestTracker { - - private final RequestTracker root; - private final RequestTracker leaf; - RootSwayingLeafRequestTracker(final RequestTracker root, final RequestTracker leaf) { - this.root = requireNonNull(root); - this.leaf = requireNonNull(leaf); - } - - @Override - public long beforeStart() { - // Tracks both levels - final long timestamp = root.beforeStart(); - leaf.beforeStart(); - return timestamp; - } - - @Override - public void onSuccess(final long beforeStartTimeNs) { - // Tracks both levels - root.onSuccess(beforeStartTimeNs); - leaf.onSuccess(beforeStartTimeNs); - } - - @Override - public void onError(final long beforeStartTimeNs, ErrorClass errorClass) { - // Tracks both levels - root.onError(beforeStartTimeNs, errorClass); - leaf.onError(beforeStartTimeNs, errorClass); - } -}