Skip to content

Commit

Permalink
Make Repository.getRepositoryData an Async API (#49299) (#49312)
Browse files Browse the repository at this point in the history
This API call in most implementations is fairly IO heavy and slow
so it is more natural to be async in the first place.
Concretely though, this change is a prerequisite of #49060 since
determining the repository generation from the cluster state
introduces situations where this call would have to wait for other
operations to finish. Doing so in a blocking manner would break
`SnapshotResiliencyTests` and waste a thread.
Also, this sets up the possibility to in the future make use of async IO
where provided by the underlying Repository implementation.

In a follow-up `SnapshotsService#getRepositoryData` will be made async
as well (did not do it here, since it's another huge change to do so).
Note: This change for now does not alter the threading behaviour in any way (since `Repository#getRepositoryData` isn't forking) and is purely mechanical.
  • Loading branch information
original-brownbear authored Nov 19, 2019
1 parent cb5169a commit 0acba44
Show file tree
Hide file tree
Showing 18 changed files with 739 additions and 701 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -41,6 +42,7 @@
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryCleanupResult;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -167,97 +169,103 @@ private void cleanupRepo(String repositoryName, ActionListener<RepositoryCleanup
return;
}
final BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
final long repositoryStateId = repository.getRepositoryData().getGenId();
logger.info("Running cleanup operations on repository [{}][{}]", repositoryName, repositoryStateId);
clusterService.submitStateUpdateTask("cleanup repository [" + repositoryName + "][" + repositoryStateId + ']',
new ClusterStateUpdateTask() {
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
repository.getRepositoryData(repositoryDataListener);
repositoryDataListener.whenComplete(repositoryData -> {
final long repositoryStateId = repositoryData.getGenId();
logger.info("Running cleanup operations on repository [{}][{}]", repositoryName, repositoryStateId);
clusterService.submitStateUpdateTask("cleanup repository [" + repositoryName + "][" + repositoryStateId + ']',
new ClusterStateUpdateTask() {

private boolean startedCleanup = false;
private boolean startedCleanup = false;

@Override
public ClusterState execute(ClusterState currentState) {
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
throw new IllegalStateException(
"Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress in ["
+ repositoryCleanupInProgress + "]");
}
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
throw new IllegalStateException("Cannot cleanup [" + repositoryName
+ "] - a snapshot is currently being deleted in [" + deletionsInProgress + "]");
}
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null && !snapshots.entries().isEmpty()) {
throw new IllegalStateException(
"Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]");
@Override
public ClusterState execute(ClusterState currentState) {
final RepositoryCleanupInProgress repositoryCleanupInProgress =
currentState.custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
throw new IllegalStateException(
"Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress in ["
+ repositoryCleanupInProgress + "]");
}
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
throw new IllegalStateException("Cannot cleanup [" + repositoryName
+ "] - a snapshot is currently being deleted in [" + deletionsInProgress + "]");
}
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null && !snapshots.entries().isEmpty()) {
throw new IllegalStateException(
"Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]");
}
return ClusterState.builder(currentState).putCustom(RepositoryCleanupInProgress.TYPE,
new RepositoryCleanupInProgress(
RepositoryCleanupInProgress.startedEntry(repositoryName, repositoryStateId))).build();
}
return ClusterState.builder(currentState).putCustom(RepositoryCleanupInProgress.TYPE,
new RepositoryCleanupInProgress(
RepositoryCleanupInProgress.startedEntry(repositoryName, repositoryStateId))).build();
}

@Override
public void onFailure(String source, Exception e) {
after(e, null);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
startedCleanup = true;
logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
l -> blobStoreRepository.cleanup(
repositoryStateId,
newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION),
ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
}

private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {
if (failure == null) {
logger.debug("Finished repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId);
} else {
logger.debug(() -> new ParameterizedMessage(
"Failed to finish repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId), failure);
@Override
public void onFailure(String source, Exception e) {
after(e, null);
}
assert failure != null || result != null;
if (startedCleanup == false) {
logger.debug("No cleanup task to remove from cluster state because we failed to start one", failure);
listener.onFailure(failure);
return;

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
startedCleanup = true;
logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
l -> blobStoreRepository.cleanup(
repositoryStateId,
newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION),
ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
}
clusterService.submitStateUpdateTask(
"remove repository cleanup task [" + repositoryName + "][" + repositoryStateId + ']',
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return removeInProgressCleanup(currentState);
}

@Override
public void onFailure(String source, Exception e) {
if (failure != null) {
e.addSuppressed(failure);
private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {
if (failure == null) {
logger.debug("Finished repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId);
} else {
logger.debug(() -> new ParameterizedMessage(
"Failed to finish repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId), failure);
}
assert failure != null || result != null;
if (startedCleanup == false) {
logger.debug("No cleanup task to remove from cluster state because we failed to start one", failure);
listener.onFailure(failure);
return;
}
clusterService.submitStateUpdateTask(
"remove repository cleanup task [" + repositoryName + "][" + repositoryStateId + ']',
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return removeInProgressCleanup(currentState);
}

@Override
public void onFailure(String source, Exception e) {
if (failure != null) {
e.addSuppressed(failure);
}
logger.warn(() ->
new ParameterizedMessage("[{}] failed to remove repository cleanup task", repositoryName), e);
listener.onFailure(e);
}
logger.warn(() ->
new ParameterizedMessage("[{}] failed to remove repository cleanup task", repositoryName), e);
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (failure == null) {
logger.info("Done with repository cleanup on [{}][{}] with result [{}]",
repositoryName, repositoryStateId, result);
listener.onResponse(result);
} else {
logger.warn(() -> new ParameterizedMessage("Failed to run repository cleanup operations on [{}][{}]",
repositoryName, repositoryStateId), failure);
listener.onFailure(failure);
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (failure == null) {
logger.info("Done with repository cleanup on [{}][{}] with result [{}]",
repositoryName, repositoryStateId, result);
listener.onResponse(result);
} else {
logger.warn(() -> new ParameterizedMessage(
"Failed to run repository cleanup operations on [{}][{}]",
repositoryName, repositoryStateId), failure);
listener.onFailure(failure);
}
}
}
});
}
});
});
}
});
}, listener::onFailure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -483,15 +483,21 @@ private void restore(IndexShard indexShard, Repository repository, SnapshotRecov
translogState.totalOperations(0);
translogState.totalOperationsOnStart(0);
indexShard.prepareForIndexRecovery();
ShardId snapshotShardId = shardId;
final ShardId snapshotShardId;
final String indexName = restoreSource.index();
if (!shardId.getIndexName().equals(indexName)) {
snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
} else {
snapshotShardId = shardId;
}
final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
assert indexShard.getEngineOrNull() == null;
repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), indexId, snapshotShardId,
indexShard.recoveryState(), restoreListener);
repository.getRepositoryData(ActionListener.wrap(
repositoryData -> {
final IndexId indexId = repositoryData.resolveIndexId(indexName);
assert indexShard.getEngineOrNull() == null;
repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), indexId, snapshotShardId,
indexShard.recoveryState(), restoreListener);
}, restoreListener::onFailure
));
} catch (Exception e) {
restoreListener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind
}

@Override
public RepositoryData getRepositoryData() {
return in.getRepositoryData();
public void getRepositoryData(ActionListener<RepositoryData> listener) {
in.getRepositoryData(listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ default Repository create(RepositoryMetaData metaData, Function<String, Reposito
* and the indices across all snapshots found in the repository. Throws a {@link RepositoryException}
* if there was an error in reading the data.
*/
RepositoryData getRepositoryData();
void getRepositoryData(ActionListener<RepositoryData> listener);

/**
* Starts snapshotting process
Expand Down
Loading

0 comments on commit 0acba44

Please sign in to comment.