From 2d6a7555316fcdaef79b8812cf13bf5a85df877d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 18 Dec 2018 18:06:39 -0700 Subject: [PATCH 1/4] WIP --- .../xpack/ccr/repository/CcrRepository.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index aeaa7fc5eaf57..bf2f3546f53a6 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -8,10 +8,14 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -20,6 +24,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.shard.IndexShard; @@ -261,6 +266,7 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v String nodeId = response.getNodeId(); // TODO: Implement file restore closeSession(remoteClient, nodeId, sessionUUID); + } @Override @@ -268,6 +274,22 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } + private void updateMapping(Client localClient, Client remoteClient, Index leaderIndex, Index followerIndex) { + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(leaderIndex.getName()); + ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); + IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex); + long mappingVersion = leaderIndexMetadata.getMappingVersion(); + + MappingMetaData mappingMetaData = leaderIndexMetadata.mapping(); + PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex.getName()); + putMappingRequest.type(mappingMetaData.type()); + putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); + localClient.admin().indices().putMapping(putMappingRequest).actionGet(); + } + private void closeSession(Client remoteClient, String nodeId, String sessionUUID) { ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(nodeId, new ClearCcrRestoreSessionRequest.Request(nodeId, sessionUUID)); From ad235784b9e8c0e5eb38f8be09bdaef2b15f40f4 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 19 Dec 2018 21:20:01 -0700 Subject: [PATCH 2/4] WIP --- .../xpack/ccr/repository/CcrRepository.java | 26 ++++---- .../xpack/ccr/CcrRepositoryIT.java | 59 +++++++++++++++++++ 2 files changed, 74 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index bf2f3546f53a6..b0d1410a20071 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -8,7 +8,6 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; @@ -26,6 +25,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardRecoveryException; @@ -257,7 +257,8 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v Map ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); - ShardId leaderShardId = new ShardId(shardId.getIndexName(), leaderUUID, shardId.getId()); + Index leaderIndex = new Index(shardId.getIndexName(), leaderUUID); + ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId()); Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); String sessionUUID = UUIDs.randomBase64UUID(); @@ -266,7 +267,7 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v String nodeId = response.getNodeId(); // TODO: Implement file restore closeSession(remoteClient, nodeId, sessionUUID); - + maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings()); } @Override @@ -274,20 +275,23 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } - private void updateMapping(Client localClient, Client remoteClient, Index leaderIndex, Index followerIndex) { + private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) { ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); clusterStateRequest.clear(); clusterStateRequest.metaData(true); clusterStateRequest.indices(leaderIndex.getName()); ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex); - long mappingVersion = leaderIndexMetadata.getMappingVersion(); - - MappingMetaData mappingMetaData = leaderIndexMetadata.mapping(); - PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex.getName()); - putMappingRequest.type(mappingMetaData.type()); - putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); - localClient.admin().indices().putMapping(putMappingRequest).actionGet(); + long leaderMappingVersion = leaderIndexMetadata.getMappingVersion(); + + if (leaderMappingVersion > followerIndexSettings.getIndexMetaData().getMappingVersion()) { + Index followerIndex = followerIndexSettings.getIndex(); + MappingMetaData mappingMetaData = leaderIndexMetadata.mapping(); + PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex.getName()); + putMappingRequest.type(mappingMetaData.type()); + putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); + localClient.admin().indices().putMapping(putMappingRequest).actionGet(); + } } private void closeSession(Client remoteClient, String nodeId, String sessionUUID) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index f711dd4303f2a..2d3ca857ff848 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -8,6 +8,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; @@ -15,12 +16,14 @@ import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.RepositoriesService; @@ -35,6 +38,7 @@ import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; import java.io.IOException; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -42,6 +46,7 @@ import static java.util.Collections.singletonMap; import static org.elasticsearch.snapshots.RestoreService.restoreInProgress; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; // TODO: Fold this integration test into a more expansive integration test as more bootstrap from remote work // TODO: is completed. @@ -195,6 +200,60 @@ public void testThatSessionIsRegisteredWithPrimaryShard() throws IOException { assertEquals(0, restoreInfo.failedShards()); } + public void testFollowerMappingIsUpdated() throws IOException { + String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; + String leaderIndex = "index1"; + String followerIndex = "index2"; + + final int numberOfPrimaryShards = randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderGreen(leaderIndex); + + final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); + + Settings.Builder settingsBuilder = Settings.builder() + .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex) + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName, + CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions, + "^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false, + false, true, settingsBuilder.build(), new String[0], + "restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]"); + + // TODO: Eventually when the file recovery work is complete, we should test updated mappings by + // indexing to the leader while the recovery is happening. However, into order to that test mappings + // are updated prior to that work, we index documents in the clear session callback. This will + // ensure a mapping change prior to the final mapping check on the follower side. + for (CcrRestoreSourceService restoreSourceService : getLeaderCluster().getDataNodeInstances(CcrRestoreSourceService.class)) { + restoreSourceService.addCloseSessionListener(s -> { + final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1); + leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get(); + }); + } + + PlainActionFuture future = PlainActionFuture.newFuture(); + restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); + RestoreInfo restoreInfo = future.actionGet(); + + assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards()); + assertEquals(0, restoreInfo.failedShards()); + + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(followerIndex); + ClusterStateResponse clusterState = followerClient().admin().cluster().state(clusterStateRequest).actionGet(); + IndexMetaData followerIndexMetadata = clusterState.getState().metaData().index(followerIndex); + assertEquals(2, followerIndexMetadata.getMappingVersion()); + + MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings() + .get("index2").get("doc"); + assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long")); + } + private ActionListener waitForRestore(ClusterService clusterService, ActionListener listener) { return new ActionListener() { From e865fb2cd397a6d838a29bcfc4455526a1603202 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 20 Dec 2018 10:16:08 -0700 Subject: [PATCH 3/4] Changes --- .../xpack/ccr/action/CcrRequests.java | 31 ++++++++++++++++ .../ccr/action/ShardFollowTasksExecutor.java | 15 ++------ .../xpack/ccr/repository/CcrRepository.java | 35 +++++-------------- 3 files changed, 43 insertions(+), 38 deletions(-) create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java new file mode 100644 index 0000000000000..d8bf9794a01cd --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.xcontent.XContentType; + +public final class CcrRequests { + + private CcrRequests() {} + + public static ClusterStateRequest clusterStateRequest(String leaderIndex) { + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(leaderIndex); + return clusterStateRequest; + } + + public static PutMappingRequest putMappingRequest(String followerIndex, MappingMetaData mappingMetaData) { + PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex); + putMappingRequest.type(mappingMetaData.type()); + putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); + return putMappingRequest; + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index bd22b85684ca4..6651675671256 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -31,7 +31,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.CommitStats; @@ -123,10 +122,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro Index leaderIndex = params.getLeaderShardId().getIndex(); Index followIndex = params.getFollowShardId().getIndex(); - ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); - clusterStateRequest.clear(); - clusterStateRequest.metaData(true); - clusterStateRequest.indices(leaderIndex.getName()); + ClusterStateRequest clusterStateRequest = CcrRequests.clusterStateRequest(leaderIndex.getName()); remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); @@ -140,9 +136,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro indexMetaData.getMappings().size() + "]"; MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value; - PutMappingRequest putMappingRequest = new PutMappingRequest(followIndex.getName()); - putMappingRequest.type(mappingMetaData.type()); - putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); + PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followIndex.getName(), mappingMetaData); followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap( putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()), errorHandler)); @@ -154,10 +148,7 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum final Index leaderIndex = params.getLeaderShardId().getIndex(); final Index followIndex = params.getFollowShardId().getIndex(); - final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); - clusterStateRequest.clear(); - clusterStateRequest.metaData(true); - clusterStateRequest.indices(leaderIndex.getName()); + ClusterStateRequest clusterStateRequest = CcrRequests.clusterStateRequest(leaderIndex.getName()); CheckedConsumer onResponse = clusterStateResponse -> { final IndexMetaData leaderIMD = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index b0d1410a20071..a046b40c3f71c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineException; @@ -42,6 +41,7 @@ import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; +import org.elasticsearch.xpack.ccr.action.CcrRequests; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest; import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction; @@ -116,15 +116,9 @@ public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId"; Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); - ClusterStateResponse response = remoteClient - .admin() - .cluster() - .prepareState() - .clear() - .setMetaData(true) - .setIndices("dummy_index_name") // We set a single dummy index name to avoid fetching all the index data - .get(); - return response.getState().metaData(); + ClusterStateRequest clusterStateRequest = CcrRequests.clusterStateRequest("dummy_index_name"); + ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); + return clusterState.getState().metaData(); } @Override @@ -133,18 +127,12 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind String leaderIndex = index.getName(); Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); - ClusterStateResponse response = remoteClient - .admin() - .cluster() - .prepareState() - .clear() - .setMetaData(true) - .setIndices(leaderIndex) - .get(); + ClusterStateRequest clusterStateRequest = CcrRequests.clusterStateRequest(leaderIndex); + ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); // Validates whether the leader cluster has been configured properly: PlainActionFuture future = PlainActionFuture.newFuture(); - IndexMetaData leaderIndexMetaData = response.getState().metaData().index(leaderIndex); + IndexMetaData leaderIndexMetaData = clusterState.getState().metaData().index(leaderIndex); ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse); String[] leaderHistoryUUIDs = future.actionGet(); @@ -276,10 +264,7 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve } private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) { - ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); - clusterStateRequest.clear(); - clusterStateRequest.metaData(true); - clusterStateRequest.indices(leaderIndex.getName()); + ClusterStateRequest clusterStateRequest = CcrRequests.clusterStateRequest(leaderIndex.getName()); ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex); long leaderMappingVersion = leaderIndexMetadata.getMappingVersion(); @@ -287,9 +272,7 @@ private void maybeUpdateMappings(Client localClient, Client remoteClient, Index if (leaderMappingVersion > followerIndexSettings.getIndexMetaData().getMappingVersion()) { Index followerIndex = followerIndexSettings.getIndex(); MappingMetaData mappingMetaData = leaderIndexMetadata.mapping(); - PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex.getName()); - putMappingRequest.type(mappingMetaData.type()); - putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); + PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData); localClient.admin().indices().putMapping(putMappingRequest).actionGet(); } } From 7f23deb925a37934d9afff47dff090666d7ee1c7 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 20 Dec 2018 11:24:54 -0700 Subject: [PATCH 4/4] Changes --- .../org/elasticsearch/xpack/ccr/action/CcrRequests.java | 2 +- .../xpack/ccr/action/ShardFollowTasksExecutor.java | 4 ++-- .../elasticsearch/xpack/ccr/repository/CcrRepository.java | 7 ++++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java index d8bf9794a01cd..12432c740a701 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java @@ -14,7 +14,7 @@ public final class CcrRequests { private CcrRequests() {} - public static ClusterStateRequest clusterStateRequest(String leaderIndex) { + public static ClusterStateRequest metaDataRequest(String leaderIndex) { ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); clusterStateRequest.clear(); clusterStateRequest.metaData(true); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 6651675671256..0fed083bba9ac 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -122,7 +122,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro Index leaderIndex = params.getLeaderShardId().getIndex(); Index followIndex = params.getFollowShardId().getIndex(); - ClusterStateRequest clusterStateRequest = CcrRequests.clusterStateRequest(leaderIndex.getName()); + ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); @@ -148,7 +148,7 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum final Index leaderIndex = params.getLeaderShardId().getIndex(); final Index followIndex = params.getFollowShardId().getIndex(); - ClusterStateRequest clusterStateRequest = CcrRequests.clusterStateRequest(leaderIndex.getName()); + ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); CheckedConsumer onResponse = clusterStateResponse -> { final IndexMetaData leaderIMD = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index a046b40c3f71c..e648264a4ad55 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -116,7 +116,8 @@ public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId"; Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); - ClusterStateRequest clusterStateRequest = CcrRequests.clusterStateRequest("dummy_index_name"); + // We set a single dummy index name to avoid fetching all the index data + ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest("dummy_index_name"); ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); return clusterState.getState().metaData(); } @@ -127,7 +128,7 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind String leaderIndex = index.getName(); Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); - ClusterStateRequest clusterStateRequest = CcrRequests.clusterStateRequest(leaderIndex); + ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex); ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); // Validates whether the leader cluster has been configured properly: @@ -264,7 +265,7 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve } private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) { - ClusterStateRequest clusterStateRequest = CcrRequests.clusterStateRequest(leaderIndex.getName()); + ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex); long leaderMappingVersion = leaderIndexMetadata.getMappingVersion();