Skip to content

Commit

Permalink
initial commits
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
skumawat2025 committed May 13, 2024
1 parent 0282e64 commit c20ae47
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class RemoteFsTranslog extends Translog {
private static final int SYNC_PERMIT = 1;
private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT);
private final AtomicBoolean pauseSync = new AtomicBoolean(false);
boolean ckpAsMetadata;

public RemoteFsTranslog(
TranslogConfig config,
Expand All @@ -110,14 +111,16 @@ public RemoteFsTranslog(
this.startedPrimarySupplier = startedPrimarySupplier;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker);
ckpAsMetadata = true;
this.translogTransferManager = buildTranslogTransferManager(
blobStoreRepository,
threadPool,
shardId,
fileTransferTracker,
remoteTranslogTransferTracker,
indexSettings().getRemoteStorePathStrategy(),
remoteStoreSettings
remoteStoreSettings,
ckpAsMetadata
);
try {
download(translogTransferManager, location, logger);
Expand Down Expand Up @@ -288,7 +291,8 @@ public static TranslogTransferManager buildTranslogTransferManager(
FileTransferTracker fileTransferTracker,
RemoteTranslogTransferTracker tracker,
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings
RemoteStoreSettings remoteStoreSettings,
boolean ckpAsMetadata
) {
assert Objects.nonNull(pathStrategy);
String indexUUID = shardId.getIndex().getUUID();
Expand All @@ -310,7 +314,16 @@ public static TranslogTransferManager buildTranslogTransferManager(
.build();
BlobPath mdPath = pathStrategy.generatePath(mdPathInput);
BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool);
return new TranslogTransferManager(shardId, transferService, dataPath, mdPath, fileTransferTracker, tracker, remoteStoreSettings);
return new TranslogTransferManager(
shardId,
transferService,
dataPath,
mdPath,
fileTransferTracker,
tracker,
remoteStoreSettings,
ckpAsMetadata
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

/**
Expand Down Expand Up @@ -108,6 +109,7 @@ public static class TransferFileSnapshot extends FileSnapshot {

private final long primaryTerm;
private Long checksum;
private Map<String, String> metadata;

public TransferFileSnapshot(Path path, long primaryTerm, Long checksum) throws IOException {
super(path);
Expand All @@ -128,6 +130,14 @@ public long getPrimaryTerm() {
return primaryTerm;
}

/**
 * Sets the metadata map associated with this file snapshot.
 * The given map reference is stored as-is; no copy is made.
 *
 * @param metadata key/value metadata to associate with this snapshot
 */
public void setMetadata(Map<String, String> metadata) {
this.metadata = metadata;
}

/**
 * Returns the metadata map currently associated with this file snapshot.
 * The stored reference is returned directly, not a copy.
 *
 * @return the metadata map; NOTE(review): presumably {@code null} until
 *         {@link #setMetadata(Map)} has been called, since the field has no
 *         initializer - confirm against the constructor
 */
public Map<String, String> getMetadata() {
return metadata;
}

@Override
public int hashCode() {
return Objects.hash(primaryTerm, super.hashCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;

import java.io.IOException;
import java.util.Set;

/**
Expand Down Expand Up @@ -39,4 +40,6 @@ public interface TransferSnapshot {
* @return the translog transfer metadata
*/
TranslogTransferMetadata getTranslogTransferMetadata();

/**
 * The translog file snapshots to be transferred, each carrying metadata.
 * NOTE(review): the implementation in this change attaches the Base64-encoded
 * contents of the matching checkpoint file under the key {@code ckp-data};
 * confirm whether that is the contract for every implementation.
 *
 * @return the set of translog file snapshots with metadata attached
 * @throws IOException if the metadata cannot be built (presumably when reading the checkpoint file fails)
 */
Set<TransferFileSnapshot> getTranslogFileWithMetadataSnapshots() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -64,6 +68,31 @@ public Set<TransferFileSnapshot> getTranslogFileSnapshots() {
return translogCheckpointFileInfoTupleSet.stream().map(Tuple::v1).collect(Collectors.toSet());
}

/**
 * Collects every translog file snapshot of this transfer after attaching, as metadata,
 * the data built from its paired checkpoint file (see {@link #buildMetadata(Path)}).
 * Note that the metadata is set on the existing snapshot objects, not on copies.
 *
 * @return the translog file snapshots, each with its metadata set
 * @throws IOException if building the metadata for any checkpoint file fails
 */
@Override
public Set<TransferFileSnapshot> getTranslogFileWithMetadataSnapshots() throws IOException {
    final Set<TransferFileSnapshot> snapshotsWithMetadata = new HashSet<>();
    for (Tuple<TranslogFileSnapshot, CheckpointFileSnapshot> pair : translogCheckpointFileInfoTupleSet) {
        final TranslogFileSnapshot tlogSnapshot = pair.v1();
        final CheckpointFileSnapshot ckpSnapshot = pair.v2();
        // The checkpoint travels as metadata of the translog file rather than as a separate upload.
        final Map<String, String> ckpMetadata = buildMetadata(ckpSnapshot.getPath());
        tlogSnapshot.setMetadata(ckpMetadata);
        snapshotsWithMetadata.add(tlogSnapshot);
    }
    return snapshotsWithMetadata;
}

/**
 * Builds the metadata map for a translog file from its checkpoint file.
 * The map holds a single entry, {@code ckp-data}, whose value is the
 * Base64-encoded content of the checkpoint file.
 *
 * @param checkpointPath path of the checkpoint file to embed
 * @return a new mutable map containing the {@code ckp-data} entry
 * @throws IOException if the checkpoint file cannot be read
 */
public Map<String, String> buildMetadata(Path checkpointPath) throws IOException {
    final Map<String, String> ckpMetadata = new HashMap<>();
    ckpMetadata.put("ckp-data", buildCheckpointDataAsBase64String(checkpointPath));
    return ckpMetadata;
}

/**
 * Reads the whole checkpoint file and returns its content encoded as a Base64 string
 * (basic, padded encoding).
 *
 * @param checkpointFilePath path of the checkpoint file to read
 * @return Base64 encoding of the file's bytes; an empty string for an empty file
 * @throws IOException if the file does not exist or cannot be read
 */
static String buildCheckpointDataAsBase64String(Path checkpointFilePath) throws IOException {
    // The file is read in one go; no separate size lookup is needed.
    byte[] fileBytes = Files.readAllBytes(checkpointFilePath);
    return Base64.getEncoder().encodeToString(fileBytes);
}

@Override
public TranslogTransferMetadata getTranslogTransferMetadata() {
return new TranslogTransferMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.common.SetOnce;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.FetchBlobResult;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.common.io.stream.BytesStreamOutput;
Expand All @@ -36,6 +37,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -63,6 +65,7 @@ public class TranslogTransferManager {
private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;
private final RemoteStoreSettings remoteStoreSettings;
private static final int METADATA_FILES_TO_FETCH = 10;
boolean ckpAsMetadata;

private final Logger logger;

Expand All @@ -79,7 +82,8 @@ public TranslogTransferManager(
BlobPath remoteMetadataTransferPath,
FileTransferTracker fileTransferTracker,
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings
RemoteStoreSettings remoteStoreSettings,
boolean ckpAsMetadata
) {
this.shardId = shardId;
this.transferService = transferService;
Expand All @@ -89,6 +93,7 @@ public TranslogTransferManager(
this.logger = Loggers.getLogger(getClass(), shardId);
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
this.remoteStoreSettings = remoteStoreSettings;
this.ckpAsMetadata = ckpAsMetadata;
}

public RemoteTranslogTransferTracker getRemoteTranslogTransferTracker() {
Expand All @@ -110,8 +115,12 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
long prevUploadTimeInMillis = remoteTranslogTransferTracker.getTotalUploadTimeInMillis();

try {
toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots()));
toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots())));
if (ckpAsMetadata) {
toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileWithMetadataSnapshots()));
} else {
toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots()));
toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots())));
}
if (toUpload.isEmpty()) {
logger.trace("Nothing to upload for transfer");
return true;
Expand Down Expand Up @@ -236,15 +245,101 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca
generation,
location
);
// Download Checkpoint file from remote to local FS
String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation));
downloadToFS(ckpFileName, location, primaryTerm);
// Download translog file from remote to local FS
String translogFilename = Translog.getFilename(Long.parseLong(generation));
downloadToFS(translogFilename, location, primaryTerm);
if (ckpAsMetadata == false) {
// Download Checkpoint file from remote to local FS
downloadToFS(ckpFileName, location, primaryTerm);
// Download translog file from remote to local FS
downloadToFS(translogFilename, location, primaryTerm);
} else {
// Download translog.tlog file with object metadata from remote to local FS
Map<String, String> metadata = downloadTranslogToFSAndGetMetadata(translogFilename, location, primaryTerm, generation);
try {
assert metadata != null && !metadata.isEmpty() && metadata.containsKey("ckp-data");
recoverCkpFileFromMetadata(metadata, location, generation, translogFilename);
} catch (Exception e) {
throw new IOException("Failed to recover checkpoint file from remote", e);
}
}
return true;
}

/**
 * Downloads a translog file from the remote store into {@code location} and returns the
 * object metadata that was fetched together with the blob.
 * Any existing local file with the same name is deleted first. Download time is always
 * recorded on the transfer tracker; downloaded bytes are recorded only on success. On
 * success the file is also registered with the {@code fileTransferTracker}.
 *
 * @param fileName    name of the translog file to download
 * @param location    local directory the file is written into
 * @param primaryTerm primary term, used as the last element of the remote data path
 * @param generation  translog generation (currently unused by this method)
 * @return the metadata returned alongside the blob
 * @throws IOException if deleting the existing file, downloading, or writing the file fails
 */
private Map<String, String> downloadTranslogToFSAndGetMetadata(String fileName, Path location, String primaryTerm, String generation)
    throws IOException {
    Path filePath = location.resolve(fileName);
    // Here, we always override the existing file if present.
    // We need to change this logic when we introduce incremental download
    deleteFileIfExists(filePath);

    boolean downloadStatus = false;
    long bytesToRead = 0, downloadStartTime = System.nanoTime();
    Map<String, String> metadata;

    FetchBlobResult inputStreamWithMetadata = transferService.downloadBlobWithMetadata(
        remoteDataTransferPath.add(primaryTerm),
        fileName
    );
    // try-with-resources guarantees the remote stream is closed even when the copy fails.
    try (InputStream inputStream = inputStreamWithMetadata.getInputStream()) {
        metadata = inputStreamWithMetadata.getMetadata();

        // Capture the byte count for stats before reading.
        // NOTE(review): available() is only an estimate for some stream types - confirm it is accurate here.
        bytesToRead = inputStream.available();
        Files.copy(inputStream, filePath);
        downloadStatus = true;
    } finally {
        remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
        if (downloadStatus) {
            remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead);
        }
    }

    // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync
    fileTransferTracker.add(fileName, true);

    return metadata;
}

/**
 * Recreates the checkpoint (ckp) file of a generation on the local FS from the
 * Base64-encoded {@code ckp-data} entry of the given metadata.
 * Any existing checkpoint file for that generation is deleted before the metadata is inspected.
 * The number of bytes written is recorded on the transfer tracker only after a successful write.
 *
 * @param metadata   metadata downloaded with the translog file; must contain {@code ckp-data}
 * @param location   local directory the checkpoint file is written into
 * @param generation translog generation, used to derive the checkpoint file name
 * @param fileName   translog file name, used only in log and error messages
 * @throws IOException           if deleting or writing the checkpoint file fails
 * @throws IllegalStateException if the metadata has no {@code ckp-data} entry
 */
private void recoverCkpFileFromMetadata(Map<String, String> metadata, Path location, String generation, String fileName)
    throws IOException {
    final String checkpointName = Translog.getCommitCheckpointFileName(Long.parseLong(generation));
    final Path checkpointPath = location.resolve(checkpointName);
    // Here, we always override the existing file if present.
    deleteFileIfExists(checkpointPath);

    final String encodedCheckpoint = metadata.get("ckp-data");
    if (encodedCheckpoint == null) {
        logger.error("Error processing metadata for translog file: {}", fileName);
        throw new IllegalStateException(
            "Checkpoint file data (key - ckp-data) is expected but not found in metadata for file: " + fileName
        );
    }

    final byte[] checkpointBytes = Base64.getDecoder().decode(encodedCheckpoint);
    final long writtenBytes = checkpointBytes.length;
    Files.write(checkpointPath, checkpointBytes);

    // Only reached when the write succeeded, so the bytes count as successfully downloaded.
    remoteTranslogTransferTracker.addDownloadBytesSucceeded(writtenBytes);
}

/**
 * Deletes the file at the given path if it exists; does nothing when it is absent.
 * Delegates to {@link Files#deleteIfExists(Path)} so that the existence check and the
 * delete are a single call, avoiding a failure when the file disappears between the two.
 *
 * @param filePath path of the file to delete
 * @throws IOException if the file exists but cannot be deleted
 */
public void deleteFileIfExists(Path filePath) throws IOException {
    Files.deleteIfExists(filePath);
}

private void downloadToFS(String fileName, Path location, String primaryTerm) throws IOException {
Path filePath = location.resolve(fileName);
// Here, we always override the existing file if present.
Expand Down Expand Up @@ -391,7 +486,11 @@ public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runna
// Add .ckp and .tlog file to translog file list which is located in basePath/<primaryTerm>
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFileName = Translog.getFilename(generation);
translogFiles.addAll(List.of(ckpFileName, translogFileName));
if (ckpAsMetadata == false) {
translogFiles.addAll(List.of(ckpFileName, translogFileName));
} else {
translogFiles.add(translogFileName);
}
});
// Delete the translog and checkpoint files asynchronously
deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
Expand Down

0 comments on commit c20ae47

Please sign in to comment.