Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update index mappings when ccr restore complete #36879

Merged
merged 6 commits into from
Dec 20, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.xcontent.XContentType;

public final class CcrRequests {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


private CcrRequests() {}

public static ClusterStateRequest clusterStateRequest(String leaderIndex) {
Tim-Brooks marked this conversation as resolved.
Show resolved Hide resolved
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex);
return clusterStateRequest;
}

public static PutMappingRequest putMappingRequest(String followerIndex, MappingMetaData mappingMetaData) {
PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex);
putMappingRequest.type(mappingMetaData.type());
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
return putMappingRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.CommitStats;
Expand Down Expand Up @@ -123,10 +122,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> erro
Index leaderIndex = params.getLeaderShardId().getIndex();
Index followIndex = params.getFollowShardId().getIndex();

ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex.getName());
ClusterStateRequest clusterStateRequest = CcrRequests.clusterStateRequest(leaderIndex.getName());

remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
Expand All @@ -140,9 +136,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> erro
indexMetaData.getMappings().size() + "]";
MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value;

PutMappingRequest putMappingRequest = new PutMappingRequest(followIndex.getName());
putMappingRequest.type(mappingMetaData.type());
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followIndex.getName(), mappingMetaData);
followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(
putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()),
errorHandler));
Expand All @@ -154,10 +148,7 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum
final Index leaderIndex = params.getLeaderShardId().getIndex();
final Index followIndex = params.getFollowShardId().getIndex();

final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex.getName());
ClusterStateRequest clusterStateRequest = CcrRequests.clusterStateRequest(leaderIndex.getName());

CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
final IndexMetaData leaderIMD = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineException;
Expand All @@ -42,6 +41,7 @@
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.action.CcrRequests;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest;
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
Expand Down Expand Up @@ -116,15 +116,9 @@ public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
ClusterStateResponse response = remoteClient
.admin()
.cluster()
.prepareState()
.clear()
.setMetaData(true)
.setIndices("dummy_index_name") // We set a single dummy index name to avoid fetching all the index data
.get();
return response.getState().metaData();
ClusterStateRequest clusterStateRequest = CcrRequests.clusterStateRequest("dummy_index_name");
Tim-Brooks marked this conversation as resolved.
Show resolved Hide resolved
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
return clusterState.getState().metaData();
}

@Override
Expand All @@ -133,18 +127,12 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind
String leaderIndex = index.getName();
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);

ClusterStateResponse response = remoteClient
.admin()
.cluster()
.prepareState()
.clear()
.setMetaData(true)
.setIndices(leaderIndex)
.get();
ClusterStateRequest clusterStateRequest = CcrRequests.clusterStateRequest(leaderIndex);
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();

// Validates whether the leader cluster has been configured properly:
PlainActionFuture<String[]> future = PlainActionFuture.newFuture();
IndexMetaData leaderIndexMetaData = response.getState().metaData().index(leaderIndex);
IndexMetaData leaderIndexMetaData = clusterState.getState().metaData().index(leaderIndex);
ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse);
String[] leaderHistoryUUIDs = future.actionGet();

Expand Down Expand Up @@ -276,20 +264,15 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve
}

private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martijnvg would it make sense to share this code with the one in ShardFollowTasksExecutor?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conceptually this code is doing the same as the update mapping code in shard follow task, but the update mapping code is tightly coupled with ShardFollowTasksExecutor and ShardFollowNodeTask. Also the mapping update code is asynchronous over there and here it is synchronous.

What I think is possible, is that factory methods for both ClusterStateRequest and PutMappingRequest are introduced that is then used here and in ShardFollowTasksExecutor.

ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex.getName());
ClusterStateRequest clusterStateRequest = CcrRequests.clusterStateRequest(leaderIndex.getName());
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex);
long leaderMappingVersion = leaderIndexMetadata.getMappingVersion();

if (leaderMappingVersion > followerIndexSettings.getIndexMetaData().getMappingVersion()) {
Index followerIndex = followerIndexSettings.getIndex();
MappingMetaData mappingMetaData = leaderIndexMetadata.mapping();
PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex.getName());
putMappingRequest.type(mappingMetaData.type());
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
localClient.admin().indices().putMapping(putMappingRequest).actionGet();
}
}
Expand Down