fix: Eureka peer connections loop #2775

Merged: 13 commits, Feb 8, 2023
@@ -49,6 +49,7 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -372,11 +373,39 @@ public static class ReplicationTaskProcessor implements TaskProcessor<ReplicationTask> {

private static final Pattern READ_TIME_OUT_PATTERN = Pattern.compile(".*read.*time.*out.*");

private final NetworkIssueCounter networkIssueCounter = new NetworkIssueCounter();

public ReplicationTaskProcessor(String peerId, HttpReplicationClient replicationClient) {
this.replicationClient = replicationClient;
this.peerId = peerId;
}

static class NetworkIssueCounter {

static final int MAX_RETRIES = 10;
final AtomicInteger counter = new AtomicInteger(0);

public void success() {
int count = counter.get();
if (count > 0) {
log.trace("Network error indicator was reset. The number of errors was {}/{}", count, MAX_RETRIES);
}
counter.set(0);
}

public void fail(String errorMessage) {
int count = counter.getAndUpdate(prev -> Math.min(prev + 1, MAX_RETRIES));
log.trace("Network error ({}) occurred. The number of errors is {}{}/{}. The network error status is considered as {}.",
errorMessage, counter.get(), count >= MAX_RETRIES ? "+" : "",
MAX_RETRIES, isPermanent() ? "permanent" : "temporary");
}

public boolean isPermanent() {
return counter.get() >= MAX_RETRIES;
}

}
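For context, the counter above is a saturating retry counter: fail() increments it up to MAX_RETRIES and success() resets it to zero. A minimal sketch of the intended transitions (hypothetical standalone usage, not part of this PR):

    NetworkIssueCounter counter = new NetworkIssueCounter();
    for (int i = 0; i < NetworkIssueCounter.MAX_RETRIES; i++) {
        counter.fail("Connection refused");  // increments, saturating at MAX_RETRIES
    }
    assert counter.isPermanent();            // threshold reached: process() stops retrying
    counter.success();                       // one successful replication resets the counter
    assert !counter.isPermanent();           // back to the temporary/retry path

This is what breaks the endless retry loop from the PR title: a network error that keeps recurring is reclassified from TransientError to PermanentError once the counter saturates, while a single successful call restores normal retry behavior.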

@Override
public ProcessingResult process(ReplicationTask task) {
try {
@@ -387,25 +416,27 @@ public ProcessingResult process(ReplicationTask task) {
log.debug("Replication task {} completed with status {}, (includes entity {})", task.getTaskName(), statusCode, entity != null);

if (isSuccess(statusCode)) {
networkIssueCounter.success();
task.handleSuccess();
} else if (statusCode == 503) {
networkIssueCounter.fail("Service is not available");
log.debug("Server busy (503) reply for task {}", task.getTaskName());
return ProcessingResult.Congestion;
} else {
task.handleFailure(statusCode, entity);
return ProcessingResult.PermanentError;
}
} catch (Throwable e) {
networkIssueCounter.fail(e.getLocalizedMessage());
if (maybeReadTimeOut(e)) {
log.error("It seems to be a socket read timeout exception, it will retry later. if it continues to happen and some eureka node occupied all the cpu time, you should set property 'eureka.server.peer-node-read-timeout-ms' to a bigger value", e);
//read timeout exception is more Congestion than TransientError, return Congestion for longer delay
return ProcessingResult.Congestion;
} else if (isNetworkConnectException(e)) {
logNetworkErrorSample(task, e);
} else if (isNetworkConnectException(e) && !networkIssueCounter.isPermanent()) {
logNetworkErrorSample(task, e, "; retrying after delay");
return ProcessingResult.TransientError;
} else {
log.error("{}: {} Not re-trying this exception because it does not seem to be a network exception",
peerId, task.getTaskName(), e);
logNetworkErrorSample(task, e, "; not re-trying this exception because it does not seem to be a network exception");
return ProcessingResult.PermanentError;
}
}
@@ -420,6 +451,7 @@ public ProcessingResult process(List<ReplicationTask> tasks) {
int statusCode = response.getStatusCode();
if (!isSuccess(statusCode)) {
if (statusCode == 503) {
networkIssueCounter.fail("Service is not available");
log.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
return ProcessingResult.Congestion;
} else {
@@ -428,18 +460,20 @@
return ProcessingResult.PermanentError;
}
} else {
networkIssueCounter.success();
handleBatchResponse(tasks, response.getEntity().getResponseList());
}
} catch (Throwable e) {
networkIssueCounter.fail(e.getLocalizedMessage());
if (maybeReadTimeOut(e)) {
log.error("It seems to be a socket read timeout exception, it will retry later. if it continues to happen and some eureka node occupied all the cpu time, you should set property 'eureka.server.peer-node-read-timeout-ms' to a bigger value", e);
//read timeout exception is more Congestion than TransientError, return Congestion for longer delay
return ProcessingResult.Congestion;
} else if (isNetworkConnectException(e)) {
logNetworkErrorSample(null, e);
} else if (isNetworkConnectException(e) && !networkIssueCounter.isPermanent()) {
logNetworkErrorSample(null, e, "; retrying after delay");
return ProcessingResult.TransientError;
} else {
log.error("Not re-trying this exception because it does not seem to be a network exception", e);
logNetworkErrorSample(null, e, "; not re-trying this exception because it does not seem to be a network exception");
return ProcessingResult.PermanentError;
}
}
@@ -452,7 +486,7 @@ public ProcessingResult process(List<ReplicationTask> tasks) {
* 20 threads * 100ms delay == 200 error entries / sec worst case
* Still we would like to see the exception samples, so we print samples at regular intervals.
*/
private void logNetworkErrorSample(ReplicationTask task, Throwable e) {
private void logNetworkErrorSample(ReplicationTask task, Throwable e, String retryMessage) {
long now = System.currentTimeMillis();
if (now - lastNetworkErrorTime > 10000) {
lastNetworkErrorTime = now;
@@ -461,7 +495,7 @@ private void logNetworkErrorSample(ReplicationTask task, Throwable e) {
if (task != null) {
sb.append(" for task ").append(task.getTaskName());
}
sb.append("; retrying after delay");
sb.append(retryMessage);
log.error(sb.toString(), e);
}
}
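To make the arithmetic in the comment above concrete, a small illustrative calculation (the figures come from the comment itself; the 10-second window is the hardcoded 10000 ms above):

    int threads = 20;                                 // replication worker threads
    double retriesPerSecPerThread = 1000.0 / 100;     // one retry per 100 ms delay
    double worstCaseErrorsPerSec = threads * retriesPerSecPerThread;  // == 200
    double sampledLogLinesPerSec = 1.0 / 10;          // one sample per 10 s window == 0.1

So sampling reduces the worst-case log volume from roughly 200 error entries per second to one entry every ten seconds, while still surfacing a representative exception.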
@@ -19,10 +19,13 @@

import javax.net.ssl.SSLException;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.zowe.apiml.product.eureka.client.ApimlPeerEurekaNode.ReplicationTaskProcessor;
import static org.zowe.apiml.product.eureka.client.ApimlPeerEurekaNode.ReplicationTaskProcessor.NetworkIssueCounter.MAX_RETRIES;
import static org.zowe.apiml.product.eureka.client.TestableInstanceReplicationTask.ProcessingState;
import static org.zowe.apiml.product.eureka.client.TestableInstanceReplicationTask.aReplicationTask;

@@ -77,6 +80,48 @@ public void whenNonNetworkError_thenSetPermanent() {
assertThat(status, is(ProcessingResult.PermanentError));
assertThat(task.getProcessingState(), is(ProcessingState.Failed));
}

@Test
void whenNetworkProblemRepeatedMultipleTimes_thenSetPermanentAndRecoverWhenNetworkIsOk() {
TestableInstanceReplicationTask task = aReplicationTask().withAction(Action.Heartbeat).withNetworkFailures(MAX_RETRIES).build();

ProcessingResult status = replicationTaskProcessor.process(task);
assertThat(status, is(ProcessingResult.TransientError));

IntStream.range(1, MAX_RETRIES - 2).forEach(n -> replicationTaskProcessor.process(task));

status = replicationTaskProcessor.process(task);
assertThat(status, is(ProcessingResult.TransientError));

status = replicationTaskProcessor.process(task);
assertThat(status, is(ProcessingResult.PermanentError));

status = replicationTaskProcessor.process(task);
assertThat(status, is(ProcessingResult.Success));
}

@Test
void whenNetworkProblemRepeatedMultipleTimes_thenResetCounterAfterSuccessfulConnection() {
TestableInstanceReplicationTask task1 = aReplicationTask().withAction(Action.Heartbeat).withNetworkFailures(MAX_RETRIES - 5).build();
TestableInstanceReplicationTask task2 = aReplicationTask().withAction(Action.Heartbeat).withNetworkFailures(MAX_RETRIES).build();

IntStream.range(1, MAX_RETRIES - 5).forEach(n -> replicationTaskProcessor.process(task1));

ProcessingResult status = replicationTaskProcessor.process(task1);
assertThat(status, is(ProcessingResult.TransientError));

status = replicationTaskProcessor.process(task1);
assertThat(status, is(ProcessingResult.Success));

IntStream.range(1, MAX_RETRIES - 1).forEach(n -> replicationTaskProcessor.process(task2));

status = replicationTaskProcessor.process(task2);
assertThat(status, is(ProcessingResult.TransientError));

status = replicationTaskProcessor.process(task2);
assertThat(status, is(ProcessingResult.PermanentError));
}
}

@Nested
@@ -152,5 +197,57 @@ public void whenNetworkOKAndBatchFailed_thenSetFailed() {
assertThat(status, is(ProcessingResult.Success));
assertThat(task.getProcessingState(), is(ProcessingState.Failed));
}

@Test
void whenNetworkProblemRepeatedMultipleTimes_thenSetPermanentAndRecoverWhenNetworkIsOk() {
TestableInstanceReplicationTask task = aReplicationTask().build();
List<ReplicationTask> tasks = Collections.singletonList(task);
replicationClient.withNetworkError(MAX_RETRIES);

ProcessingResult status = replicationTaskProcessor.process(tasks);
assertThat(status, is(ProcessingResult.TransientError));

IntStream.range(1, MAX_RETRIES - 2).forEach(n -> replicationTaskProcessor.process(tasks));

status = replicationTaskProcessor.process(tasks);
assertThat(status, is(ProcessingResult.TransientError));

status = replicationTaskProcessor.process(tasks);
assertThat(status, is(ProcessingResult.PermanentError));

replicationClient.withBatchReply(200);
replicationClient.withNetworkStatusCode(200);

status = replicationTaskProcessor.process(tasks);
assertThat(status, is(ProcessingResult.Success));
}

@Test
void whenNetworkProblemRepeatedMultipleTimes_thenResetCounterAfterSuccessfulConnection() {
TestableInstanceReplicationTask task = aReplicationTask().build();
List<ReplicationTask> tasks = Collections.singletonList(task);
replicationClient.withNetworkError(MAX_RETRIES - 5);

IntStream.range(1, MAX_RETRIES - 5).forEach(n -> replicationTaskProcessor.process(tasks));

ProcessingResult status = replicationTaskProcessor.process(tasks);
assertThat(status, is(ProcessingResult.TransientError));

replicationClient.withBatchReply(200);
replicationClient.withNetworkStatusCode(200);

status = replicationTaskProcessor.process(tasks);
assertThat(status, is(ProcessingResult.Success));

replicationClient.withNetworkError(MAX_RETRIES + 5);

IntStream.range(1, MAX_RETRIES - 1).forEach(n -> replicationTaskProcessor.process(tasks));

status = replicationTaskProcessor.process(tasks);
assertThat(status, is(ProcessingResult.TransientError));

status = replicationTaskProcessor.process(tasks);
assertThat(status, is(ProcessingResult.PermanentError));
}
}
}