Skip to content

Commit

Permalink
Refactor AutoFollowCoordinator to track leader indices per remote clu…
Browse files Browse the repository at this point in the history
…ster (#36031)

and replaced poll interval setting with a hardcoded poll interval.
The hard coded interval will be removed in a follow up change to make
use of cluster state API's wait_for_metatdata_version.

Before the auto following was bootstrapped from thread pool scheduler,
but now auto followers for new remote clusters are bootstrapped when
a new cluster state is published.

Originates from #35895
Relates to #33007
  • Loading branch information
martijnvg authored Dec 5, 2018
1 parent 60e45cd commit a264cb6
Show file tree
Hide file tree
Showing 7 changed files with 317 additions and 180 deletions.
1 change: 1 addition & 0 deletions x-pack/plugin/ccr/qa/chain/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ followClusterTestCluster {
numNodes = 1
clusterName = 'follow-cluster'
setting 'xpack.license.self_generated.type', 'trial'
setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
setting 'cluster.remote.middle_cluster.seeds', "\"${-> middleClusterTest.nodes.get(0).transportUri()}\""
setting 'node.name', 'follow'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;

import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

public class AutoFollowIT extends ESCCRRestTestCase {

public void testAutoFollowPatterns() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}
Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/leader_cluster_pattern");
putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"leader_cluster\"}");
assertOK(client().performRequest(putPatternRequest));
putPatternRequest = new Request("PUT", "/_ccr/auto_follow/middle_cluster_pattern");
putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"middle_cluster\"}");
assertOK(client().performRequest(putPatternRequest));
try (RestClient leaderClient = buildLeaderClient()) {
Settings settings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.build();
Request request = new Request("PUT", "/logs-20190101");
request.setJsonEntity("{\"settings\": " + Strings.toString(settings) +
", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }");
assertOK(leaderClient.performRequest(request));
for (int i = 0; i < 5; i++) {
String id = Integer.toString(i);
index(leaderClient, "logs-20190101", id, "field", i, "filtered_field", "true");
}
}
try (RestClient middleClient = buildMiddleClient()) {
Settings settings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.build();
Request request = new Request("PUT", "/logs-20200101");
request.setJsonEntity("{\"settings\": " + Strings.toString(settings) +
", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }");
assertOK(middleClient.performRequest(request));
for (int i = 0; i < 5; i++) {
String id = Integer.toString(i);
index(middleClient, "logs-20200101", id, "field", i, "filtered_field", "true");
}
}
assertBusy(() -> {
Request statsRequest = new Request("GET", "/_ccr/stats");
Map<?, ?> response = toMap(client().performRequest(statsRequest));
Map<?, ?> autoFollowStats = (Map<?, ?>) response.get("auto_follow_stats");
assertThat(autoFollowStats.get("number_of_successful_follow_indices"), equalTo(2));

ensureYellow("logs-20190101");
ensureYellow("logs-20200101");
verifyDocuments("logs-20190101", 5, "filtered_field:true");
verifyDocuments("logs-20200101", 5, "filtered_field:true");
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public Collection<Object> createComponents(

return Arrays.asList(
ccrLicenseChecker,
new AutoFollowCoordinator(settings, client, threadPool, clusterService, ccrLicenseChecker)
new AutoFollowCoordinator(client, threadPool, clusterService, ccrLicenseChecker)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.XPackSettings;

import java.util.Arrays;
Expand All @@ -29,12 +28,6 @@ private CcrSettings() {
public static final Setting<Boolean> CCR_FOLLOWING_INDEX_SETTING =
Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex);

/**
* Setting for controlling the interval in between polling leader clusters to check whether there are indices to follow
*/
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_POLL_INTERVAL =
Setting.timeSetting("xpack.ccr.auto_follow.poll_interval", TimeValue.timeValueMillis(2500), Property.NodeScope);

/**
* The settings defined by CCR.
*
Expand All @@ -43,8 +36,7 @@ private CcrSettings() {
static List<Setting<?>> getSettings() {
return Arrays.asList(
XPackSettings.CCR_ENABLED_SETTING,
CCR_FOLLOWING_INDEX_SETTING,
CCR_AUTO_FOLLOW_POLL_INTERVAL);
CCR_FOLLOWING_INDEX_SETTING);
}

}
Loading

0 comments on commit a264cb6

Please sign in to comment.