Skip to content

Commit

Permalink
address commens and add ut's
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
skumawat2025 committed May 14, 2024
1 parent fc368da commit 7586b7d
Show file tree
Hide file tree
Showing 22 changed files with 353 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,8 @@ public void addRemoteStoreCustomMetadata(IndexMetadata.Builder tmpImdBuilder, bo
Map<String, String> remoteCustomData = new HashMap<>();

// Determine if the ckp would be stored as translog metadata
boolean isCkpAsTranslogMetadata = remoteStoreCustomMetadataResolver.isCkpAsTranslogMetadata();
remoteCustomData.put(RemoteStoreEnums.CKP_AS_METADATA, Boolean.toString(isCkpAsTranslogMetadata));
boolean isTranslogMetadataEnabled = remoteStoreCustomMetadataResolver.isTranslogMetadataEnabled();
remoteCustomData.put(RemoteStoreEnums.TRANSLOG_METADATA, Boolean.toString(isTranslogMetadataEnabled));

// Determine the path type for use using the remoteStorePathResolver.
RemoteStorePathStrategy newPathStrategy = remoteStoreCustomMetadataResolver.getPathStrategy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ default Map<Metric, Map<String, Long>> extendedStats() {
default void reload(RepositoryMetadata repositoryMetadata) {}

/**
* Returns a boolean indicating if blobStore support object metadata upload
* Returns a boolean indicating if blobStore has object metadata support enabled
*/
default boolean isBlobMetadataSupported() {
default boolean isBlobMetadataEnabled() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_TRANSLOG_METADATA
)
)
);
Expand Down
8 changes: 4 additions & 4 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ public static IndexMergePolicy fromString(String text) {
private final boolean widenIndexSortType;
private final boolean assignedOnRemoteNode;
private final RemoteStorePathStrategy remoteStorePathStrategy;
private final boolean ckpAsTranslogMetadata;
private final boolean isTranslogMetadataEnabled;

/**
* The maximum age of a retention lease before it is considered expired.
Expand Down Expand Up @@ -990,7 +990,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
assignedOnRemoteNode = RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings());
remoteStorePathStrategy = RemoteStoreUtils.determineRemoteStorePathStrategy(indexMetadata);

ckpAsTranslogMetadata = RemoteStoreUtils.determineCkpAsTranslogMetadata(indexMetadata);
isTranslogMetadataEnabled = RemoteStoreUtils.determineisTranslogMetadataEnabled(indexMetadata);

setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING));
setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING));
Expand Down Expand Up @@ -1915,7 +1915,7 @@ public RemoteStorePathStrategy getRemoteStorePathStrategy() {
return remoteStorePathStrategy;
}

public boolean isCkpAsTranslogMetadata() {
return ckpAsTranslogMetadata;
public boolean isTranslogMetadataEnabled() {
return isTranslogMetadataEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public static boolean indexHasRemoteCustomMetadata(IndexMetadata indexMetadata)
Map<String, String> customMetadata = indexMetadata.getCustomData(REMOTE_STORE_CUSTOM_KEY);
return Objects.nonNull(customMetadata)
&& Objects.nonNull(customMetadata.get(PathType.NAME))
&& Objects.nonNull(customMetadata.get(RemoteStoreEnums.CKP_AS_METADATA));
&& Objects.nonNull(customMetadata.get(RemoteStoreEnums.TRANSLOG_METADATA));
}

public static void updateRemoteStoreSettings(Settings.Builder settingsBuilder, String segmentRepository, String translogRepository) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public RemoteStorePathStrategy getPathStrategy() {
return new RemoteStorePathStrategy(pathType, pathHashAlgorithm);
}

public boolean isCkpAsTranslogMetadata() {
return Version.CURRENT.compareTo(minNodeVersionSupplier.get()) <= 0 && remoteStoreSettings.isCkpAsTranslogMetadata();
public boolean isTranslogMetadataEnabled() {
return Version.V_2_15_0.compareTo(minNodeVersionSupplier.get()) <= 0 && remoteStoreSettings.isTranslogMetadataEnabled();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
@ExperimentalApi
public class RemoteStoreEnums {

public static final String CKP_AS_METADATA = "ckp-as-metadata";
public static final String TRANSLOG_METADATA = "translog-metadata";

/**
* Categories of the data in Remote store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_TRANSLOG_METADATA;

/**
* Utils for remote store
Expand Down Expand Up @@ -185,11 +185,11 @@ public static RemoteStorePathStrategy determineRemoteStorePathStrategy(IndexMeta
/**
* Determines whether translog ckp upload as metadata allowed or not
*/
public static boolean determineCkpAsTranslogMetadata(IndexMetadata indexMetadata) {
public static boolean determineisTranslogMetadataEnabled(IndexMetadata indexMetadata) {
Map<String, String> remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
assert remoteCustomData == null || remoteCustomData.containsKey(RemoteStoreEnums.CKP_AS_METADATA);
if (remoteCustomData != null && remoteCustomData.containsKey(RemoteStoreEnums.CKP_AS_METADATA)) {
return Boolean.parseBoolean(remoteCustomData.get(RemoteStoreEnums.CKP_AS_METADATA));
assert remoteCustomData == null || remoteCustomData.containsKey(RemoteStoreEnums.TRANSLOG_METADATA);
if (remoteCustomData != null && remoteCustomData.containsKey(RemoteStoreEnums.TRANSLOG_METADATA)) {
return Boolean.parseBoolean(remoteCustomData.get(RemoteStoreEnums.TRANSLOG_METADATA));
}
return false;
}
Expand All @@ -209,8 +209,8 @@ public static Map<String, String> determineRemoteStoreCustomMetadataDuringMigrat
Version minNodeVersion = discoveryNodes.getMinNodeVersion();

boolean ckpAsMetadata = Version.CURRENT.compareTo(minNodeVersion) <= 0
&& CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA.get(clusterSettings);
remoteCustomData.put(RemoteStoreEnums.CKP_AS_METADATA, Boolean.toString(ckpAsMetadata));
&& CLUSTER_REMOTE_STORE_TRANSLOG_TRANSLOG_METADATA.get(clusterSettings);
remoteCustomData.put(RemoteStoreEnums.TRANSLOG_METADATA, Boolean.toString(ckpAsMetadata));

RemoteStoreEnums.PathType pathType = Version.CURRENT.compareTo(minNodeVersion) <= 0
? CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.get(clusterSettings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4982,7 +4982,7 @@ public void deleteTranslogFilesFromRemoteTranslog() throws IOException {
getThreadPool(),
indexSettings.getRemoteStorePathStrategy(),
remoteStoreSettings,
indexSettings().isCkpAsTranslogMetadata()
indexSettings().isTranslogMetadataEnabled()
);
}

Expand All @@ -5009,7 +5009,7 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
remoteStoreSettings,
logger,
shouldSeedRemoteStore(),
indexSettings().isCkpAsTranslogMetadata()
indexSettings().isTranslogMetadataEnabled()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public class RemoteFsTranslog extends Translog {
private static final int SYNC_PERMIT = 1;
private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT);
private final AtomicBoolean pauseSync = new AtomicBoolean(false);
boolean ckpAsTranslogMetadata;
boolean isTranslogMetadataEnabled;

public RemoteFsTranslog(
TranslogConfig config,
Expand All @@ -111,8 +111,7 @@ public RemoteFsTranslog(
this.startedPrimarySupplier = startedPrimarySupplier;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker);
ckpAsTranslogMetadata = isCkpAsTranslogMetadata(indexSettings().isCkpAsTranslogMetadata(), blobStoreRepository);
;
isTranslogMetadataEnabled = isTranslogMetadataEnabled(indexSettings().isTranslogMetadataEnabled(), blobStoreRepository);
this.translogTransferManager = buildTranslogTransferManager(
blobStoreRepository,
threadPool,
Expand All @@ -121,7 +120,7 @@ public RemoteFsTranslog(
remoteTranslogTransferTracker,
indexSettings().getRemoteStorePathStrategy(),
remoteStoreSettings,
ckpAsTranslogMetadata
isTranslogMetadataEnabled
);
try {
download(translogTransferManager, location, logger, config.shouldSeedRemote());
Expand Down Expand Up @@ -160,8 +159,8 @@ public RemoteFsTranslog(
}
}

private static boolean isCkpAsTranslogMetadata(boolean ckpAsTranslogMetadata, BlobStoreRepository blobStoreRepository) {
return blobStoreRepository.blobStore().isBlobMetadataSupported() && ckpAsTranslogMetadata;
private static boolean isTranslogMetadataEnabled(boolean isTranslogMetadataEnabled, BlobStoreRepository blobStoreRepository) {
return blobStoreRepository.blobStore().isBlobMetadataEnabled() && isTranslogMetadataEnabled;
}

// visible for testing
Expand All @@ -178,7 +177,7 @@ public static void download(
RemoteStoreSettings remoteStoreSettings,
Logger logger,
boolean seedRemote,
boolean ckpAsTranslogMetadata
boolean isTranslogMetadataEnabled
) throws IOException {
assert repository instanceof BlobStoreRepository : String.format(
Locale.ROOT,
Expand All @@ -188,7 +187,7 @@ public static void download(
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
// We use a dummy stats tracker to ensure the flow doesn't break.
// TODO: To be revisited as part of https://github.com/opensearch-project/OpenSearch/issues/7567
ckpAsTranslogMetadata = isCkpAsTranslogMetadata(ckpAsTranslogMetadata, blobStoreRepository);
isTranslogMetadataEnabled = isTranslogMetadataEnabled(isTranslogMetadataEnabled, blobStoreRepository);
RemoteTranslogTransferTracker remoteTranslogTransferTracker = new RemoteTranslogTransferTracker(shardId, 1000);
FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker);
TranslogTransferManager translogTransferManager = buildTranslogTransferManager(
Expand All @@ -199,7 +198,7 @@ public static void download(
remoteTranslogTransferTracker,
pathStrategy,
remoteStoreSettings,
ckpAsTranslogMetadata
isTranslogMetadataEnabled
);
RemoteFsTranslog.download(translogTransferManager, location, logger, seedRemote);
logger.trace(remoteTranslogTransferTracker.toString());
Expand Down Expand Up @@ -305,7 +304,7 @@ public static TranslogTransferManager buildTranslogTransferManager(
RemoteTranslogTransferTracker tracker,
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings,
boolean ckpAsTranslogMetadata
boolean isTranslogMetadataEnabled
) {
assert Objects.nonNull(pathStrategy);
String indexUUID = shardId.getIndex().getUUID();
Expand Down Expand Up @@ -335,7 +334,7 @@ public static TranslogTransferManager buildTranslogTransferManager(
fileTransferTracker,
tracker,
remoteStoreSettings,
ckpAsTranslogMetadata
isTranslogMetadataEnabled
);
}

Expand Down Expand Up @@ -614,13 +613,13 @@ public static void cleanup(
ThreadPool threadPool,
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings,
boolean ckpAsTranslogMetadata
boolean isTranslogMetadataEnabled
) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
// We use a dummy stats tracker to ensure the flow doesn't break.
// TODO: To be revisited as part of https://github.com/opensearch-project/OpenSearch/issues/7567
ckpAsTranslogMetadata = isCkpAsTranslogMetadata(ckpAsTranslogMetadata, blobStoreRepository);
isTranslogMetadataEnabled = isTranslogMetadataEnabled(isTranslogMetadataEnabled, blobStoreRepository);
RemoteTranslogTransferTracker remoteTranslogTransferTracker = new RemoteTranslogTransferTracker(shardId, 1000);
FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker);
TranslogTransferManager translogTransferManager = buildTranslogTransferManager(
Expand All @@ -631,7 +630,7 @@ public static void cleanup(
remoteTranslogTransferTracker,
pathStrategy,
remoteStoreSettings,
ckpAsTranslogMetadata
isTranslogMetadataEnabled
);
// clean up all remote translog files
translogTransferManager.deleteTranslogFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,29 +95,34 @@ public void uploadBlobs(
Set<TransferFileSnapshot> fileSnapshots,
final Map<Long, BlobPath> blobPaths,
ActionListener<TransferFileSnapshot> listener,
WritePriority writePriority,
final Map<TransferFileSnapshot, InputStream> transferFileMetadata
WritePriority writePriority
) {
fileSnapshots.forEach(fileSnapshot -> {
BlobPath blobPath = blobPaths.get(fileSnapshot.getPrimaryTerm());
InputStream fileMetadata = transferFileMetadata.get(fileSnapshot);
if (!(blobStore.blobContainer(blobPath) instanceof AsyncMultiStreamBlobContainer)) {
uploadBlob(ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, blobPath, listener, writePriority);
} else {
uploadBlob(fileSnapshot, fileMetadata, listener, blobPath, writePriority);
uploadBlob(fileSnapshot, listener, blobPath, writePriority);
}
});

}

public Map<String, String> buildTransferFileMetadata(InputStream fileMetadata) throws IOException {
// this function creates metadata of checkpoint file data to be associated with translog file.
static Map<String, String> buildTransferFileMetadata(InputStream metadataInputStream) throws IOException {
Map<String, String> metadata = new HashMap<>();
try (fileMetadata; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
byte[] buffer = new byte[4096];
try (metadataInputStream) {
byte[] buffer = new byte[128];
int bytesRead;
int totalBytesRead = 0;
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

while ((bytesRead = fileMetadata.read(buffer)) != -1) {
while ((bytesRead = metadataInputStream.read(buffer)) != -1) {
byteArrayOutputStream.write(buffer, 0, bytesRead);
totalBytesRead += bytesRead;
if (totalBytesRead > 1024) {
throw new AssertionError("Input stream exceeds 1KB limit");
}
}

byte[] bytes = byteArrayOutputStream.toByteArray();
Expand All @@ -129,7 +134,6 @@ public Map<String, String> buildTransferFileMetadata(InputStream fileMetadata) t

private void uploadBlob(
TransferFileSnapshot fileSnapshot,
InputStream fileMetadata,
ActionListener<TransferFileSnapshot> listener,
BlobPath blobPath,
WritePriority writePriority
Expand All @@ -138,9 +142,10 @@ private void uploadBlob(
try {
ChannelFactory channelFactory = FileChannel::open;
Map<String, String> metadata = null;
if (fileMetadata != null) {
metadata = buildTransferFileMetadata(fileMetadata);
if (fileSnapshot.getMetadataFileInputStream() != null) {
metadata = buildTransferFileMetadata(fileSnapshot.getMetadataFileInputStream());
}

long contentLength;
try (FileChannel channel = channelFactory.open(fileSnapshot.getPath(), StandardOpenOption.READ)) {
contentLength = channel.size();
Expand Down Expand Up @@ -198,7 +203,7 @@ public InputStream downloadBlob(Iterable<String> path, String fileName) throws I
@ExperimentalApi
@Override
public FetchBlobResult downloadBlobWithMetadata(Iterable<String> path, String fileName) throws IOException {
assert blobStore.isBlobMetadataSupported();
assert blobStore.isBlobMetadataEnabled();
return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ public static class TransferFileSnapshot extends FileSnapshot {

private final long primaryTerm;
private Long checksum;
@Nullable
private InputStream metadataFileInputStream;

public TransferFileSnapshot(Path path, long primaryTerm, Long checksum) throws IOException {
super(path);
Expand All @@ -128,6 +130,14 @@ public long getPrimaryTerm() {
return primaryTerm;
}

public void setMetadataFileInputStream(InputStream inputStream) {
this.metadataFileInputStream = inputStream;
}

public InputStream getMetadataFileInputStream() {
return metadataFileInputStream;
}

@Override
public int hashCode() {
return Objects.hash(primaryTerm, super.hashCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ void uploadBlobs(
Set<TransferFileSnapshot> fileSnapshots,
final Map<Long, BlobPath> blobPaths,
ActionListener<TransferFileSnapshot> listener,
WritePriority writePriority,
final Map<TransferFileSnapshot, InputStream> transferFileMetadata
WritePriority writePriority
) throws Exception;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;

import java.util.Map;
import java.io.IOException;
import java.util.Set;

/**
Expand Down Expand Up @@ -42,8 +42,8 @@ public interface TransferSnapshot {
TranslogTransferMetadata getTranslogTransferMetadata();

/**
* The map of translog to checkpoint file snapshot of this {@link TransferSnapshot}
* @return the map of translog and checkpoint file snapshot
* The snapshot of the translog generational files having checkpoint file inputStream as metadata
* @return the set of translog files having checkpoint file inputStream as metadata.
*/
Map<TransferFileSnapshot, TransferFileSnapshot> getTranslogCheckpointSnapshotMap();
Set<TransferFileSnapshot> getTranslogFileSnapshotWithMetadata() throws IOException;
}
Loading

0 comments on commit 7586b7d

Please sign in to comment.