Make CCR resilient against missing remote cluster connections (#36682)
Both index following and auto following should be resilient against missing remote cluster
connections, which can happen if a user accidentally removes them. When this happens,
auto following and index following now retry instead of failing with unrecoverable exceptions.

Both the put follow and put auto follow APIs validate that the
remote cluster connection exists. The logic added in this change only
covers the case where the remote connection gets removed during the
lifetime of a follower index or auto follow pattern. This retry
behavior is similar to how CCR deals with authorization errors.

Closes #36667
Closes #36255
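
A minimal, self-contained sketch of the retry mechanism described above (plain Java; the class names and coordinator wiring are hypothetical, not the actual Elasticsearch types): an auto follower that hits the "unknown cluster alias" error records that its remote connection is missing and stops, and a later cluster state update clears the flag and restarts it.

// Hypothetical sketch of the flag-based retry; not the actual AutoFollowCoordinator code.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class AutoFollowerSketch {
    final String remoteCluster;
    // Set when fetching the remote cluster state fails because the connection was removed.
    volatile boolean remoteClusterConnectionMissing = false;

    AutoFollowerSketch(String remoteCluster) {
        this.remoteCluster = remoteCluster;
    }

    void start() {
        try {
            fetchRemoteClusterState();
        } catch (IllegalArgumentException e) {
            // A removed connection is reported with this exact message.
            if (("unknown cluster alias [" + remoteCluster + "]").equals(e.getMessage())) {
                remoteClusterConnectionMissing = true; // stop and wait for a cluster state update
                return;
            }
            throw e;
        }
    }

    void fetchRemoteClusterState() {
        // Placeholder: simulate the remote connection having been removed by a user.
        throw new IllegalArgumentException("unknown cluster alias [" + remoteCluster + "]");
    }
}

class AutoFollowCoordinatorSketch {
    final Map<String, AutoFollowerSketch> autoFollowers = new ConcurrentHashMap<>();

    // Invoked on every cluster state update: restart any follower that stopped
    // because its remote cluster connection was missing.
    void clusterStateProcessed() {
        for (AutoFollowerSketch follower : autoFollowers.values()) {
            if (follower.remoteClusterConnectionMissing) {
                follower.remoteClusterConnectionMissing = false;
                follower.start(); // retry now that the connection may have been re-added
            }
        }
    }
}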
martijnvg committed Dec 24, 2018
1 parent b0bcb32 commit 46dfbb3
Showing 5 changed files with 80 additions and 10 deletions.
@@ -244,11 +244,17 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}

List<String> removedRemoteClusters = new ArrayList<>();
for (String remoteCluster : autoFollowers.keySet()) {
for (Map.Entry<String, AutoFollower> entry : autoFollowers.entrySet()) {
String remoteCluster = entry.getKey();
AutoFollower autoFollower = entry.getValue();
boolean exist = autoFollowMetadata.getPatterns().values().stream()
.anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster));
if (exist == false) {
removedRemoteClusters.add(remoteCluster);
} else if (autoFollower.remoteClusterConnectionMissing) {
LOGGER.info("Retrying auto follower [{}] after remote cluster connection was missing", remoteCluster);
autoFollower.remoteClusterConnectionMissing = false;
autoFollower.start();
}
}
this.autoFollowers = autoFollowers
@@ -280,6 +286,7 @@ abstract static class AutoFollower {

private volatile long lastAutoFollowTimeInMillis = -1;
private volatile long metadataVersion = 0;
private volatile boolean remoteClusterConnectionMissing = false;
private volatile CountDown autoFollowPatternsCountDown;
private volatile AtomicArray<AutoFollowResult> autoFollowResults;

@@ -326,6 +333,14 @@ void start() {
autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns);
} else {
assert remoteError != null;
String expectedErrorMessage = "unknown cluster alias [" + remoteCluster + "]";
if (remoteError instanceof IllegalArgumentException &&
expectedErrorMessage.equals(remoteError.getMessage())) {
LOGGER.info("AutoFollower for cluster [{}] has stopped, because remote connection is gone", remoteCluster);
remoteClusterConnectionMissing = true;
return;
}

for (int i = 0; i < patterns.size(); i++) {
String autoFollowPatternName = patterns.get(i);
finalise(i, new AutoFollowResult(autoFollowPatternName, remoteError));
@@ -419,7 +419,7 @@ private void updateSettings(final LongConsumer handler, final AtomicInteger retr

private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) {
assert e != null;
if (shouldRetry(e) && isStopped() == false) {
if (shouldRetry(params.getRemoteCluster(), e) && isStopped() == false) {
int currentRetry = retryCounter.incrementAndGet();
LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying [{}]",
params.getFollowShardId(), currentRetry), e);
@@ -441,13 +441,14 @@ static long computeDelay(int currentRetry, long maxRetryDelayInMillis) {
return Math.min(backOffDelay, maxRetryDelayInMillis);
}

static boolean shouldRetry(Exception e) {
static boolean shouldRetry(String remoteCluster, Exception e) {
if (NetworkExceptionHelper.isConnectException(e)) {
return true;
} else if (NetworkExceptionHelper.isCloseConnectionException(e)) {
return true;
}

String noSuchRemoteClusterMessage = "no such remote cluster: " + remoteCluster;
final Throwable actual = ExceptionsHelper.unwrapCause(e);
return actual instanceof ShardNotFoundException ||
actual instanceof IllegalIndexShardStateException ||
@@ -460,7 +461,8 @@ static boolean shouldRetry(Exception e) {
actual instanceof NodeDisconnectedException ||
actual instanceof NodeNotConnectedException ||
actual instanceof NodeClosedException ||
(actual.getMessage() != null && actual.getMessage().contains("TransportService is closed"));
(actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")) ||
(actual instanceof IllegalArgumentException && noSuchRemoteClusterMessage.equals(actual.getMessage()));
}

// These methods are protected for testing purposes:
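
As a rough illustration of the new clause in shouldRetry above (hypothetical class name, not the real ShardFollowNodeTask): only an IllegalArgumentException whose message is exactly "no such remote cluster: <alias>" is treated as retryable, so unrelated IllegalArgumentExceptions still fail the task.

// Hypothetical sketch of the message-based retry check; not the actual ShardFollowNodeTask code.
public class RetryCheckSketch {

    static boolean isMissingRemoteClusterError(String remoteCluster, Throwable actual) {
        return actual instanceof IllegalArgumentException
            && ("no such remote cluster: " + remoteCluster).equals(actual.getMessage());
    }

    public static void main(String[] args) {
        // Remote connection removed by a user: retryable.
        System.out.println(isMissingRemoteClusterError("leader",
            new IllegalArgumentException("no such remote cluster: leader"))); // true
        // Unrelated IllegalArgumentException: still a fatal error.
        System.out.println(isMissingRemoteClusterError("leader",
            new IllegalArgumentException("unknown setting [foo]"))); // false
    }
}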
@@ -244,7 +244,11 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
request.setMaxOperationCount(maxOperationCount);
request.setMaxBatchSize(params.getMaxReadRequestSize());
request.setPollTimeout(params.getReadPollTimeout());
remoteClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
try {
remoteClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
} catch (Exception e) {
errorHandler.accept(e);
}
}
};
}
@@ -274,7 +278,7 @@ protected void nodeOperation(final AllocatedPersistentTask task, final ShardFoll
return;
}

if (ShardFollowNodeTask.shouldRetry(e)) {
if (ShardFollowNodeTask.shouldRetry(params.getRemoteCluster(), e)) {
logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number",
shardFollowNodeTask), e);
threadPool.schedule(params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME, () -> nodeOperation(task, params, state));
@@ -6,25 +6,31 @@

package org.elasticsearch.xpack.ccr;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.xpack.CcrSingleNodeTestCase;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/36764")
public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {

public void testFollowIndex() throws Exception {
@@ -87,6 +93,51 @@ public void testDoNotCreateFollowerIfLeaderDoesNotHaveSoftDeletes() throws Excep
assertThat(client().admin().indices().prepareExists("follower-index").get().isExists(), equalTo(false));
}

public void testRemoveRemoteConnection() throws Exception {
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
request.setName("my_pattern");
request.setRemoteCluster("local");
request.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
request.setFollowIndexNamePattern("copy-{{leader_index}}");
request.setReadPollTimeout(TimeValue.timeValueMillis(10));
assertTrue(client().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());

Settings leaderIndexSettings = Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.build();
createIndex("logs-20200101", leaderIndexSettings);
client().prepareIndex("logs-20200101", "doc").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
CcrStatsAction.Response response = client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.Request()).actionGet();
assertThat(response.getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), equalTo(1L));
assertThat(response.getFollowStats().getStatsResponses().size(), equalTo(1));
assertThat(response.getFollowStats().getStatsResponses().get(0).status().followerGlobalCheckpoint(), equalTo(0L));
});

// Both auto follow patterns and index following should be resilient to remote connection being missing:
removeLocalRemote();
// This triggers a cluster state update, which should let auto follow coordinator retry auto following:
setupLocalRemote();

// This new index should be picked up by auto follow coordinator
createIndex("logs-20200102", leaderIndexSettings);
// This new document should be replicated to follower index:
client().prepareIndex("logs-20200101", "doc").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
CcrStatsAction.Response response = client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.Request()).actionGet();
assertThat(response.getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), equalTo(2L));

FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
statsRequest.setIndices(new String[]{"copy-logs-20200101"});
FollowStatsAction.StatsResponses responses = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(responses.getStatsResponses().size(), equalTo(1));
assertThat(responses.getStatsResponses().get(0).status().getFatalException(), nullValue());
assertThat(responses.getStatsResponses().get(0).status().followerGlobalCheckpoint(), equalTo(1L));
});
}

public static String getIndexSettings(final int numberOfShards,
final int numberOfReplicas,
final Map<String, String> additionalIndexSettings) throws IOException {
@@ -6,7 +6,6 @@

package org.elasticsearch.xpack.ccr.action;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@@ -34,7 +33,6 @@
* Test scope is important to ensure that other tests added to this suite do not interfere with the expectation in
* testStatsWhenNoPersistentTasksMetaDataExists that the cluster state does not contain any persistent tasks metadata.
*/
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/36764")
public class FollowStatsIT extends CcrSingleNodeTestCase {

/**
