Skip to content

Commit

Permalink
Add CcrRestoreSourceService to track sessions (elastic#36578)
Browse files Browse the repository at this point in the history
This commit is related to elastic#36127. It adds a CcrRestoreSourceService to
track Engine.IndexCommitRef need for in-process file restores. When a
follower starts restoring a shard through the CcrRepository it opens a
session with the leader through the PutCcrRestoreSessionAction. The
leader responds to the request by telling the follower what files it
needs to fetch for a restore. This is not yet implemented.

Once, the restore is complete, the follower closes the session with the
DeleteCcrRestoreSessionAction action.
  • Loading branch information
Tim-Brooks committed Dec 20, 2018
1 parent 566c98a commit 67da976
Show file tree
Hide file tree
Showing 10 changed files with 788 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public Iterables() {

public static <T> Iterable<T> concat(Iterable<T>... inputs) {
Objects.requireNonNull(inputs);
return new ConcatenatedIterable(inputs);
return new ConcatenatedIterable<>(inputs);
}

static class ConcatenatedIterable<T> implements Iterable<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.license.XPackLicenseState;
Expand Down Expand Up @@ -59,10 +60,13 @@
import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction;
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction;
Expand Down Expand Up @@ -113,7 +117,9 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
private final boolean enabled;
private final Settings settings;
private final CcrLicenseChecker ccrLicenseChecker;

private final SetOnce<CcrRepositoryManager> repositoryManager = new SetOnce<>();
private final SetOnce<CcrRestoreSourceService> restoreSourceService = new SetOnce<>();
private Client client;

private final boolean tribeNode;
Expand Down Expand Up @@ -160,9 +166,12 @@ public Collection<Object> createComponents(
}

this.repositoryManager.set(new CcrRepositoryManager(settings, clusterService, (NodeClient) client));

CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(settings);
this.restoreSourceService.set(restoreSourceService);
return Arrays.asList(
ccrLicenseChecker,
restoreSourceService,
repositoryManager.get(),
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
);
}
Expand All @@ -189,6 +198,10 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
PutInternalCcrRepositoryAction.TransportPutInternalRepositoryAction.class),
new ActionHandler<>(DeleteInternalCcrRepositoryAction.INSTANCE,
DeleteInternalCcrRepositoryAction.TransportDeleteInternalRepositoryAction.class),
new ActionHandler<>(PutCcrRestoreSessionAction.INSTANCE,
PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction.class),
new ActionHandler<>(ClearCcrRestoreSessionAction.INSTANCE,
ClearCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class),
// stats action
new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class),
new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
Expand Down Expand Up @@ -288,6 +301,11 @@ public Map<String, Repository.Factory> getInternalRepositories(Environment env,
return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
}

@Override
public void onIndexModule(IndexModule indexModule) {
indexModule.addIndexEventListener(this.restoreSourceService.get());
}

protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.action.repositories;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;

import java.io.IOException;
import java.util.List;

public class ClearCcrRestoreSessionAction extends Action<ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse> {

public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction();
private static final String NAME = "internal:admin/ccr/restore/session/clear";

private ClearCcrRestoreSessionAction() {
super(NAME);
}

@Override
public ClearCcrRestoreSessionResponse newResponse() {
return new ClearCcrRestoreSessionResponse();
}

public static class TransportDeleteCcrRestoreSessionAction extends TransportNodesAction<ClearCcrRestoreSessionRequest,
ClearCcrRestoreSessionResponse, ClearCcrRestoreSessionRequest.Request, Response> {

private final CcrRestoreSourceService ccrRestoreService;

@Inject
public TransportDeleteCcrRestoreSessionAction(ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters,
TransportService transportService, CcrRestoreSourceService ccrRestoreService) {
super(NAME, threadPool, clusterService, transportService, actionFilters, ClearCcrRestoreSessionRequest::new,
ClearCcrRestoreSessionRequest.Request::new, ThreadPool.Names.GENERIC, Response.class);
this.ccrRestoreService = ccrRestoreService;
}

@Override
protected ClearCcrRestoreSessionResponse newResponse(ClearCcrRestoreSessionRequest request, List<Response> responses,
List<FailedNodeException> failures) {
return new ClearCcrRestoreSessionResponse(clusterService.getClusterName(), responses, failures);
}

@Override
protected ClearCcrRestoreSessionRequest.Request newNodeRequest(String nodeId, ClearCcrRestoreSessionRequest request) {
return request.getRequest();
}

@Override
protected Response newNodeResponse() {
return new Response();
}

@Override
protected Response nodeOperation(ClearCcrRestoreSessionRequest.Request request) {
ccrRestoreService.closeSession(request.getSessionUUID());
return new Response(clusterService.localNode());
}
}

public static class Response extends BaseNodeResponse {

private Response() {
}

private Response(StreamInput in) throws IOException {
readFrom(in);
}

private Response(DiscoveryNode node) {
super(node);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
}

public static class ClearCcrRestoreSessionResponse extends BaseNodesResponse<Response> {

ClearCcrRestoreSessionResponse() {
}

ClearCcrRestoreSessionResponse(ClusterName clusterName, List<Response> chunkResponses, List<FailedNodeException> failures) {
super(clusterName, chunkResponses, failures);
}

@Override
protected List<Response> readNodesFrom(StreamInput in) throws IOException {
return in.readList(Response::new);
}

@Override
protected void writeNodesTo(StreamOutput out, List<Response> nodes) throws IOException {
out.writeList(nodes);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.action.repositories;

import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

public class ClearCcrRestoreSessionRequest extends BaseNodesRequest<ClearCcrRestoreSessionRequest> {

private Request request;

ClearCcrRestoreSessionRequest() {
}

public ClearCcrRestoreSessionRequest(String nodeId, Request request) {
super(nodeId);
this.request = request;
}

@Override
public void readFrom(StreamInput streamInput) throws IOException {
super.readFrom(streamInput);
request = new Request();
request.readFrom(streamInput);
}

@Override
public void writeTo(StreamOutput streamOutput) throws IOException {
super.writeTo(streamOutput);
request.writeTo(streamOutput);
}

public Request getRequest() {
return request;
}

public static class Request extends BaseNodeRequest {

private String sessionUUID;

Request() {
}

public Request(String nodeId, String sessionUUID) {
super(nodeId);
this.sessionUUID = sessionUUID;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
sessionUUID = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(sessionUUID);
}

public String getSessionUUID() {
return sessionUUID;
}
}
}
Loading

0 comments on commit 67da976

Please sign in to comment.