From 67da9762ed440c227eec16b631160f6da375b00d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 18 Dec 2018 11:23:13 -0700 Subject: [PATCH] Add CcrRestoreSourceService to track sessions (#36578) This commit is related to #36127. It adds a CcrRestoreSourceService to track Engine.IndexCommitRef need for in-process file restores. When a follower starts restoring a shard through the CcrRepository it opens a session with the leader through the PutCcrRestoreSessionAction. The leader responds to the request by telling the follower what files it needs to fetch for a restore. This is not yet implemented. Once, the restore is complete, the follower closes the session with the DeleteCcrRestoreSessionAction action. --- .../common/util/iterable/Iterables.java | 2 +- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 20 +- .../ClearCcrRestoreSessionAction.java | 121 ++++++++++++ .../ClearCcrRestoreSessionRequest.java | 73 ++++++++ .../PutCcrRestoreSessionAction.java | 126 +++++++++++++ .../PutCcrRestoreSessionRequest.java | 66 +++++++ .../xpack/ccr/repository/CcrRepository.java | 41 +++- .../repository/CcrRestoreSourceService.java | 175 ++++++++++++++++++ .../xpack/ccr/CcrRepositoryIT.java | 44 +++++ .../CcrRestoreSourceServiceTests.java | 125 +++++++++++++ 10 files changed, 788 insertions(+), 5 deletions(-) create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java diff --git a/server/src/main/java/org/elasticsearch/common/util/iterable/Iterables.java b/server/src/main/java/org/elasticsearch/common/util/iterable/Iterables.java index 2852e33eb4307..3783de95585cd 100644 --- a/server/src/main/java/org/elasticsearch/common/util/iterable/Iterables.java +++ b/server/src/main/java/org/elasticsearch/common/util/iterable/Iterables.java @@ -34,7 +34,7 @@ public Iterables() { public static Iterable concat(Iterable... inputs) { Objects.requireNonNull(inputs); - return new ConcatenatedIterable(inputs); + return new ConcatenatedIterable<>(inputs); } static class ConcatenatedIterable implements Iterable { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 753b6b0e978ea..9bec8dc206376 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.license.XPackLicenseState; @@ -59,10 +60,13 @@ import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; +import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; +import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; import org.elasticsearch.xpack.ccr.repository.CcrRepository; +import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction; import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction; @@ -113,7 +117,9 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E private final boolean enabled; private final Settings settings; private final CcrLicenseChecker ccrLicenseChecker; + private final SetOnce repositoryManager = new SetOnce<>(); + private final SetOnce restoreSourceService = new SetOnce<>(); private Client client; private final boolean tribeNode; @@ -160,9 +166,12 @@ public Collection createComponents( } this.repositoryManager.set(new CcrRepositoryManager(settings, clusterService, (NodeClient) client)); - + CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(settings); + this.restoreSourceService.set(restoreSourceService); return Arrays.asList( ccrLicenseChecker, + restoreSourceService, + repositoryManager.get(), new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis) ); } @@ -189,6 +198,10 @@ public List> getPersistentTasksExecutor(ClusterServic PutInternalCcrRepositoryAction.TransportPutInternalRepositoryAction.class), new ActionHandler<>(DeleteInternalCcrRepositoryAction.INSTANCE, DeleteInternalCcrRepositoryAction.TransportDeleteInternalRepositoryAction.class), + new ActionHandler<>(PutCcrRestoreSessionAction.INSTANCE, + PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction.class), + new ActionHandler<>(ClearCcrRestoreSessionAction.INSTANCE, + ClearCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class), // stats action new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class), new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class), @@ -288,6 +301,11 @@ public Map getInternalRepositories(Environment env, return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory); } + @Override + public void onIndexModule(IndexModule indexModule) { + indexModule.addIndexEventListener(this.restoreSourceService.get()); + } + protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java new file mode 100644 index 0000000000000..33b8b415d8362 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -0,0 +1,121 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; + +import java.io.IOException; +import java.util.List; + +public class ClearCcrRestoreSessionAction extends Action { + + public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction(); + private static final String NAME = "internal:admin/ccr/restore/session/clear"; + + private ClearCcrRestoreSessionAction() { + super(NAME); + } + + @Override + public ClearCcrRestoreSessionResponse newResponse() { + return new ClearCcrRestoreSessionResponse(); + } + + public static class TransportDeleteCcrRestoreSessionAction extends TransportNodesAction { + + private final CcrRestoreSourceService ccrRestoreService; + + @Inject + public TransportDeleteCcrRestoreSessionAction(ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters, + TransportService transportService, CcrRestoreSourceService ccrRestoreService) { + super(NAME, threadPool, clusterService, transportService, actionFilters, ClearCcrRestoreSessionRequest::new, + ClearCcrRestoreSessionRequest.Request::new, ThreadPool.Names.GENERIC, Response.class); + this.ccrRestoreService = ccrRestoreService; + } + + @Override + protected ClearCcrRestoreSessionResponse newResponse(ClearCcrRestoreSessionRequest request, List responses, + List failures) { + return new ClearCcrRestoreSessionResponse(clusterService.getClusterName(), responses, failures); + } + + @Override + protected ClearCcrRestoreSessionRequest.Request newNodeRequest(String nodeId, ClearCcrRestoreSessionRequest request) { + return request.getRequest(); + } + + @Override + protected Response newNodeResponse() { + return new Response(); + } + + @Override + protected Response nodeOperation(ClearCcrRestoreSessionRequest.Request request) { + ccrRestoreService.closeSession(request.getSessionUUID()); + return new Response(clusterService.localNode()); + } + } + + public static class Response extends BaseNodeResponse { + + private Response() { + } + + private Response(StreamInput in) throws IOException { + readFrom(in); + } + + private Response(DiscoveryNode node) { + super(node); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + } + } + + public static class ClearCcrRestoreSessionResponse extends BaseNodesResponse { + + ClearCcrRestoreSessionResponse() { + } + + ClearCcrRestoreSessionResponse(ClusterName clusterName, List chunkResponses, List failures) { + super(clusterName, chunkResponses, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(Response::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java new file mode 100644 index 0000000000000..11605970736b0 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.support.nodes.BaseNodeRequest; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class ClearCcrRestoreSessionRequest extends BaseNodesRequest { + + private Request request; + + ClearCcrRestoreSessionRequest() { + } + + public ClearCcrRestoreSessionRequest(String nodeId, Request request) { + super(nodeId); + this.request = request; + } + + @Override + public void readFrom(StreamInput streamInput) throws IOException { + super.readFrom(streamInput); + request = new Request(); + request.readFrom(streamInput); + } + + @Override + public void writeTo(StreamOutput streamOutput) throws IOException { + super.writeTo(streamOutput); + request.writeTo(streamOutput); + } + + public Request getRequest() { + return request; + } + + public static class Request extends BaseNodeRequest { + + private String sessionUUID; + + Request() { + } + + public Request(String nodeId, String sessionUUID) { + super(nodeId); + this.sessionUUID = sessionUUID; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + sessionUUID = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(sessionUUID); + } + + public String getSessionUUID() { + return sessionUUID; + } + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java new file mode 100644 index 0000000000000..7f362aa3b766c --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; + +import java.io.IOException; + +public class PutCcrRestoreSessionAction extends Action { + + public static final PutCcrRestoreSessionAction INSTANCE = new PutCcrRestoreSessionAction(); + private static final String NAME = "internal:admin/ccr/restore/session/put"; + + private PutCcrRestoreSessionAction() { + super(NAME); + } + + @Override + public PutCcrRestoreSessionResponse newResponse() { + return new PutCcrRestoreSessionResponse(); + } + + @Override + public Writeable.Reader getResponseReader() { + return PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse::new; + } + + public static class TransportPutCcrRestoreSessionAction + extends TransportSingleShardAction { + + private final IndicesService indicesService; + private final CcrRestoreSourceService ccrRestoreService; + + @Inject + public TransportPutCcrRestoreSessionAction(ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters, + IndexNameExpressionResolver resolver, TransportService transportService, + IndicesService indicesService, CcrRestoreSourceService ccrRestoreService) { + super(NAME, threadPool, clusterService, transportService, actionFilters, resolver, PutCcrRestoreSessionRequest::new, + ThreadPool.Names.GENERIC); + this.indicesService = indicesService; + this.ccrRestoreService = ccrRestoreService; + } + + @Override + protected PutCcrRestoreSessionResponse shardOperation(PutCcrRestoreSessionRequest request, ShardId shardId) throws IOException { + IndexShard indexShard = indicesService.getShardOrNull(shardId); + if (indexShard == null) { + throw new ShardNotFoundException(shardId); + } + ccrRestoreService.openSession(request.getSessionUUID(), indexShard); + return new PutCcrRestoreSessionResponse(indexShard.routingEntry().currentNodeId()); + } + + @Override + protected PutCcrRestoreSessionResponse newResponse() { + return new PutCcrRestoreSessionResponse(); + } + + @Override + protected boolean resolveIndex(PutCcrRestoreSessionRequest request) { + return false; + } + + @Override + protected ShardsIterator shards(ClusterState state, InternalRequest request) { + final ShardId shardId = request.request().getShardId(); + return state.routingTable().shardRoutingTable(shardId).primaryShardIt(); + } + } + + + public static class PutCcrRestoreSessionResponse extends ActionResponse { + + private String nodeId; + + PutCcrRestoreSessionResponse() { + } + + PutCcrRestoreSessionResponse(String nodeId) { + this.nodeId = nodeId; + } + + PutCcrRestoreSessionResponse(StreamInput in) throws IOException { + super(in); + nodeId = in.readString(); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + nodeId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(nodeId); + } + + public String getNodeId() { + return nodeId; + } + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java new file mode 100644 index 0000000000000..2b94193f674af --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.single.shard.SingleShardRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; + +import java.io.IOException; + +public class PutCcrRestoreSessionRequest extends SingleShardRequest { + + private String sessionUUID; + private ShardId shardId; + private Store.MetadataSnapshot metaData; + + PutCcrRestoreSessionRequest() { + } + + public PutCcrRestoreSessionRequest(String sessionUUID, ShardId shardId, Store.MetadataSnapshot metaData) { + super(shardId.getIndexName()); + this.sessionUUID = sessionUUID; + this.shardId = shardId; + this.metaData = metaData; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + sessionUUID = in.readString(); + shardId = ShardId.readShardId(in); + metaData = new Store.MetadataSnapshot(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(sessionUUID); + shardId.writeTo(out); + metaData.writeTo(out); + } + + public String getSessionUUID() { + return sessionUUID; + } + + public ShardId getShardId() { + return shardId; + } + + public Store.MetadataSnapshot getMetaData() { + return metaData; + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index d6eb100b277df..17450bc05ad2b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; @@ -36,6 +37,10 @@ import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; +import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; +import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest; +import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction; +import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionRequest; import java.io.IOException; import java.util.ArrayList; @@ -81,7 +86,7 @@ protected void doStop() { } @Override - protected void doClose() throws IOException { + protected void doClose() { } @@ -227,19 +232,49 @@ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, @Override public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId, RecoveryState recoveryState) { + // TODO: Add timeouts to network calls / the restore process. final Store store = indexShard.store(); store.incRef(); try { store.createEmpty(); } catch (EngineException | IOException e) { - throw new IndexShardRecoveryException(shardId, "failed to recover from gateway", e); + throw new IndexShardRecoveryException(shardId, "failed to create empty store", e); } finally { store.decRef(); } + + Store.MetadataSnapshot recoveryMetadata; + try { + recoveryMetadata = indexShard.snapshotStoreMetadata(); + } catch (IOException e) { + throw new IndexShardRecoveryException(shardId, "failed access store metadata", e); + } + + Map ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); + String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); + ShardId leaderShardId = new ShardId(shardId.getIndexName(), leaderUUID, shardId.getId()); + + Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); + String sessionUUID = UUIDs.randomBase64UUID(); + PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, + new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId, recoveryMetadata)).actionGet(); + String nodeId = response.getNodeId(); + // TODO: Implement file restore + closeSession(remoteClient, nodeId, sessionUUID); } @Override - public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { + public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } + + private void closeSession(Client remoteClient, String nodeId, String sessionUUID) { + ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(nodeId, + new ClearCcrRestoreSessionRequest.Request(nodeId, sessionUUID)); + ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response = + remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(); + if (response.hasFailures()) { + throw response.failures().get(0); + } + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java new file mode 100644 index 0000000000000..642036168ad7b --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -0,0 +1,175 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.repository; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexEventListener; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardClosedException; +import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; + +public class CcrRestoreSourceService extends AbstractLifecycleComponent implements IndexEventListener { + + private static final Logger logger = LogManager.getLogger(CcrRestoreSourceService.class); + + private final Map onGoingRestores = ConcurrentCollections.newConcurrentMap(); + private final Map> sessionsForShard = new HashMap<>(); + private final CopyOnWriteArrayList> openSessionListeners = new CopyOnWriteArrayList<>(); + private final CopyOnWriteArrayList> closeSessionListeners = new CopyOnWriteArrayList<>(); + + public CcrRestoreSourceService(Settings settings) { + super(settings); + } + + @Override + public synchronized void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { + if (indexShard != null) { + HashSet sessions = sessionsForShard.remove(indexShard); + if (sessions != null) { + for (String sessionUUID : sessions) { + RestoreContext restore = onGoingRestores.remove(sessionUUID); + IOUtils.closeWhileHandlingException(restore); + } + } + } + } + + @Override + protected void doStart() { + + } + + @Override + protected void doStop() { + + } + + @Override + protected synchronized void doClose() throws IOException { + sessionsForShard.clear(); + IOUtils.closeWhileHandlingException(onGoingRestores.values()); + onGoingRestores.clear(); + } + + // TODO: The listeners are for testing. Once end-to-end file restore is implemented and can be tested, + // these should be removed. + public void addOpenSessionListener(Consumer listener) { + openSessionListeners.add(listener); + } + + public void addCloseSessionListener(Consumer listener) { + closeSessionListeners.add(listener); + } + + // default visibility for testing + synchronized HashSet getSessionsForShard(IndexShard indexShard) { + return sessionsForShard.get(indexShard); + } + + // default visibility for testing + synchronized RestoreContext getOngoingRestore(String sessionUUID) { + return onGoingRestores.get(sessionUUID); + } + + // TODO: Add a local timeout for the session. This timeout might might be for the entire session to be + // complete. Or it could be for session to have been touched. + public synchronized Store.MetadataSnapshot openSession(String sessionUUID, IndexShard indexShard) throws IOException { + boolean success = false; + RestoreContext restore = null; + try { + if (onGoingRestores.containsKey(sessionUUID)) { + logger.debug("not opening new session [{}] as it already exists", sessionUUID); + restore = onGoingRestores.get(sessionUUID); + } else { + logger.debug("opening session [{}] for shard [{}]", sessionUUID, indexShard.shardId()); + if (indexShard.state() == IndexShardState.CLOSED) { + throw new IndexShardClosedException(indexShard.shardId(), "cannot open ccr restore session if shard closed"); + } + restore = new RestoreContext(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit()); + onGoingRestores.put(sessionUUID, restore); + openSessionListeners.forEach(c -> c.accept(sessionUUID)); + HashSet sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>()); + sessions.add(sessionUUID); + } + Store.MetadataSnapshot metaData = restore.getMetaData(); + success = true; + return metaData; + } finally { + if (success == false) { + onGoingRestores.remove(sessionUUID); + IOUtils.closeWhileHandlingException(restore); + } + } + } + + public synchronized void closeSession(String sessionUUID) { + closeSessionListeners.forEach(c -> c.accept(sessionUUID)); + RestoreContext restore = onGoingRestores.remove(sessionUUID); + if (restore == null) { + logger.info("could not close session [{}] because session not found", sessionUUID); + throw new IllegalArgumentException("session [" + sessionUUID + "] not found"); + } + IOUtils.closeWhileHandlingException(restore); + } + + private class RestoreContext implements Closeable { + + private final String sessionUUID; + private final IndexShard indexShard; + private final Engine.IndexCommitRef commitRef; + + private RestoreContext(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef) { + this.sessionUUID = sessionUUID; + this.indexShard = indexShard; + this.commitRef = commitRef; + } + + Store.MetadataSnapshot getMetaData() throws IOException { + indexShard.store().incRef(); + try { + return indexShard.store().getMetadata(commitRef.getIndexCommit()); + } finally { + indexShard.store().decRef(); + } + } + + @Override + public void close() { + assert Thread.holdsLock(CcrRestoreSourceService.this); + removeSessionForShard(sessionUUID, indexShard); + IOUtils.closeWhileHandlingException(commitRef); + } + + private void removeSessionForShard(String sessionUUID, IndexShard indexShard) { + logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard.shardId()); + HashSet sessions = sessionsForShard.get(indexShard); + if (sessions != null) { + sessions.remove(sessionUUID); + if (sessions.isEmpty()) { + sessionsForShard.remove(indexShard); + } + } + } + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 0057df49b7c36..f711dd4303f2a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; @@ -31,9 +32,11 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.repository.CcrRepository; +import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; import java.io.IOException; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import static java.util.Collections.singletonMap; @@ -151,6 +154,47 @@ public void testThatRepositoryRecoversEmptyIndexBasedOnLeaderSettings() throws I assertNotEquals(leaderMetadata.getIndexUUID(), followerMetadata.getIndexUUID()); } + public void testThatSessionIsRegisteredWithPrimaryShard() throws IOException { + String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; + String leaderIndex = "index1"; + String followerIndex = "index2"; + + final int numberOfPrimaryShards = randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderGreen(leaderIndex); + + final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); + + Settings.Builder settingsBuilder = Settings.builder() + .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex) + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName, + CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions, + "^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false, + false, true, settingsBuilder.build(), new String[0], + "restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]"); + + Set sessionsOpened = ConcurrentCollections.newConcurrentSet(); + Set sessionsClosed = ConcurrentCollections.newConcurrentSet(); + for (CcrRestoreSourceService restoreSourceService : getLeaderCluster().getDataNodeInstances(CcrRestoreSourceService.class)) { + restoreSourceService.addOpenSessionListener(sessionsOpened::add); + restoreSourceService.addCloseSessionListener(sessionsClosed::add); + } + + PlainActionFuture future = PlainActionFuture.newFuture(); + restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); + RestoreInfo restoreInfo = future.actionGet(); + + assertEquals(numberOfPrimaryShards, sessionsOpened.size()); + assertEquals(numberOfPrimaryShards, sessionsClosed.size()); + + assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards()); + assertEquals(0, restoreInfo.failedShards()); + } + private ActionListener waitForRestore(ClusterService clusterService, ActionListener listener) { return new ActionListener() { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java new file mode 100644 index 0000000000000..dfa7e5ef12660 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -0,0 +1,125 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.repository; + +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.junit.Before; + +import java.io.IOException; +import java.util.HashSet; + +public class CcrRestoreSourceServiceTests extends IndexShardTestCase { + + private CcrRestoreSourceService restoreSourceService; + + @Before + public void setUp() throws Exception { + super.setUp(); + restoreSourceService = new CcrRestoreSourceService(Settings.EMPTY); + } + + public void testOpenSession() throws IOException { + IndexShard indexShard1 = newStartedShard(true); + IndexShard indexShard2 = newStartedShard(true); + final String sessionUUID1 = UUIDs.randomBase64UUID(); + final String sessionUUID2 = UUIDs.randomBase64UUID(); + final String sessionUUID3 = UUIDs.randomBase64UUID(); + + assertNull(restoreSourceService.getSessionsForShard(indexShard1)); + + assertNotNull(restoreSourceService.openSession(sessionUUID1, indexShard1)); + HashSet sessionsForShard = restoreSourceService.getSessionsForShard(indexShard1); + assertEquals(1, sessionsForShard.size()); + assertTrue(sessionsForShard.contains(sessionUUID1)); + assertNotNull(restoreSourceService.openSession(sessionUUID2, indexShard1)); + sessionsForShard = restoreSourceService.getSessionsForShard(indexShard1); + assertEquals(2, sessionsForShard.size()); + assertTrue(sessionsForShard.contains(sessionUUID2)); + + assertNull(restoreSourceService.getSessionsForShard(indexShard2)); + assertNotNull(restoreSourceService.openSession(sessionUUID3, indexShard2)); + sessionsForShard = restoreSourceService.getSessionsForShard(indexShard2); + assertEquals(1, sessionsForShard.size()); + assertTrue(sessionsForShard.contains(sessionUUID3)); + + restoreSourceService.closeSession(sessionUUID1); + restoreSourceService.closeSession(sessionUUID2); + restoreSourceService.closeSession(sessionUUID3); + + closeShards(indexShard1, indexShard2); + } + + public void testCannotOpenSessionForClosedShard() throws IOException { + IndexShard indexShard = newStartedShard(true); + closeShards(indexShard); + String sessionUUID = UUIDs.randomBase64UUID(); + expectThrows(IllegalIndexShardStateException.class, () -> restoreSourceService.openSession(sessionUUID, indexShard)); + assertNull(restoreSourceService.getOngoingRestore(sessionUUID)); + } + + public void testCloseSession() throws IOException { + IndexShard indexShard1 = newStartedShard(true); + IndexShard indexShard2 = newStartedShard(true); + final String sessionUUID1 = UUIDs.randomBase64UUID(); + final String sessionUUID2 = UUIDs.randomBase64UUID(); + final String sessionUUID3 = UUIDs.randomBase64UUID(); + + restoreSourceService.openSession(sessionUUID1, indexShard1); + restoreSourceService.openSession(sessionUUID2, indexShard1); + restoreSourceService.openSession(sessionUUID3, indexShard2); + + assertEquals(2, restoreSourceService.getSessionsForShard(indexShard1).size()); + assertEquals(1, restoreSourceService.getSessionsForShard(indexShard2).size()); + assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID1)); + assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID2)); + assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID3)); + + restoreSourceService.closeSession(sessionUUID1); + assertEquals(1, restoreSourceService.getSessionsForShard(indexShard1).size()); + assertNull(restoreSourceService.getOngoingRestore(sessionUUID1)); + assertFalse(restoreSourceService.getSessionsForShard(indexShard1).contains(sessionUUID1)); + assertTrue(restoreSourceService.getSessionsForShard(indexShard1).contains(sessionUUID2)); + + restoreSourceService.closeSession(sessionUUID2); + assertNull(restoreSourceService.getSessionsForShard(indexShard1)); + assertNull(restoreSourceService.getOngoingRestore(sessionUUID2)); + + restoreSourceService.closeSession(sessionUUID3); + assertNull(restoreSourceService.getSessionsForShard(indexShard2)); + assertNull(restoreSourceService.getOngoingRestore(sessionUUID3)); + + closeShards(indexShard1, indexShard2); + } + + public void testCloseShardListenerFunctionality() throws IOException { + IndexShard indexShard1 = newStartedShard(true); + IndexShard indexShard2 = newStartedShard(true); + final String sessionUUID1 = UUIDs.randomBase64UUID(); + final String sessionUUID2 = UUIDs.randomBase64UUID(); + final String sessionUUID3 = UUIDs.randomBase64UUID(); + + restoreSourceService.openSession(sessionUUID1, indexShard1); + restoreSourceService.openSession(sessionUUID2, indexShard1); + restoreSourceService.openSession(sessionUUID3, indexShard2); + + assertEquals(2, restoreSourceService.getSessionsForShard(indexShard1).size()); + assertEquals(1, restoreSourceService.getSessionsForShard(indexShard2).size()); + + restoreSourceService.afterIndexShardClosed(indexShard1.shardId(), indexShard1, Settings.EMPTY); + + assertNull(restoreSourceService.getSessionsForShard(indexShard1)); + assertNull(restoreSourceService.getOngoingRestore(sessionUUID1)); + assertNull(restoreSourceService.getOngoingRestore(sessionUUID2)); + + restoreSourceService.closeSession(sessionUUID3); + closeShards(indexShard1, indexShard2); + } +}