diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/roles.yml b/x-pack/plugin/ccr/qa/multi-cluster-with-security/roles.yml index 7916bc6eee2cc..8320143a9fb40 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/roles.yml +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/roles.yml @@ -2,7 +2,7 @@ ccruser: cluster: - manage_ccr indices: - - names: [ 'allowed-index' ] + - names: [ 'allowed-index', 'logs-eu-*' ] privileges: - monitor - read diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index 851a292ddae26..60b9f8f23e8b3 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ccr; +import org.apache.http.HttpHost; import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; @@ -119,6 +120,45 @@ public void testFollowIndex() throws Exception { } } + public void testAutoFollowPatterns() throws Exception { + assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster); + String allowedIndex = "logs-eu-20190101"; + String disallowedIndex = "logs-us-20190101"; + + Request request = new Request("PUT", "/_ccr/auto_follow/leader_cluster"); + request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"]}"); + assertOK(client().performRequest(request)); + + try (RestClient leaderClient = buildLeaderClient()) { + for (String index : new String[]{allowedIndex, disallowedIndex}) { + Settings settings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .build(); + String requestBody = "{\"settings\": " + Strings.toString(settings) + + ", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }"; + request = new Request("PUT", "/" + index); + request.setJsonEntity(requestBody); + assertOK(leaderClient.performRequest(request)); + + for (int i = 0; i < 5; i++) { + String id = Integer.toString(i); + index(leaderClient, index, id, "field", i, "filtered_field", "true"); + } + } + } + + assertBusy(() -> { + ensureYellow(allowedIndex); + verifyDocuments(adminClient(), allowedIndex, 5); + }); + assertThat(indexExists(adminClient(), disallowedIndex), is(false)); + + // Cleanup by deleting auto follow pattern and unfollowing: + request = new Request("DELETE", "/_ccr/auto_follow/leader_cluster"); + assertOK(client().performRequest(request)); + unfollowIndex(allowedIndex); + } + private int countCcrNodeTasks() throws IOException { final Request request = new Request("GET", "/_tasks"); request.addParameter("detailed", "true"); @@ -139,6 +179,10 @@ private int countCcrNodeTasks() throws IOException { } private static void index(String index, String id, Object... fields) throws IOException { + index(adminClient(), index, id, fields); + } + + private static void index(RestClient client, String index, String id, Object... fields) throws IOException { XContentBuilder document = jsonBuilder().startObject(); for (int i = 0; i < fields.length; i += 2) { document.field((String) fields[i], fields[i + 1]); @@ -146,7 +190,7 @@ private static void index(String index, String id, Object... fields) throws IOEx document.endObject(); final Request request = new Request("POST", "/" + index + "/_doc/" + id); request.setJsonEntity(Strings.toString(document)); - assertOK(adminClient().performRequest(request)); + assertOK(client.performRequest(request)); } private static void refresh(String index) throws IOException { @@ -201,11 +245,34 @@ protected static void createIndex(String name, Settings settings, String mapping assertOK(adminClient().performRequest(request)); } + private static void ensureYellow(String index) throws IOException { + Request request = new Request("GET", "/_cluster/health/" + index); + request.addParameter("wait_for_status", "yellow"); + request.addParameter("wait_for_no_relocating_shards", "true"); + request.addParameter("wait_for_no_initializing_shards", "true"); + request.addParameter("timeout", "70s"); + request.addParameter("level", "shards"); + adminClient().performRequest(request); + } + + private RestClient buildLeaderClient() throws IOException { + assert runningAgainstLeaderCluster == false; + String leaderUrl = System.getProperty("tests.leader_host"); + int portSeparator = leaderUrl.lastIndexOf(':'); + HttpHost httpHost = new HttpHost(leaderUrl.substring(0, portSeparator), + Integer.parseInt(leaderUrl.substring(portSeparator + 1)), getProtocol()); + return buildClient(restAdminSettings(), new HttpHost[]{httpHost}); + } + private static boolean indexExists(RestClient client, String index) throws IOException { Response response = client.performRequest(new Request("HEAD", "/" + index)); return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode(); } + private static void unfollowIndex(String followIndex) throws IOException { + assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/unfollow"))); + } + private static void verifyCcrMonitoring(String expectedLeaderIndex, String expectedFollowerIndex) throws IOException { ensureYellow(".monitoring-*"); @@ -239,14 +306,4 @@ private static void verifyCcrMonitoring(String expectedLeaderIndex, String expec assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1)); } - private static void ensureYellow(String index) throws IOException { - Request request = new Request("GET", "/_cluster/health/" + index); - request.addParameter("wait_for_status", "yellow"); - request.addParameter("wait_for_no_relocating_shards", "true"); - request.addParameter("wait_for_no_initializing_shards", "true"); - request.addParameter("timeout", "70s"); - request.addParameter("level", "shards"); - adminClient().performRequest(request); - } - } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index 2161d0a14237a..c000072588704 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -7,17 +7,23 @@ package org.elasticsearch.xpack.ccr; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.admin.indices.stats.IndexShardStats; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.Client; +import org.elasticsearch.client.FilterClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; @@ -25,15 +31,19 @@ import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.core.XPackPlugin; import java.util.Collections; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; /** * Encapsulates licensing checking for CCR. @@ -93,6 +103,7 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU request.indices(leaderIndex); checkRemoteClusterLicenseAndFetchClusterState( client, + Collections.emptyMap(), clusterAlias, request, onFailure, @@ -115,6 +126,7 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU * * @param client the client * @param clusterAlias the remote cluster alias + * @param headers the headers to use for leader client * @param request the cluster state request * @param onFailure the failure consumer * @param leaderClusterStateConsumer the leader cluster state consumer @@ -122,12 +134,14 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU */ public void checkRemoteClusterLicenseAndFetchClusterState( final Client client, + final Map headers, final String clusterAlias, final ClusterStateRequest request, final Consumer onFailure, final Consumer leaderClusterStateConsumer) { checkRemoteClusterLicenseAndFetchClusterState( client, + headers, clusterAlias, request, onFailure, @@ -144,6 +158,7 @@ public void checkRemoteClusterLicenseAndFetchClusterState( * * @param client the client * @param clusterAlias the remote cluster alias + * @param headers the headers to use for leader client * @param request the cluster state request * @param onFailure the failure consumer * @param leaderClusterStateConsumer the leader cluster state consumer @@ -153,6 +168,7 @@ public void checkRemoteClusterLicenseAndFetchClusterState( */ private void checkRemoteClusterLicenseAndFetchClusterState( final Client client, + final Map headers, final String clusterAlias, final ClusterStateRequest request, final Consumer onFailure, @@ -167,7 +183,7 @@ private void checkRemoteClusterLicenseAndFetchClusterState( @Override public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { if (licenseCheck.isSuccess()) { - final Client leaderClient = client.getRemoteClusterClient(clusterAlias); + final Client leaderClient = wrapClient(client.getRemoteClusterClient(clusterAlias), headers); final ActionListener clusterStateListener = ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure); // following an index in remote cluster, so use remote client to fetch leader index metadata @@ -237,6 +253,33 @@ public void fetchLeaderHistoryUUIDs( leaderClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure)); } + public static Client wrapClient(Client client, Map headers) { + if (headers.isEmpty()) { + return client; + } else { + final ThreadContext threadContext = client.threadPool().getThreadContext(); + Map filteredHeaders = headers.entrySet().stream() + .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + return new FilterClient(client) { + @Override + protected + void doExecute(Action action, Request request, ActionListener listener) { + final Supplier supplier = threadContext.newRestorableContext(false); + try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, filteredHeaders)) { + super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener)); + } + } + }; + } + } + + private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map headers) { + final ThreadContext.StoredContext storedContext = threadContext.stashContext(); + threadContext.copyHeaders(headers.entrySet()); + return storedContext; + } + private static ElasticsearchStatusException indexMetadataNonCompliantRemoteLicense( final String leaderIndex, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias(); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 722cbddde1891..180e5e3799098 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -103,19 +103,22 @@ private void doAutoFollow() { AutoFollower operation = new AutoFollower(handler, followerClusterState) { @Override - void getLeaderClusterState(final String leaderClusterAlias, final BiConsumer handler) { + void getLeaderClusterState(final Map headers, + final String leaderClusterAlias, + final BiConsumer handler) { final ClusterStateRequest request = new ClusterStateRequest(); request.clear(); request.metaData(true); if ("_local_".equals(leaderClusterAlias)) { + Client client = CcrLicenseChecker.wrapClient(AutoFollowCoordinator.this.client, headers); client.admin().cluster().state( request, ActionListener.wrap(r -> handler.accept(r.getState(), null), e -> handler.accept(null, e))); } else { - final Client leaderClient = client.getRemoteClusterClient(leaderClusterAlias); // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( - leaderClient, + client, + headers, leaderClusterAlias, request, e -> handler.accept(null, e), @@ -125,15 +128,22 @@ void getLeaderClusterState(final String leaderClusterAlias, final BiConsumer headers, + FollowIndexAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { - client.execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest), - ActionListener.wrap(r -> successHandler.run(), failureHandler)); + Client followerClient = CcrLicenseChecker.wrapClient(client, headers); + CreateAndFollowIndexAction.Request request = new CreateAndFollowIndexAction.Request(followRequest); + followerClient.execute( + CreateAndFollowIndexAction.INSTANCE, + request, + ActionListener.wrap(r -> successHandler.run(), failureHandler) + ); } @Override - void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + void updateAutoFollowMetadata(Function updateFunction, + Consumer handler) { clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() { @Override @@ -188,7 +198,7 @@ void autoFollowIndices() { AutoFollowPattern autoFollowPattern = entry.getValue(); List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias); - getLeaderClusterState(clusterAlias, (leaderClusterState, e) -> { + getLeaderClusterState(autoFollowPattern.getHeaders(), clusterAlias, (leaderClusterState, e) -> { if (leaderClusterState != null) { assert e == null; handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, leaderClusterState); @@ -251,7 +261,7 @@ private void handleClusterAlias(String clusterAlias, AutoFollowPattern autoFollo finalise(followError); } }; - createAndFollow(followRequest, successHandler, failureHandler); + createAndFollow(autoFollowPattern.getHeaders(), followRequest, successHandler, failureHandler); } } } @@ -314,14 +324,27 @@ static Function recordLeaderIndexAsFollowFunction(St /** * Fetch the cluster state from the leader with the specified cluster alias * + * @param headers the client headers * @param leaderClusterAlias the cluster alias of the leader * @param handler the callback to invoke */ - abstract void getLeaderClusterState(String leaderClusterAlias, BiConsumer handler); - - abstract void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer failureHandler); - - abstract void updateAutoFollowMetadata(Function updateFunction, Consumer handler); + abstract void getLeaderClusterState( + Map headers, + String leaderClusterAlias, + BiConsumer handler + ); + + abstract void createAndFollow( + Map headers, + FollowIndexAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler + ); + + abstract void updateAutoFollowMetadata( + Function updateFunction, + Consumer handler + ); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 8e1c1a27a369c..714e1fa289f17 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -5,18 +5,13 @@ */ package org.elasticsearch.xpack.ccr.action; -import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; -import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.Client; -import org.elasticsearch.client.FilterClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; @@ -24,7 +19,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.seqno.SeqNoStats; @@ -48,8 +42,8 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; -import java.util.function.Supplier; -import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.ccr.CcrLicenseChecker.wrapClient; public class ShardFollowTasksExecutor extends PersistentTasksExecutor { @@ -86,11 +80,11 @@ protected AllocatedPersistentTask createTask(long id, String type, String action ShardFollowTask params = taskInProgress.getParams(); final Client leaderClient; if (params.getLeaderClusterAlias() != null) { - leaderClient = wrapClient(client.getRemoteClusterClient(params.getLeaderClusterAlias()), params); + leaderClient = wrapClient(client.getRemoteClusterClient(params.getLeaderClusterAlias()), params.getHeaders()); } else { - leaderClient = wrapClient(client, params); + leaderClient = wrapClient(client, params.getHeaders()); } - Client followerClient = wrapClient(client, params); + Client followerClient = wrapClient(client, params.getHeaders()); BiConsumer scheduler = (delay, command) -> { try { threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command); @@ -160,7 +154,7 @@ interface BiLongConsumer { @Override protected void nodeOperation(final AllocatedPersistentTask task, final ShardFollowTask params, final PersistentTaskState state) { - Client followerClient = wrapClient(client, params); + Client followerClient = wrapClient(client, params.getHeaders()); ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task; logger.info("{} Started to track leader shard {}", params.getFollowShardId(), params.getLeaderShardId()); fetchGlobalCheckpoint(followerClient, params.getFollowShardId(), @@ -189,31 +183,4 @@ private void fetchGlobalCheckpoint( }, errorHandler)); } - private static Client wrapClient(Client client, ShardFollowTask shardFollowTask) { - if (shardFollowTask.getHeaders().isEmpty()) { - return client; - } else { - final ThreadContext threadContext = client.threadPool().getThreadContext(); - Map filteredHeaders = shardFollowTask.getHeaders().entrySet().stream() - .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - return new FilterClient(client) { - @Override - protected - void doExecute(Action action, Request request, ActionListener listener) { - final Supplier supplier = threadContext.newRestorableContext(false); - try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, filteredHeaders)) { - super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener)); - } - } - }; - } - } - - private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map headers) { - final ThreadContext.StoredContext storedContext = threadContext.stashContext(); - threadContext.copyHeaders(headers.entrySet()); - return storedContext; - } - } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index 4afd51f56e65e..748ba03f034bc 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -87,6 +87,10 @@ protected void masterOperation(PutAutoFollowPatternAction.Request request, clusterStateRequest.clear(); clusterStateRequest.metaData(true); + Map filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() + .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + leaderClient.admin().cluster().state( clusterStateRequest, ActionListener.wrap( @@ -102,7 +106,7 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) { @Override public ClusterState execute(ClusterState currentState) throws Exception { - return innerPut(request, currentState, leaderClusterState); + return innerPut(request, filteredHeaders, currentState, leaderClusterState); } }); }, @@ -110,6 +114,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { } static ClusterState innerPut(PutAutoFollowPatternAction.Request request, + Map filteredHeaders, ClusterState localState, ClusterState leaderClusterState) { // auto patterns are always overwritten @@ -151,8 +156,8 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request, request.getMaxConcurrentWriteBatches(), request.getMaxWriteBufferSize(), request.getMaxRetryDelay(), - request.getIdleShardRetryDelay() - ); + request.getIdleShardRetryDelay(), + filteredHeaders); patterns.put(request.getLeaderClusterAlias(), autoFollowPattern); ClusterState.Builder newState = ClusterState.builder(localState); newState.metaData(MetaData.builder(localState.getMetaData()) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java index cc617abc38504..5ef7b4093ae60 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java @@ -13,6 +13,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -37,10 +38,17 @@ protected AutoFollowMetadata createTestInstance() { Map> followedLeaderIndices = new HashMap<>(numEntries); for (int i = 0; i < numEntries; i++) { List leaderPatterns = Arrays.asList(generateRandomStringArray(4, 4, false)); - AutoFollowMetadata.AutoFollowPattern autoFollowPattern = - new AutoFollowMetadata.AutoFollowPattern(leaderPatterns, randomAlphaOfLength(4), randomIntBetween(0, Integer.MAX_VALUE), - randomIntBetween(0, Integer.MAX_VALUE), randomNonNegativeLong(), randomIntBetween(0, Integer.MAX_VALUE), - randomIntBetween(0, Integer.MAX_VALUE), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(500)); + AutoFollowMetadata.AutoFollowPattern autoFollowPattern = new AutoFollowMetadata.AutoFollowPattern( + leaderPatterns, + randomAlphaOfLength(4), + randomIntBetween(0, Integer.MAX_VALUE), + randomIntBetween(0, Integer.MAX_VALUE), + randomNonNegativeLong(), + randomIntBetween(0, Integer.MAX_VALUE), + randomIntBetween(0, Integer.MAX_VALUE), + TimeValue.timeValueMillis(500), + TimeValue.timeValueMillis(500), + randomBoolean() ? null : Collections.singletonMap("key", "value")); configs.put(Integer.toString(i), autoFollowPattern); followedLeaderIndices.put(Integer.toString(i), Arrays.asList(generateRandomStringArray(4, 4, false))); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java index d8bf287254725..1e7e3fe42df27 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java @@ -140,7 +140,7 @@ public void testAutoFollowCoordinatorLogsSkippingAutoFollowCoordinationWithNonCo @Override public ClusterState execute(ClusterState currentState) throws Exception { AutoFollowPattern autoFollowPattern = - new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null); AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata( Collections.singletonMap("test_alias", autoFollowPattern), Collections.emptyMap() diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 5ab11cf5b0c81..31af326250c3b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -51,7 +51,7 @@ public void testAutoFollower() { .build(); AutoFollowPattern autoFollowPattern = - new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -69,19 +69,25 @@ public void testAutoFollower() { }; AutoFollower autoFollower = new AutoFollower(handler, currentState) { @Override - void getLeaderClusterState(String leaderClusterAlias, BiConsumer handler) { + void getLeaderClusterState(Map headers, + String leaderClusterAlias, + BiConsumer handler) { handler.accept(leaderState, null); } @Override - void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { + void createAndFollow(Map headers, + FollowIndexAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler) { assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101")); assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101")); successHandler.run(); } @Override - void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + void updateAutoFollowMetadata(Function updateFunction, + Consumer handler) { ClusterState resultCs = updateFunction.apply(currentState); AutoFollowMetadata result = resultCs.metaData().custom(AutoFollowMetadata.TYPE); assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1)); @@ -98,7 +104,7 @@ public void testAutoFollowerClusterStateApiFailure() { when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = - new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -116,17 +122,23 @@ public void testAutoFollowerClusterStateApiFailure() { }; AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override - void getLeaderClusterState(String leaderClusterAlias, BiConsumer handler) { + void getLeaderClusterState(Map headers, + String leaderClusterAlias, + BiConsumer handler) { handler.accept(null, failure); } @Override - void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { + void createAndFollow(Map headers, + FollowIndexAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler) { fail("should not get here"); } @Override - void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + void updateAutoFollowMetadata(Function updateFunction, + Consumer handler) { fail("should not get here"); } }; @@ -146,7 +158,7 @@ public void testAutoFollowerUpdateClusterStateFailure() { .build(); AutoFollowPattern autoFollowPattern = - new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -164,12 +176,17 @@ public void testAutoFollowerUpdateClusterStateFailure() { }; AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override - void getLeaderClusterState(String leaderClusterAlias, BiConsumer handler) { + void getLeaderClusterState(Map headers, + String leaderClusterAlias, + BiConsumer handler) { handler.accept(leaderState, null); } @Override - void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { + void createAndFollow(Map headers, + FollowIndexAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler) { assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101")); assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101")); successHandler.run(); @@ -196,7 +213,7 @@ public void testAutoFollowerCreateAndFollowApiCallFailure() { .build(); AutoFollowPattern autoFollowPattern = - new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -214,19 +231,25 @@ public void testAutoFollowerCreateAndFollowApiCallFailure() { }; AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override - void getLeaderClusterState(String leaderClusterAlias, BiConsumer handler) { + void getLeaderClusterState(Map headers, + String leaderClusterAlias, + BiConsumer handler) { handler.accept(leaderState, null); } @Override - void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { + void createAndFollow(Map headers, + FollowIndexAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler) { assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101")); assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101")); failureHandler.accept(failure); } @Override - void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + void updateAutoFollowMetadata(Function updateFunction, + Consumer handler) { fail("should not get here"); } }; @@ -236,7 +259,7 @@ void updateAutoFollowMetadata(Function updateFunctio public void testGetLeaderIndicesToFollow() { AutoFollowPattern autoFollowPattern = - new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null); + new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null, null); ClusterState followerState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap()))) @@ -282,15 +305,15 @@ public void testGetLeaderIndicesToFollow() { public void testGetFollowerIndexName() { AutoFollowPattern autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, - null, null, null, null, null, null); + null, null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("metrics-0")); autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), "eu-metrics-0", null, null, - null, null, null, null, null); + null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), "eu-{{leader_index}}", null, - null, null, null, null, null, null); + null, null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java index 303133d3d82a3..2525b63de31f2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java @@ -30,7 +30,7 @@ public void testInnerDelete() { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); existingAutoFollowPatterns.put("eu_cluster", - new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null)); + new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null, null)); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); @@ -40,7 +40,7 @@ public void testInnerDelete() { List existingPatterns = new ArrayList<>(); existingPatterns.add("logs-*"); existingAutoFollowPatterns.put("asia_cluster", - new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null)); + new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null, null)); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); @@ -69,7 +69,7 @@ public void testInnerDeleteDoesNotExist() { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); existingAutoFollowPatterns.put("eu_cluster", - new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null)); + new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null, null)); } ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java index 6e7341154c888..5731a64ba8955 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java @@ -39,7 +39,7 @@ public void testInnerPut() { .metaData(MetaData.builder()) .build(); - ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, localState, remoteState); + ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, null, localState, remoteState); AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE); assertThat(autoFollowMetadata, notNullValue()); assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1)); @@ -78,7 +78,7 @@ public void testInnerPut_existingLeaderIndices() { .metaData(mdBuilder) .build(); - ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, localState, remoteState); + ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, null, localState, remoteState); AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE); assertThat(autoFollowMetadata, notNullValue()); assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1)); @@ -97,7 +97,7 @@ public void testInnerPut_existingLeaderIndicesAndAutoFollowMetadata() { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); existingAutoFollowPatterns.put("eu_cluster", - new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null)); + new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null, null)); Map> existingAlreadyFollowedIndexUUIDS = new HashMap<>(); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); @@ -120,7 +120,7 @@ public void testInnerPut_existingLeaderIndicesAndAutoFollowMetadata() { .metaData(mdBuilder) .build(); - ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, localState, remoteState); + ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, null, localState, remoteState); AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE); assertThat(autoFollowMetadata, notNullValue()); assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1)); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java index 9c64ea3da764c..cc4ea7b009ec1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -171,12 +172,14 @@ public static class AutoFollowPattern implements Writeable, ToXContentObject { public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay"); public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay"); + private static final ParseField HEADERS = new ParseField("headers"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("auto_follow_pattern", args -> new AutoFollowPattern((List) args[0], (String) args[1], (Integer) args[2], (Integer) args[3], - (Long) args[4], (Integer) args[5], (Integer) args[6], (TimeValue) args[7], (TimeValue) args[8])); + (Long) args[4], (Integer) args[5], (Integer) args[6], (TimeValue) args[7], (TimeValue) args[8], + (Map) args[9])); static { PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LEADER_PATTERNS_FIELD); @@ -192,6 +195,7 @@ public static class AutoFollowPattern implements Writeable, ToXContentObject { PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()), IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS); } private final List leaderIndexPatterns; @@ -203,10 +207,18 @@ public static class AutoFollowPattern implements Writeable, ToXContentObject { private final Integer maxWriteBufferSize; private final TimeValue maxRetryDelay; private final TimeValue idleShardRetryDelay; - - public AutoFollowPattern(List leaderIndexPatterns, String followIndexPattern, Integer maxBatchOperationCount, - Integer maxConcurrentReadBatches, Long maxOperationSizeInBytes, Integer maxConcurrentWriteBatches, - Integer maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue idleShardRetryDelay) { + private final Map headers; + + public AutoFollowPattern(List leaderIndexPatterns, + String followIndexPattern, + Integer maxBatchOperationCount, + Integer maxConcurrentReadBatches, + Long maxOperationSizeInBytes, + Integer maxConcurrentWriteBatches, + Integer maxWriteBufferSize, + TimeValue maxRetryDelay, + TimeValue idleShardRetryDelay, + Map headers) { this.leaderIndexPatterns = leaderIndexPatterns; this.followIndexPattern = followIndexPattern; this.maxBatchOperationCount = maxBatchOperationCount; @@ -216,6 +228,7 @@ public AutoFollowPattern(List leaderIndexPatterns, String followIndexPat this.maxWriteBufferSize = maxWriteBufferSize; this.maxRetryDelay = maxRetryDelay; this.idleShardRetryDelay = idleShardRetryDelay; + this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap(); } AutoFollowPattern(StreamInput in) throws IOException { @@ -228,6 +241,7 @@ public AutoFollowPattern(List leaderIndexPatterns, String followIndexPat maxWriteBufferSize = in.readOptionalVInt(); maxRetryDelay = in.readOptionalTimeValue(); idleShardRetryDelay = in.readOptionalTimeValue(); + this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); } public boolean match(String indexName) { @@ -274,6 +288,10 @@ public TimeValue getIdleShardRetryDelay() { return idleShardRetryDelay; } + public Map getHeaders() { + return headers; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeStringList(leaderIndexPatterns); @@ -285,6 +303,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalVInt(maxWriteBufferSize); out.writeOptionalTimeValue(maxRetryDelay); out.writeOptionalTimeValue(idleShardRetryDelay); + out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); } @Override @@ -314,6 +333,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (idleShardRetryDelay != null) { builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay); } + builder.field(HEADERS.getPreferredName(), headers); return builder; } @@ -335,7 +355,8 @@ public boolean equals(Object o) { Objects.equals(maxConcurrentWriteBatches, that.maxConcurrentWriteBatches) && Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) && Objects.equals(maxRetryDelay, that.maxRetryDelay) && - Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay); + Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) && + Objects.equals(headers, that.headers); } @Override @@ -349,7 +370,8 @@ public int hashCode() { maxConcurrentWriteBatches, maxWriteBufferSize, maxRetryDelay, - idleShardRetryDelay + idleShardRetryDelay, + headers ); } }