Skip to content

Commit

Permalink
REST client: hosts marked dead for the first time should not be immed…
Browse files Browse the repository at this point in the history
…iately retried (#29230)

This was the plan from day one but due to a silly bug nodes were immediately retried after they were marked as dead for the first time. From the second time on, the expected backoff was applied.
  • Loading branch information
javanna committed Mar 27, 2018
1 parent 2ba6377 commit 9afd490
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,50 @@
* when the host should be retried (based on number of previous failed attempts).
* Class is immutable, a new copy of it should be created each time the state has to be changed.
*/
final class DeadHostState {
final class DeadHostState implements Comparable<DeadHostState> {

private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
private static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);

static final DeadHostState INITIAL_DEAD_STATE = new DeadHostState();

private final int failedAttempts;
private final long deadUntilNanos;
private final TimeSupplier timeSupplier;

private DeadHostState() {
/**
* Build the initial dead state of a host. Useful when a working host stops functioning
* and needs to be marked dead after its first failure. In such case the host will be retried after a minute or so.
*
* @param timeSupplier a way to supply the current time and allow for unit testing
*/
DeadHostState(TimeSupplier timeSupplier) {
this.failedAttempts = 1;
this.deadUntilNanos = System.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
this.deadUntilNanos = timeSupplier.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
this.timeSupplier = timeSupplier;
}

/**
* We keep track of how many times a certain node fails consecutively. The higher that number is the longer we will wait
* to retry that same node again. Minimum is 1 minute (for a node the only failed once), maximum is 30 minutes (for a node
* that failed many consecutive times).
* Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence
* it already failed for one or more consecutive times. The more failed attempts we register the longer we wait
* to retry that same host again. Minimum is 1 minute (for a node the only failed once created
* through {@link #DeadHostState(TimeSupplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
*
* @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt
*/
DeadHostState(DeadHostState previousDeadHostState) {
DeadHostState(DeadHostState previousDeadHostState, TimeSupplier timeSupplier) {
long timeoutNanos = (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1),
MAX_CONNECTION_TIMEOUT_NANOS);
this.deadUntilNanos = System.nanoTime() + timeoutNanos;
this.deadUntilNanos = timeSupplier.nanoTime() + timeoutNanos;
this.failedAttempts = previousDeadHostState.failedAttempts + 1;
this.timeSupplier = timeSupplier;
}

/**
* Indicates whether it's time to retry to failed host or not.
*
* @return true if the host should be retried, false otherwise
*/
boolean shallBeRetried() {
return timeSupplier.nanoTime() - deadUntilNanos > 0;
}

/**
Expand All @@ -61,11 +80,35 @@ long getDeadUntilNanos() {
return deadUntilNanos;
}

int getFailedAttempts() {
return failedAttempts;
}

@Override
public int compareTo(DeadHostState other) {
return Long.compare(deadUntilNanos, other.deadUntilNanos);
}

@Override
public String toString() {
return "DeadHostState{" +
"failedAttempts=" + failedAttempts +
", deadUntilNanos=" + deadUntilNanos +
'}';
}

/**
* Time supplier that makes timing aspects pluggable to ease testing
*/
interface TimeSupplier {

TimeSupplier DEFAULT = new TimeSupplier() {
@Override
public long nanoTime() {
return System.nanoTime();
}
};

long nanoTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;

import javax.net.ssl.SSLHandshakeException;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -437,18 +438,18 @@ private HostTuple<Iterator<HttpHost>> nextHost() {
do {
Set<HttpHost> filteredHosts = new HashSet<>(hostTuple.hosts);
for (Map.Entry<HttpHost, DeadHostState> entry : blacklist.entrySet()) {
if (System.nanoTime() - entry.getValue().getDeadUntilNanos() < 0) {
if (entry.getValue().shallBeRetried() == false) {
filteredHosts.remove(entry.getKey());
}
}
if (filteredHosts.isEmpty()) {
//last resort: if there are no good host to use, return a single dead one, the one that's closest to being retried
//last resort: if there are no good hosts to use, return a single dead one, the one that's closest to being retried
List<Map.Entry<HttpHost, DeadHostState>> sortedHosts = new ArrayList<>(blacklist.entrySet());
if (sortedHosts.size() > 0) {
Collections.sort(sortedHosts, new Comparator<Map.Entry<HttpHost, DeadHostState>>() {
@Override
public int compare(Map.Entry<HttpHost, DeadHostState> o1, Map.Entry<HttpHost, DeadHostState> o2) {
return Long.compare(o1.getValue().getDeadUntilNanos(), o2.getValue().getDeadUntilNanos());
return o1.getValue().compareTo(o2.getValue());
}
});
HttpHost deadHost = sortedHosts.get(0).getKey();
Expand Down Expand Up @@ -479,14 +480,15 @@ private void onResponse(HttpHost host) {
* Called after each failed attempt.
* Receives as an argument the host that was used for the failed attempt.
*/
private void onFailure(HttpHost host) throws IOException {
private void onFailure(HttpHost host) {
while(true) {
DeadHostState previousDeadHostState = blacklist.putIfAbsent(host, DeadHostState.INITIAL_DEAD_STATE);
DeadHostState previousDeadHostState = blacklist.putIfAbsent(host, new DeadHostState(DeadHostState.TimeSupplier.DEFAULT));
if (previousDeadHostState == null) {
logger.debug("added host [" + host + "] to blacklist");
break;
}
if (blacklist.replace(host, previousDeadHostState, new DeadHostState(previousDeadHostState))) {
if (blacklist.replace(host, previousDeadHostState,
new DeadHostState(previousDeadHostState, DeadHostState.TimeSupplier.DEFAULT))) {
logger.debug("updated host [" + host + "] already in blacklist");
break;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client;

import java.util.concurrent.TimeUnit;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;

public class DeadHostStateTests extends RestClientTestCase {

private static long[] EXPECTED_TIMEOUTS_SECONDS = new long[]{60, 84, 120, 169, 240, 339, 480, 678, 960, 1357, 1800};

public void testInitialDeadHostStateDefaultTimeSupplier() {
DeadHostState deadHostState = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
long currentTime = System.nanoTime();
assertThat(deadHostState.getDeadUntilNanos(), greaterThan(currentTime));
assertThat(deadHostState.getFailedAttempts(), equalTo(1));
}

public void testDeadHostStateFromPreviousDefaultTimeSupplier() {
DeadHostState previous = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
int iters = randomIntBetween(5, 30);
for (int i = 0; i < iters; i++) {
DeadHostState deadHostState = new DeadHostState(previous, DeadHostState.TimeSupplier.DEFAULT);
assertThat(deadHostState.getDeadUntilNanos(), greaterThan(previous.getDeadUntilNanos()));
assertThat(deadHostState.getFailedAttempts(), equalTo(previous.getFailedAttempts() + 1));
previous = deadHostState;
}
}

public void testCompareToDefaultTimeSupplier() {
int numObjects = randomIntBetween(EXPECTED_TIMEOUTS_SECONDS.length, 30);
DeadHostState[] deadHostStates = new DeadHostState[numObjects];
for (int i = 0; i < numObjects; i++) {
if (i == 0) {
deadHostStates[i] = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
} else {
deadHostStates[i] = new DeadHostState(deadHostStates[i - 1], DeadHostState.TimeSupplier.DEFAULT);
}
}
for (int k = 1; k < deadHostStates.length; k++) {
assertThat(deadHostStates[k - 1].getDeadUntilNanos(), lessThan(deadHostStates[k].getDeadUntilNanos()));
assertThat(deadHostStates[k - 1], lessThan(deadHostStates[k]));
}
}

public void testShallBeRetried() {
ConfigurableTimeSupplier timeSupplier = new ConfigurableTimeSupplier();
DeadHostState deadHostState = null;
for (int i = 0; i < EXPECTED_TIMEOUTS_SECONDS.length; i++) {
long expectedTimeoutSecond = EXPECTED_TIMEOUTS_SECONDS[i];
timeSupplier.nanoTime = 0;
if (i == 0) {
deadHostState = new DeadHostState(timeSupplier);
} else {
deadHostState = new DeadHostState(deadHostState, timeSupplier);
}
for (int j = 0; j < expectedTimeoutSecond; j++) {
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
assertThat(deadHostState.shallBeRetried(), is(false));
}
int iters = randomIntBetween(5, 30);
for (int j = 0; j < iters; j++) {
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
assertThat(deadHostState.shallBeRetried(), is(true));
}
}
}

public void testDeadHostStateTimeouts() {
ConfigurableTimeSupplier zeroTimeSupplier = new ConfigurableTimeSupplier();
zeroTimeSupplier.nanoTime = 0L;
DeadHostState previous = new DeadHostState(zeroTimeSupplier);
for (long expectedTimeoutsSecond : EXPECTED_TIMEOUTS_SECONDS) {
assertThat(TimeUnit.NANOSECONDS.toSeconds(previous.getDeadUntilNanos()), equalTo(expectedTimeoutsSecond));
previous = new DeadHostState(previous, zeroTimeSupplier);
}
//check that from here on the timeout does not increase
int iters = randomIntBetween(5, 30);
for (int i = 0; i < iters; i++) {
DeadHostState deadHostState = new DeadHostState(previous, zeroTimeSupplier);
assertThat(TimeUnit.NANOSECONDS.toSeconds(deadHostState.getDeadUntilNanos()),
equalTo(EXPECTED_TIMEOUTS_SECONDS[EXPECTED_TIMEOUTS_SECONDS.length - 1]));
previous = deadHostState;
}
}

private static class ConfigurableTimeSupplier implements DeadHostState.TimeSupplier {

long nanoTime;

@Override
public long nanoTime() {
return nanoTime;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {

@Before
@SuppressWarnings("unchecked")
public void createRestClient() throws IOException {
public void createRestClient() {
httpClient = mock(CloseableHttpAsyncClient.class);
when(httpClient.<HttpResponse>execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class),
any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer<Future<HttpResponse>>() {
Expand Down Expand Up @@ -140,17 +140,6 @@ public Future<HttpResponse> answer(InvocationOnMock invocationOnMock) throws Thr
restClient = new RestClient(httpClient, 10000, defaultHeaders, new HttpHost[]{httpHost}, null, failureListener);
}

public void testNullPath() throws IOException {
for (String method : getHttpMethods()) {
try {
restClient.performRequest(method, null);
fail("path set to null should fail!");
} catch (NullPointerException e) {
assertEquals("path must not be null", e.getMessage());
}
}
}

/**
* Verifies the content of the {@link HttpRequest} that's internally created and passed through to the http client
*/
Expand All @@ -176,33 +165,6 @@ public void testInternalHttpRequest() throws Exception {
}
}

public void testSetHosts() throws IOException {
try {
restClient.setHosts((HttpHost[]) null);
fail("setHosts should have failed");
} catch (IllegalArgumentException e) {
assertEquals("hosts must not be null nor empty", e.getMessage());
}
try {
restClient.setHosts();
fail("setHosts should have failed");
} catch (IllegalArgumentException e) {
assertEquals("hosts must not be null nor empty", e.getMessage());
}
try {
restClient.setHosts((HttpHost) null);
fail("setHosts should have failed");
} catch (NullPointerException e) {
assertEquals("host cannot be null", e.getMessage());
}
try {
restClient.setHosts(new HttpHost("localhost", 9200), null, new HttpHost("localhost", 9201));
fail("setHosts should have failed");
} catch (NullPointerException e) {
assertEquals("host cannot be null", e.getMessage());
}
}

/**
* End to end test for ok status codes
*/
Expand Down Expand Up @@ -268,7 +230,7 @@ public void testErrorStatusCodes() throws IOException {
}
}

public void testIOExceptions() throws IOException {
public void testIOExceptions() {
for (String method : getHttpMethods()) {
//IOExceptions should be let bubble up
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.net.URI;
import java.util.Collections;

import static org.elasticsearch.client.RestClientTestUtil.getHttpMethods;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -111,8 +112,48 @@ public void testBuildUriLeavesPathUntouched() {
}
}

public void testSetHostsWrongArguments() throws IOException {
try (RestClient restClient = createRestClient()) {
restClient.setHosts((HttpHost[]) null);
fail("setHosts should have failed");
} catch (IllegalArgumentException e) {
assertEquals("hosts must not be null nor empty", e.getMessage());
}
try (RestClient restClient = createRestClient()) {
restClient.setHosts();
fail("setHosts should have failed");
} catch (IllegalArgumentException e) {
assertEquals("hosts must not be null nor empty", e.getMessage());
}
try (RestClient restClient = createRestClient()) {
restClient.setHosts((HttpHost) null);
fail("setHosts should have failed");
} catch (NullPointerException e) {
assertEquals("host cannot be null", e.getMessage());
}
try (RestClient restClient = createRestClient()) {
restClient.setHosts(new HttpHost("localhost", 9200), null, new HttpHost("localhost", 9201));
fail("setHosts should have failed");
} catch (NullPointerException e) {
assertEquals("host cannot be null", e.getMessage());
}
}

public void testNullPath() throws IOException {
try (RestClient restClient = createRestClient()) {
for (String method : getHttpMethods()) {
try {
restClient.performRequest(method, null);
fail("path set to null should fail!");
} catch (NullPointerException e) {
assertEquals("path must not be null", e.getMessage());
}
}
}
}

private static RestClient createRestClient() {
HttpHost[] hosts = new HttpHost[]{new HttpHost("localhost", 9200)};
return new RestClient(mock(CloseableHttpAsyncClient.class), randomLongBetween(1_000, 30_000), new Header[]{}, hosts, null, null);
return new RestClient(mock(CloseableHttpAsyncClient.class), randomIntBetween(1_000, 30_000), new Header[]{}, hosts, null, null);
}
}

0 comments on commit 9afd490

Please sign in to comment.