Skip to content

Commit

Permalink
Experimental changes to support hashed prefix
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Jul 30, 2024
1 parent 98d5f0d commit 27fcb40
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,77 @@ public PathInput build() {
}
}

/**
* Wrapper class for the snapshot aware input required to generate path for optimised snapshot paths. This input is
* composed of the parent inputs, shard id, and static indices.
*
* @opensearch.internal
*/
@PublicApi(since = "2.14.0")
@ExperimentalApi
public static class SnapshotShardPathInput extends BasePathInput {
private final String shardId;

public SnapshotShardPathInput(BlobPath basePath, String indexUUID, String shardId) {
super(basePath, indexUUID);
this.shardId = shardId;
}

public SnapshotShardPathInput(Builder builder) {
super(builder);
this.shardId = Objects.requireNonNull(builder.shardId);
}

String shardId() {
return shardId;
}

@Override
BlobPath fixedSubPath() {
return BlobPath.cleanPath().add("indices").add(super.fixedSubPath()).add(shardId);
}

/**
* Returns a new builder for {@link PathInput}.
*/
public static Builder builder() {
return new Builder();
}

/**
* Builder for {@link PathInput}.
*
* @opensearch.internal
*/
@PublicApi(since = "2.14.0")
@ExperimentalApi
public static class Builder extends BasePathInput.Builder<Builder> {
private String shardId;

public Builder basePath(BlobPath basePath) {
super.basePath = basePath;
return this;
}

public Builder indexUUID(String indexUUID) {
super.indexUUID = indexUUID;
return this;
}

public Builder shardId(String shardId) {
this.shardId = shardId;
return this;
}

@Override
protected Builder self() {
return this;
}

public SnapshotShardPathInput build() {
return new SnapshotShardPathInput(this);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStorePathStrategy.BasePathInput;
import org.opensearch.index.remote.RemoteStorePathStrategy.SnapshotShardPathInput;
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
Expand Down Expand Up @@ -390,6 +391,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private final SetOnce<BlobContainer> blobContainer = new SetOnce<>();

private final SetOnce<BlobContainer> rootBlobContainer = new SetOnce<>();

private final SetOnce<BlobStore> blobStore = new SetOnce<>();

protected final ClusterService clusterService;
Expand Down Expand Up @@ -813,6 +816,22 @@ protected BlobContainer blobContainer() {
return blobContainer;
}

private BlobContainer rootBlobContainer() {
assertSnapshotOrGenericThread();

BlobContainer rootBlobContainer = this.rootBlobContainer.get();
if (rootBlobContainer == null) {
synchronized (lock) {
rootBlobContainer = this.rootBlobContainer.get();
if (rootBlobContainer == null) {
rootBlobContainer = blobStore().blobContainer(BlobPath.cleanPath());
this.rootBlobContainer.set(rootBlobContainer);
}
}
}
return rootBlobContainer;
}

/**
* Maintains single lazy instance of {@link BlobStore}.
* Public for testing.
Expand Down Expand Up @@ -1280,6 +1299,8 @@ private void executeStaleShardDelete(
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
GroupedActionListener<Void> listener
) throws InterruptedException {
final String basePath = basePath().buildAsString();
final int basePathLen = basePath.length();
List<String> filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS);
if (filesToDelete != null) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
Expand All @@ -1289,12 +1310,13 @@ private void executeStaleShardDelete(
List<String> eligibleFilesToDelete = new ArrayList<>();
for (String fileToDelete : filesToDelete) {
if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
String[] fileToDeletePath = fileToDelete.split("/");
String relativeFileToDeletePath = fileToDelete.substring(basePathLen);
String[] fileToDeletePath = relativeFileToDeletePath.split("/");
String indexId = fileToDeletePath[1];
String shardId = fileToDeletePath[2];
String shallowSnapBlob = fileToDeletePath[3];
String snapshotUUID = extractShallowSnapshotUUID(shallowSnapBlob).orElseThrow();
BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));
BlobContainer shardContainer = shardContainer(indexId, shardId);
try {
releaseRemoteStoreLockAndCleanup(shardId, snapshotUUID, shardContainer, remoteStoreLockManagerFactory);
eligibleFilesToDelete.add(fileToDelete);
Expand All @@ -1311,7 +1333,7 @@ private void executeStaleShardDelete(
}
}
// Deleting the shard blobs
deleteFromContainer(blobContainer(), eligibleFilesToDelete);
deleteFromContainer(rootBlobContainer(), eligibleFilesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(
Expand Down Expand Up @@ -1474,7 +1496,7 @@ private List<String> resolveFilesToDelete(
Collection<ShardSnapshotMetaDeleteResult> deleteResults
) {
final String basePath = basePath().buildAsString();
final int basePathLen = basePath.length();
// final int basePathLen = basePath.length();
final Map<IndexId, Collection<String>> indexMetaGenerations = oldRepositoryData.indexMetaDataToRemoveAfterRemovingSnapshots(
snapshotIds
);
Expand All @@ -1484,10 +1506,11 @@ private List<String> resolveFilesToDelete(
}), indexMetaGenerations.entrySet().stream().flatMap(entry -> {
final String indexContainerPath = indexContainer(entry.getKey()).path().buildAsString();
return entry.getValue().stream().map(id -> indexContainerPath + INDEX_METADATA_FORMAT.blobName(id));
})).map(absolutePath -> {
assert absolutePath.startsWith(basePath);
return absolutePath.substring(basePathLen);
}).collect(Collectors.toList());
})).collect(Collectors.toList());
// .map(absolutePath -> {
// assert absolutePath.startsWith(basePath);
// return absolutePath.substring(basePathLen);
// }).collect(Collectors.toList());
}

/**
Expand Down Expand Up @@ -1963,7 +1986,13 @@ private BlobContainer shardContainer(IndexId indexId, ShardId shardId) {
}

public BlobContainer shardContainer(IndexId indexId, int shardId) {
return blobStore().blobContainer(indicesPath().add(indexId.getId()).add(Integer.toString(shardId)));
return shardContainer(indexId.getId(), String.valueOf(shardId));
}

private BlobContainer shardContainer(String indexId, String shardId) {
BasePathInput pathInput = SnapshotShardPathInput.builder().basePath(basePath()).indexUUID(indexId).shardId(shardId).build();
BlobPath shardPath = HASHED_PREFIX.path(pathInput, FNV_1A_COMPOSITE_1);
return blobStore().blobContainer(shardPath);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,7 @@ protected void doRun() {
)
),
e -> {
logger.error("Clone failed during development", e);
logger.warn("Exception [{}] while trying to clone shard [{}]", e, repoShardId);
failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId);
}
Expand Down

0 comments on commit 27fcb40

Please sign in to comment.