Skip to content

Commit

Permalink
loadbalancer: Some follow up feedback for DefaultLoadBalancer (#2807)
Browse files Browse the repository at this point in the history
Motivation:

There are a few places that can be tightened up in
DefaultLoadBalancer.

Modifications:

- Fix some copyright dates that went stale over the new year
- Add some missing type parameters to avoid using raw types
- Cleanup some comments

Result:

Tighter code.
  • Loading branch information
bryce-anderson authored Jan 16, 2024
1 parent b2fc0bc commit 4d64f07
Show file tree
Hide file tree
Showing 16 changed files with 57 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ <T extends C> LoadBalancer<T> newLoadBalancer(
* new connections. Returned {@link LoadBalancer} will own the responsibility for this {@link ConnectionFactory}
* and hence will call {@link ConnectionFactory#closeAsync()} when {@link LoadBalancer#closeAsync()} is called.
* @param targetResource A {@link String} representation of the target resource for which the created instance
* will perform load balancing. Bear in mind, load balancing is performed over the a collection of hosts provided
* will perform load balancing. Bear in mind, load balancing is performed over the collection of hosts provided
* via the {@code eventPublisher} which may not correspond directly to a single unresolved address, but potentially
* a merged collection.
* @return a new {@link LoadBalancer}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
final int linearSearchSpace,
final LoadBalancerObserver<ResolvedAddress> loadBalancerObserver,
@Nullable final HealthCheckConfig healthCheckConfig,
@Nullable final Supplier<HealthChecker> healthCheckerFactory) {
@Nullable final Supplier<HealthChecker<ResolvedAddress>> healthCheckerFactory) {
this.targetResource = requireNonNull(targetResourceName);
this.lbDescription = makeDescription(id, targetResource);
this.hostSelector = requireNonNull(hostSelector, "hostSelector");
Expand All @@ -145,7 +145,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
this.loadBalancerObserver = requireNonNull(loadBalancerObserver, "loadBalancerObserver");
this.healthCheckConfig = healthCheckConfig;
this.sequentialExecutor = new SequentialExecutor((uncaughtException) ->
LOGGER.error("{}: Uncaught exception in " + this.getClass().getSimpleName(), this, uncaughtException));
LOGGER.error("{}: Uncaught exception in {}", this, this.getClass().getSimpleName(), uncaughtException));
this.asyncCloseable = toAsyncCloseable(this::doClose);
// Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest signal.
eventStream.ignoreElements().subscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ final class DefaultLoadBalancerBuilder<ResolvedAddress, C extends LoadBalancedCo
@Nullable
private LoadBalancerObserver<ResolvedAddress> loadBalancerObserver;
@Nullable
private HealthCheckerFactory healthCheckerFactory;
private HealthCheckerFactory<ResolvedAddress> healthCheckerFactory;
private Duration healthCheckInterval = DEFAULT_HEALTH_CHECK_INTERVAL;
private Duration healthCheckJitter = DEFAULT_HEALTH_CHECK_JITTER;
private int healthCheckFailedConnectionsThreshold = DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
Expand All @@ -70,7 +70,8 @@ public LoadBalancerBuilder<ResolvedAddress, C> linearSearchSpace(int linearSearc
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> loadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy) {
public LoadBalancerBuilder<ResolvedAddress, C> loadBalancingPolicy(
LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy) {
this.loadBalancingPolicy = requireNonNull(loadBalancingPolicy, "loadBalancingPolicy");
return this;
}
Expand All @@ -83,7 +84,8 @@ public LoadBalancerBuilder<ResolvedAddress, C> loadBalancerObserver(
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckerFactory(HealthCheckerFactory healthCheckerFactory) {
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckerFactory(
HealthCheckerFactory<ResolvedAddress> healthCheckerFactory) {
this.healthCheckerFactory = healthCheckerFactory;
return this;
}
Expand Down Expand Up @@ -133,7 +135,7 @@ public LoadBalancerFactory<ResolvedAddress, C> build() {
}
final LoadBalancerObserver<ResolvedAddress> loadBalancerObserver = this.loadBalancerObserver != null ?
this.loadBalancerObserver : NoopLoadBalancerObserver.instance();
Supplier<HealthChecker> healthCheckerSupplier;
Supplier<HealthChecker<ResolvedAddress>> healthCheckerSupplier;
if (healthCheckerFactory == null) {
healthCheckerSupplier = null;
} else {
Expand All @@ -150,18 +152,18 @@ private static final class DefaultLoadBalancerFactory<ResolvedAddress, C extends
implements LoadBalancerFactory<ResolvedAddress, C> {

private final String id;
private final LoadBalancingPolicy loadBalancingPolicy;
private final LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy;
private final LoadBalancerObserver<ResolvedAddress> loadBalancerObserver;
private final int linearSearchSpace;
@Nullable
private final Supplier<HealthChecker> healthCheckerFactory;
private final Supplier<HealthChecker<ResolvedAddress>> healthCheckerFactory;
@Nullable
private final HealthCheckConfig healthCheckConfig;

DefaultLoadBalancerFactory(final String id, final LoadBalancingPolicy loadBalancingPolicy,
final int linearSearchSpace, final HealthCheckConfig healthCheckConfig,
DefaultLoadBalancerFactory(final String id, final LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy,
final int linearSearchSpace, final HealthCheckConfig healthCheckConfig,
final LoadBalancerObserver<ResolvedAddress> loadBalancerObserver,
final Supplier<HealthChecker> healthCheckerFactory) {
final Supplier<HealthChecker<ResolvedAddress>> healthCheckerFactory) {
this.id = requireNonNull(id, "id");
this.loadBalancingPolicy = requireNonNull(loadBalancingPolicy, "loadBalancingPolicy");
this.loadBalancerObserver = requireNonNull(loadBalancerObserver, "loadBalancerObserver");
Expand All @@ -174,7 +176,7 @@ private static final class DefaultLoadBalancerFactory<ResolvedAddress, C extends
public <T extends C> LoadBalancer<T> newLoadBalancer(String targetResource,
Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return new DefaultLoadBalancer<>(id, targetResource, eventPublisher,
return new DefaultLoadBalancer<ResolvedAddress, T>(id, targetResource, eventPublisher,
loadBalancingPolicy.buildSelector(Collections.emptyList(), targetResource), connectionFactory,
linearSearchSpace, loadBalancerObserver, healthCheckConfig, healthCheckerFactory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,19 @@ public final long beforeStart() {
}

@Override
public void observeSuccess(final long startTimeNanos) {
public void onSuccess(final long startTimeNanos) {
pendingUpdater.decrementAndGet(this);
calculateAndStore((ewma, currentLatency) -> currentLatency, startTimeNanos);
}

@Override
public void observeCancel(final long startTimeNanos) {
public void onCancel(final long startTimeNanos) {
pendingUpdater.decrementAndGet(this);
calculateAndStore(this::cancelPenalty, startTimeNanos);
}

@Override
public void observeError(final long startTimeNanos) {
public void onError(final long startTimeNanos) {
pendingUpdater.decrementAndGet(this);
calculateAndStore(this::errorPenalty, startTimeNanos);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,8 +19,6 @@

/**
* The representation of a health checking system for use with load balancing.
* <p>
* The core
*/
interface HealthChecker<ResolvedAddress> extends Cancellable {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ LoadBalancerBuilder<ResolvedAddress, C> loadBalancerObserver(
* {@link HealthChecker}.
* @return {code this}
*/
LoadBalancerBuilder<ResolvedAddress, C> healthCheckerFactory(HealthCheckerFactory healthCheckerFactory);
LoadBalancerBuilder<ResolvedAddress, C> healthCheckerFactory(
HealthCheckerFactory<ResolvedAddress> healthCheckerFactory);

/**
* This {@link LoadBalancer} may monitor hosts to which connection establishment has failed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@ interface LoadBalancingPolicy<ResolvedAddress, C extends LoadBalancedConnection>
* @param targetResource the name of the target resource, useful for debugging purposes.
* @return a {@link HostSelector}
*/
HostSelector<ResolvedAddress, C> buildSelector(List<Host<ResolvedAddress, C>> hosts, String targetResource);
<T extends C> HostSelector<ResolvedAddress, T> buildSelector(
List<Host<ResolvedAddress, T>> hosts, String targetResource);
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ private P2CLoadBalancingPolicy(final int maxEffort, final boolean failOpen, @Nul
}

@Override
public HostSelector<ResolvedAddress, C> buildSelector(List<Host<ResolvedAddress, C>> hosts, String targetResource) {
public <T extends C> HostSelector<ResolvedAddress, T> buildSelector(
List<Host<ResolvedAddress, T>> hosts, String targetResource) {
return new P2CSelector<>(hosts, targetResource, maxEffort, failOpen, random);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2020 Apple Inc. and the ServiceTalk project authors
* Copyright © 2023-2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,6 @@
*/
interface RequestTracker {

@SuppressWarnings("rawtypes")
ContextMap.Key<RequestTracker> REQUEST_TRACKER_KEY =
ContextMap.Key.newKey("request_tracker", RequestTracker.class);

Expand All @@ -38,19 +37,19 @@ interface RequestTracker {
*
* @param beforeStartTimeNs return value from {@link #beforeStart()}.
*/
void observeSuccess(long beforeStartTimeNs);
void onSuccess(long beforeStartTimeNs);

/**
* Records cancellation of the action for which latency is to be tracked.
*
* @param beforeStartTimeNs return value from {@link #beforeStart()}.
*/
void observeCancel(long beforeStartTimeNs);
void onCancel(long beforeStartTimeNs);

/**
* Records a failed completion of the action for which latency is to be tracked.
*
* @param beforeStartTimeNs return value from {@link #beforeStart()}.
*/
void observeError(long beforeStartTimeNs);
void onError(long beforeStartTimeNs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,23 @@ public long beforeStart() {
}

@Override
public void observeSuccess(final long beforeStartTimeNs) {
public void onSuccess(final long beforeStartTimeNs) {
// Tracks both levels
root.observeSuccess(beforeStartTimeNs);
leaf.observeSuccess(beforeStartTimeNs);
root.onSuccess(beforeStartTimeNs);
leaf.onSuccess(beforeStartTimeNs);
}

@Override
public void observeCancel(final long beforeStartTimeNs) {
public void onCancel(final long beforeStartTimeNs) {
// Tracks both levels
root.observeCancel(beforeStartTimeNs);
leaf.observeCancel(beforeStartTimeNs);
root.onCancel(beforeStartTimeNs);
leaf.onCancel(beforeStartTimeNs);
}

@Override
public void observeError(final long beforeStartTimeNs) {
public void onError(final long beforeStartTimeNs) {
// Tracks both levels
root.observeError(beforeStartTimeNs);
leaf.observeError(beforeStartTimeNs);
root.onError(beforeStartTimeNs);
leaf.onError(beforeStartTimeNs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ private RoundRobinLoadBalancingPolicy(final boolean failOpen) {
}

@Override
public HostSelector<ResolvedAddress, C>
buildSelector(final List<Host<ResolvedAddress, C>> hosts, final String targetResource) {
public <T extends C> HostSelector<ResolvedAddress, T>
buildSelector(final List<Host<ResolvedAddress, T>> hosts, final String targetResource) {
return new RoundRobinSelector<>(hosts, targetResource, failOpen);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,15 @@ public boolean isHealthy() {
}

@Override
public void observeSuccess(long beforeStartTime) {
public void onSuccess(long beforeStartTime) {
}

@Override
public void observeCancel(long beforeStartTimeNs) {
public void onCancel(long beforeStartTimeNs) {
}

@Override
public void observeError(long beforeStartTime) {
public void onError(long beforeStartTime) {
}
}

Expand Down Expand Up @@ -270,30 +270,30 @@ public String name() {
}

@Override
public HostSelector<String, TestLoadBalancedConnection> buildSelector(
List<Host<String, TestLoadBalancedConnection>> hosts, String targetResource) {
public <T extends TestLoadBalancedConnection> HostSelector<String, T> buildSelector(
List<Host<String, T>> hosts, String targetResource) {
return new TestSelector(hosts);
}

private class TestSelector implements HostSelector<String, TestLoadBalancedConnection> {
private class TestSelector<C extends TestLoadBalancedConnection> implements HostSelector<String, C> {

private final List<? extends Host<String, TestLoadBalancedConnection>> hosts;
private final List<? extends Host<String, C>> hosts;

TestSelector(final List<? extends Host<String, TestLoadBalancedConnection>> hosts) {
TestSelector(final List<? extends Host<String, C>> hosts) {
this.hosts = hosts;
}

@Override
public Single<TestLoadBalancedConnection> selectConnection(
Predicate<TestLoadBalancedConnection> selector, @Nullable ContextMap context,
public Single<C> selectConnection(
Predicate<C> selector, @Nullable ContextMap context,
boolean forceNewConnectionAndReserve) {
return hosts.isEmpty() ? failed(new IllegalStateException("shouldn't be empty"))
: hosts.get(0).newConnection(selector, false, context);
}

@Override
public HostSelector<String, TestLoadBalancedConnection> rebuildWithHosts(
List<? extends Host<String, TestLoadBalancedConnection>> hosts) {
public HostSelector<String, C> rebuildWithHosts(
List<? extends Host<String, C>> hosts) {
rebuilds++;
return new TestSelector(hosts);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ void test() {
Assertions.assertEquals(0, requestTracker.score());

// upon success score
requestTracker.observeSuccess(requestTracker.beforeStart());
requestTracker.onSuccess(requestTracker.beforeStart());
Assertions.assertEquals(-500, requestTracker.score());

// error penalty
requestTracker.observeError(requestTracker.beforeStart());
requestTracker.onError(requestTracker.beforeStart());
Assertions.assertEquals(-5000, requestTracker.score());

// decay
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public LoadBalancerBuilder<String, TestLoadBalancedConnection> loadBalancerObser

@Override
public LoadBalancerBuilder<String, TestLoadBalancedConnection> healthCheckerFactory(
HealthCheckerFactory healthCheckerFactory) {
HealthCheckerFactory<String> healthCheckerFactory) {
throw new IllegalStateException("Cannot set a load balancer health checker for old round robin");
}

Expand Down

0 comments on commit 4d64f07

Please sign in to comment.