diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index 4f71320556714..c58f5d8fb62b3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -122,7 +122,7 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( client.getRemoteClusterClient(clusterAlias), request, onFailure, - remoteClusterStateResponse -> { + remoteClusterStateResponse -> { ClusterState remoteClusterState = remoteClusterStateResponse.getState(); IndexMetaData leaderIndexMetaData = remoteClusterState.getMetaData().index(leaderIndex); if (leaderIndexMetaData == null) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index d7a581c7daf69..296dc1b3187f8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -32,6 +32,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.license.LicenseUtils; +import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; @@ -360,8 +361,8 @@ private void autoFollowIndices(final AutoFollowMetadata autoFollowMetadata, Map headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName); List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName); - final List leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, - clusterState, followedIndices); + final List leaderIndicesToFollow = + getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, followedIndices); if (leaderIndicesToFollow.isEmpty()) { finalise(slot, new AutoFollowResult(autoFollowPatternName)); } else { @@ -374,7 +375,7 @@ private void autoFollowIndices(final AutoFollowMetadata autoFollowMetadata, Consumer resultHandler = result -> finalise(slot, result); checkAutoFollowPattern(autoFollowPatternName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers, - patternsForTheSameRemoteCluster, remoteClusterState.metaData(), resultHandler); + patternsForTheSameRemoteCluster, remoteClusterState.metaData(), clusterState.metaData(), resultHandler); } i++; } @@ -388,6 +389,7 @@ private void checkAutoFollowPattern(String autoFollowPattenName, Map headers, List> patternsForTheSameRemoteCluster, MetaData remoteMetadata, + MetaData localMetadata, Consumer resultHandler) { final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size()); @@ -423,7 +425,16 @@ private void checkAutoFollowPattern(String autoFollowPattenName, } }); continue; + } else if (leaderIndexAlreadyFollowed(autoFollowPattern, indexToFollow, localMetadata)) { + updateAutoFollowMetadata(recordLeaderIndexAsFollowFunction(autoFollowPattenName, indexToFollow), error -> { + results.set(slot, new Tuple<>(indexToFollow, error)); + if (leaderIndicesCountDown.countDown()) { + resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList())); + } + }); + continue; } + followLeaderIndex(autoFollowPattenName, remoteCluster, indexToFollow, autoFollowPattern, headers, error -> { results.set(slot, new Tuple<>(indexToFollow, error)); if (leaderIndicesCountDown.countDown()) { @@ -434,6 +445,25 @@ private void checkAutoFollowPattern(String autoFollowPattenName, } } + private static boolean leaderIndexAlreadyFollowed(AutoFollowPattern autoFollowPattern, + Index leaderIndex, + MetaData localMetadata) { + String followIndexName = getFollowerIndexName(autoFollowPattern, leaderIndex.getName()); + IndexMetaData indexMetaData = localMetadata.index(followIndexName); + if (indexMetaData != null) { + // If an index with the same name exists, but it is not a follow index for this leader index then + // we should let the auto follower attempt to auto follow it, so it can fail later and + // it is then visible in the auto follow stats. For example a cluster can just happen to have + // an index with the same name as the new follower index. + Map customData = indexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); + if (customData != null) { + String recordedLeaderIndexUUID = customData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); + return leaderIndex.getUUID().equals(recordedLeaderIndexUUID); + } + } + return false; + } + private void followLeaderIndex(String autoFollowPattenName, String remoteCluster, Index indexToFollow, @@ -485,7 +515,6 @@ private void finalise(int slot, AutoFollowResult result) { static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, ClusterState remoteClusterState, - ClusterState followerClusterState, List followedIndexUUIDs) { List leaderIndicesToFollow = new ArrayList<>(); for (IndexMetaData leaderIndexMetaData : remoteClusterState.getMetaData()) { @@ -498,10 +527,7 @@ static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, // this index will be auto followed. indexRoutingTable.allPrimaryShardsActive() && followedIndexUUIDs.contains(leaderIndexMetaData.getIndex().getUUID()) == false) { - // TODO: iterate over the indices in the followerClusterState and check whether a IndexMetaData - // has a leader index uuid custom metadata entry that matches with uuid of leaderIndexMetaData variable - // If so then handle it differently: not follow it, but just add an entry to - // AutoFollowMetadata#followedLeaderIndexUUIDs + leaderIndicesToFollow.add(leaderIndexMetaData.getIndex()); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 3acdde52a443e..5fbaf8c76ba11 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; @@ -345,8 +346,7 @@ public void testGetLeaderIndicesToFollow() { .routingTable(routingTableBuilder.build()) .build(); - List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, - Collections.emptyList()); + List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList()); result.sort(Comparator.comparing(Index::getName)); assertThat(result.size(), equalTo(5)); assertThat(result.get(0).getName(), equalTo("metrics-0")); @@ -356,7 +356,7 @@ public void testGetLeaderIndicesToFollow() { assertThat(result.get(4).getName(), equalTo("metrics-4")); List followedIndexUUIDs = Collections.singletonList(remoteState.metaData().index("metrics-2").getIndexUUID()); - result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, followedIndexUUIDs); + result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, followedIndexUUIDs); result.sort(Comparator.comparing(Index::getName)); assertThat(result.size(), equalTo(4)); assertThat(result.get(0).getName(), equalTo("metrics-0")); @@ -390,8 +390,7 @@ public void testGetLeaderIndicesToFollow_shardsNotStarted() { .routingTable(RoutingTable.builder(remoteState.routingTable()).add(indexRoutingTable).build()) .build(); - List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, - Collections.emptyList()); + List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList()); assertThat(result.size(), equalTo(1)); assertThat(result.get(0).getName(), equalTo("index1")); @@ -404,7 +403,7 @@ public void testGetLeaderIndicesToFollow_shardsNotStarted() { .routingTable(RoutingTable.builder(remoteState.routingTable()).add(indexRoutingTable).build()) .build(); - result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, Collections.emptyList()); + result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList()); assertThat(result.size(), equalTo(2)); result.sort(Comparator.comparing(Index::getName)); assertThat(result.get(0).getName(), equalTo("index1")); @@ -864,6 +863,83 @@ void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List pa "because soft deletes are not enabled")); } + public void testAutoFollowerFollowerIndexAlreadyExists() { + Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + ClusterState remoteState = createRemoteClusterState("logs-20190101", true); + + AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), + null, null, null, null, null, null, null, null, null, null, null); + Map patterns = new HashMap<>(); + patterns.put("remote", autoFollowPattern); + Map> followedLeaderIndexUUIDS = new HashMap<>(); + followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); + Map> autoFollowHeaders = new HashMap<>(); + autoFollowHeaders.put("remote", Collections.singletonMap("key", "val")); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders); + + ClusterState currentState = ClusterState.builder(new ClusterName("name")) + .metaData(MetaData.builder() + .put(IndexMetaData.builder("logs-20190101") + .settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)) + .putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, Collections.singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, + remoteState.metaData().index("logs-20190101").getIndexUUID())) + .numberOfShards(1) + .numberOfReplicas(0)) + .putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + + + final Object[] resultHolder = new Object[1]; + Consumer> handler = results -> { + resultHolder[0] = results; + }; + AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState), () -> 1L) { + @Override + void getRemoteClusterState(String remoteCluster, + long metadataVersion, + BiConsumer handler) { + assertThat(remoteCluster, equalTo("remote")); + handler.accept(new ClusterStateResponse(new ClusterName("name"), remoteState, 1L, false), null); + } + + @Override + void createAndFollow(Map headers, + PutFollowAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler) { + fail("this should not be invoked"); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, + Consumer handler) { + ClusterState resultCs = updateFunction.apply(currentState); + AutoFollowMetadata result = resultCs.metaData().custom(AutoFollowMetadata.TYPE); + assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1)); + assertThat(result.getFollowedLeaderIndexUUIDs().get("remote").size(), equalTo(1)); + handler.accept(null); + } + + @Override + void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List patterns) { + // Ignore, to avoid invoking updateAutoFollowMetadata(...) twice + } + }; + autoFollower.start(); + + @SuppressWarnings("unchecked") + List results = (List) resultHolder[0]; + assertThat(results, notNullValue()); + assertThat(results.size(), equalTo(1)); + assertThat(results.get(0).clusterStateFetchException, nullValue()); + List> entries = new ArrayList<>(results.get(0).autoFollowExecutionResults.entrySet()); + assertThat(entries.size(), equalTo(1)); + assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); + assertThat(entries.get(0).getValue(), nullValue()); + } + private static ClusterState createRemoteClusterState(String indexName, Boolean enableSoftDeletes) { Settings.Builder indexSettings; if (enableSoftDeletes != null) {