Skip to content

Commit

Permalink
Create Remote Object managers and use them in orchestration from Remo…
Browse files Browse the repository at this point in the history
…teClusterStateService (opensearch-project#13924)

* Create Remote Object managers and use them in orchestration from RemoteClusterStateService

Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 authored Jun 11, 2024
1 parent d17e092 commit 81fd088
Show file tree
Hide file tree
Showing 37 changed files with 1,487 additions and 1,195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_FILE_PREFIX;
import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA;
import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA;
import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static org.opensearch.cluster.metadata.Metadata.CLUSTER_READ_ONLY_BLOCK;
import static org.opensearch.cluster.metadata.Metadata.SETTING_READ_ONLY_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.encodeString;
import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -326,9 +327,7 @@ public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathTh
// Step - 3 Delete index metadata file in remote
try {
Files.move(
segmentRepoPath.resolve(
RemoteClusterStateService.encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"
),
segmentRepoPath.resolve(encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"),
segmentRepoPath.resolve("cluster-state/")
);
} catch (IOException e) {
Expand All @@ -354,10 +353,7 @@ public void testRemoteStateFullRestart() throws Exception {
try {
Files.move(
segmentRepoPath.resolve(
RemoteClusterStateService.encodeString(clusterService().state().getClusterName().value())
+ "/cluster-state/"
+ prevClusterUUID
+ "/manifest"
encodeString(clusterService().state().getClusterName().value()) + "/cluster-state/" + prevClusterUUID + "/manifest"
),
segmentRepoPath.resolve("cluster-state/")
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStorePathStrategy;
Expand All @@ -52,6 +52,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

/**
Expand Down Expand Up @@ -87,7 +88,6 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen

public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
public static final String DELIMITER = "__";
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";

private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class);
Expand Down Expand Up @@ -175,10 +175,7 @@ public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
)
),
ex -> latchedActionListener.onFailure(
new RemoteClusterStateService.RemoteStateTransferException(
"Exception in writing index to remote store: " + indexRouting.getIndex().toString(),
ex
)
new RemoteStateTransferException("Exception in writing index to remote store: " + indexRouting.getIndex().toString(), ex)
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public AbstractRemoteWritableBlobEntity(

public abstract BlobPathParameters getBlobPathParameters();

public abstract String getType();

public String getFullBlobName() {
return blobName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@
import java.util.Set;
import java.util.function.Predicate;

import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING;
import static org.opensearch.gateway.remote.RemoteIndexMetadataManager.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING;
import static org.opensearch.gateway.remote.RemoteManifestManager.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING;

/**
* Encapsulates all valid cluster level settings.
*
Expand Down Expand Up @@ -717,9 +721,9 @@ public void apply(Settings value, Settings current, Settings previous) {
// Remote cluster state settings
RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1158,8 +1158,7 @@ public String getComponent() {
}

public String getUploadedFilename() {
String[] splitPath = uploadedFilename.split("/");
return splitPath[splitPath.length - 1];
return uploadedFilename;
}

public String getIndexName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,9 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_FORMAT;
import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN;
import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_FORMAT;
import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN;
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN;
import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST;
import static org.opensearch.gateway.remote.model.RemoteGlobalMetadata.GLOBAL_METADATA_FORMAT;

/**
* A Manager which provides APIs to clean up stale cluster state files and runs an async stale cleanup task
Expand Down Expand Up @@ -74,6 +71,7 @@ public class RemoteClusterStateCleanupManager implements Closeable {
private long lastCleanupAttemptStateVersion;
private final ThreadPool threadpool;
private final ClusterApplierService clusterApplierService;
private RemoteManifestManager remoteManifestManager;

public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) {
this.remoteClusterStateService = remoteClusterStateService;
Expand All @@ -89,6 +87,7 @@ public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterS

void start() {
staleFileDeletionTask = new AsyncStaleFileDeletion(this);
remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
}

@Override
Expand Down Expand Up @@ -172,13 +171,17 @@ void deleteClusterMetadata(
Set<String> staleIndexMetadataPaths = new HashSet<>();
Set<String> staleGlobalMetadataPaths = new HashSet<>();
activeManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager.fetchRemoteClusterMetadataManifest(
clusterName,
clusterUUID,
blobMetadata.name()
);
clusterMetadataManifest.getIndices()
.forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
.forEach(
uploadedIndexMetadata -> filesToKeep.add(
RemoteClusterStateUtils.getFormattedIndexFileName(uploadedIndexMetadata.getUploadedFilename())
)
);
if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName());
} else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
Expand All @@ -191,43 +194,38 @@ void deleteClusterMetadata(
}
});
staleManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager.fetchRemoteClusterMetadataManifest(
clusterName,
clusterUUID,
blobMetadata.name()
);
staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name());
staleManifestPaths.add(
remoteManifestManager.getManifestFolderPath(clusterName, clusterUUID).buildAsString() + blobMetadata.name()
);
if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
addStaleGlobalMetadataPath(clusterMetadataManifest.getGlobalMetadataFileName(), filesToKeep, staleGlobalMetadataPaths);
} else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
addStaleGlobalMetadataPath(
clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename(),
filesToKeep,
staleGlobalMetadataPaths
);
addStaleGlobalMetadataPath(
clusterMetadataManifest.getSettingsMetadata().getUploadedFilename(),
filesToKeep,
staleGlobalMetadataPaths
);
addStaleGlobalMetadataPath(
clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename(),
filesToKeep,
staleGlobalMetadataPaths
);
if (filesToKeep.contains(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()) == false) {
staleGlobalMetadataPaths.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename());
}
if (filesToKeep.contains(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()) == false) {
staleGlobalMetadataPaths.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename());
}
if (filesToKeep.contains(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()) == false) {
staleGlobalMetadataPaths.add(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename());
}
clusterMetadataManifest.getCustomMetadataMap()
.values()
.forEach(
attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths)
);
.stream()
.map(ClusterMetadataManifest.UploadedMetadataAttribute::getUploadedFilename)
.filter(file -> filesToKeep.contains(file) == false)
.forEach(staleGlobalMetadataPaths::add);
}

clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
staleIndexMetadataPaths.add(
new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString()
+ INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename())
);
String fileName = RemoteClusterStateUtils.getFormattedIndexFileName(uploadedIndexMetadata.getUploadedFilename());
if (filesToKeep.contains(fileName) == false) {
staleIndexMetadataPaths.add(fileName);
}
});
});
Expand All @@ -237,9 +235,9 @@ void deleteClusterMetadata(
return;
}

deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
deleteStalePaths(new ArrayList<>(staleGlobalMetadataPaths));
deleteStalePaths(new ArrayList<>(staleIndexMetadataPaths));
deleteStalePaths(new ArrayList<>(staleManifestPaths));
} catch (IllegalStateException e) {
logger.error("Error while fetching Remote Cluster Metadata manifests", e);
} catch (IOException e) {
Expand Down Expand Up @@ -267,8 +265,8 @@ void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int mani
try {
getBlobStoreTransferService().listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteClusterStateService.getManifestFolderPath(clusterName, clusterUUID),
MANIFEST_FILE_PREFIX,
remoteManifestManager.getManifestFolderPath(clusterName, clusterUUID),
MANIFEST,
Integer.MAX_VALUE,
new ActionListener<>() {
@Override
Expand Down Expand Up @@ -312,7 +310,11 @@ void deleteStaleUUIDsClusterMetadata(String clusterName, List<String> clusterUUI
clusterUUIDs.forEach(
clusterUUID -> getBlobStoreTransferService().deleteAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
RemoteClusterStateUtils.getClusterMetadataBasePath(
remoteClusterStateService.getBlobStoreRepository(),
clusterName,
clusterUUID
),
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
Expand All @@ -336,12 +338,9 @@ public void onFailure(Exception e) {
}

// package private for testing
void deleteStalePaths(String clusterName, String clusterUUID, List<String> stalePaths) throws IOException {
void deleteStalePaths(List<String> stalePaths) throws IOException {
logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths));
getBlobStoreTransferService().deleteBlobs(
remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
stalePaths
);
getBlobStoreTransferService().deleteBlobs(BlobPath.cleanPath(), stalePaths);
}

/**
Expand Down
Loading

0 comments on commit 81fd088

Please sign in to comment.