Skip to content

Commit

Permalink
Change RemoteSegmentStoreDirectory init at given timestamp to ignore …
Browse files Browse the repository at this point in the history
…pinned timestamp setting (#15457)

Signed-off-by: Sachin Kale <[email protected]>
Co-authored-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale and Sachin Kale authored Aug 29, 2024
1 parent b3d5874 commit 30ed15d
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -391,15 +391,24 @@ public static boolean isSwitchToStrictCompatibilityMode(ClusterUpdateSettingsReq
* @param pinnedTimestampSet A set of timestamps representing pinned points in time.
* @param getTimestampFunction A function that extracts the timestamp from a metadata file name.
* @param prefixFunction A function that extracts a tuple of prefix information from a metadata file name.
* @param ignorePinnedTimestampEnabledSetting A flag to ignore pinned timestamp enabled setting
* @return A set of metadata file names that are implicitly locked based on the pinned timestamps.
*/
public static Set<String> getPinnedTimestampLockedFiles(
List<String> metadataFiles,
Set<Long> pinnedTimestampSet,
Function<String, Long> getTimestampFunction,
Function<String, Tuple<String, String>> prefixFunction
Function<String, Tuple<String, String>> prefixFunction,
boolean ignorePinnedTimestampEnabledSetting
) {
return getPinnedTimestampLockedFiles(metadataFiles, pinnedTimestampSet, new HashMap<>(), getTimestampFunction, prefixFunction);
return getPinnedTimestampLockedFiles(
metadataFiles,
pinnedTimestampSet,
new HashMap<>(),
getTimestampFunction,
prefixFunction,
ignorePinnedTimestampEnabledSetting
);
}

/**
Expand Down Expand Up @@ -431,10 +440,28 @@ public static Set<String> getPinnedTimestampLockedFiles(
Map<Long, String> metadataFilePinnedTimestampMap,
Function<String, Long> getTimestampFunction,
Function<String, Tuple<String, String>> prefixFunction
) {
return getPinnedTimestampLockedFiles(
metadataFiles,
pinnedTimestampSet,
metadataFilePinnedTimestampMap,
getTimestampFunction,
prefixFunction,
false
);
}

private static Set<String> getPinnedTimestampLockedFiles(
List<String> metadataFiles,
Set<Long> pinnedTimestampSet,
Map<Long, String> metadataFilePinnedTimestampMap,
Function<String, Long> getTimestampFunction,
Function<String, Tuple<String, String>> prefixFunction,
boolean ignorePinnedTimestampEnabledSetting
) {
Set<String> implicitLockedFiles = new HashSet<>();

if (RemoteStoreSettings.isPinnedTimestampsEnabled() == false) {
if (ignorePinnedTimestampEnabledSetting == false && RemoteStoreSettings.isPinnedTimestampsEnabled() == false) {
return implicitLockedFiles;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ public RemoteSegmentMetadata initializeToSpecificTimestamp(long timestamp) throw
metadataFiles,
Set.of(timestamp),
MetadataFilenameUtils::getTimestamp,
MetadataFilenameUtils::getNodeIdByPrimaryTermAndGen
MetadataFilenameUtils::getNodeIdByPrimaryTermAndGen,
true
);
if (lockedMetadataFiles.isEmpty()) {
return null;
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@
import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath;
import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

Expand Down Expand Up @@ -814,7 +815,7 @@ protected Node(
remoteClusterStateCleanupManager = null;
}
final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService;
if (isRemoteStoreAttributePresent(settings) && CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.get(settings)) {
if (isRemoteDataAttributePresent(settings) && CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.get(settings)) {
remoteStorePinnedTimestampService = new RemoteStorePinnedTimestampService(
repositoriesServiceReference::get,
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,62 @@ public void testMetadataFileNameOrder() {
assertEquals(14, count);
}

public void testInitializeToSpecificTimestampNoMetadataFiles() throws IOException {
when(
remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
)
).thenReturn(new ArrayList<>());
assertNull(remoteSegmentStoreDirectory.initializeToSpecificTimestamp(1234L));
}

public void testInitializeToSpecificTimestampNoMdMatchingTimestamp() throws IOException {
String metadataPrefix = "metadata__1__2__3__4__5__";
List<String> metadataFiles = new ArrayList<>();
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(2000));
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(3000));
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(4000));

when(
remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
)
).thenReturn(metadataFiles);
assertNull(remoteSegmentStoreDirectory.initializeToSpecificTimestamp(1234L));
}

public void testInitializeToSpecificTimestampMatchingMdFile() throws IOException {
String metadataPrefix = "metadata__1__2__3__4__5__";
List<String> metadataFiles = new ArrayList<>();
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(1000));
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(2000));
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(3000));

Map<String, String> metadata = new HashMap<>();
metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512::" + Version.LATEST.major);
metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024::" + Version.LATEST.major);

when(
remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
)
).thenReturn(metadataFiles);
when(remoteMetadataDirectory.getBlobStream(metadataPrefix + RemoteStoreUtils.invertLong(1000))).thenReturn(
createMetadataFileBytes(metadata, indexShard.getLatestReplicationCheckpoint(), segmentInfos)
);

RemoteSegmentMetadata remoteSegmentMetadata = remoteSegmentStoreDirectory.initializeToSpecificTimestamp(1234L);
assertNotNull(remoteSegmentMetadata);
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = remoteSegmentStoreDirectory
.getSegmentsUploadedToRemoteStore();
assertEquals(2, uploadedSegments.size());
assertTrue(uploadedSegments.containsKey("_0.cfe"));
assertTrue(uploadedSegments.containsKey("_0.cfs"));
}

private static class WrapperIndexOutput extends IndexOutput {
public IndexOutput indexOutput;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.index.store;

import org.apache.lucene.util.Version;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.support.PlainBlobMetadata;
Expand All @@ -18,8 +16,6 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.model.RemotePinnedTimestamps;
import org.opensearch.gateway.remote.model.RemoteStorePinnedTimestampsBlobStore;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.Node;
Expand All @@ -31,7 +27,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
Expand All @@ -40,7 +35,6 @@
import org.mockito.Mockito;

import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED;
import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes;
import static org.hamcrest.CoreMatchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.anyInt;
Expand Down Expand Up @@ -143,62 +137,6 @@ private void metadataWithOlderTimestamp() {
);
}

public void testInitializeToSpecificTimestampNoMetadataFiles() throws IOException {
when(
remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
)
).thenReturn(new ArrayList<>());
assertNull(remoteSegmentStoreDirectory.initializeToSpecificTimestamp(1234L));
}

public void testInitializeToSpecificTimestampNoMdMatchingTimestamp() throws IOException {
String metadataPrefix = "metadata__1__2__3__4__5__";
List<String> metadataFiles = new ArrayList<>();
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(2000));
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(3000));
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(4000));

when(
remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
)
).thenReturn(metadataFiles);
assertNull(remoteSegmentStoreDirectory.initializeToSpecificTimestamp(1234L));
}

public void testInitializeToSpecificTimestampMatchingMdFile() throws IOException {
String metadataPrefix = "metadata__1__2__3__4__5__";
List<String> metadataFiles = new ArrayList<>();
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(1000));
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(2000));
metadataFiles.add(metadataPrefix + RemoteStoreUtils.invertLong(3000));

Map<String, String> metadata = new HashMap<>();
metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512::" + Version.LATEST.major);
metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024::" + Version.LATEST.major);

when(
remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
)
).thenReturn(metadataFiles);
when(remoteMetadataDirectory.getBlobStream(metadataPrefix + RemoteStoreUtils.invertLong(1000))).thenReturn(
createMetadataFileBytes(metadata, indexShard.getLatestReplicationCheckpoint(), segmentInfos)
);

RemoteSegmentMetadata remoteSegmentMetadata = remoteSegmentStoreDirectory.initializeToSpecificTimestamp(1234L);
assertNotNull(remoteSegmentMetadata);
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = remoteSegmentStoreDirectory
.getSegmentsUploadedToRemoteStore();
assertEquals(2, uploadedSegments.size());
assertTrue(uploadedSegments.containsKey("_0.cfe"));
assertTrue(uploadedSegments.containsKey("_0.cfs"));
}

public void testDeleteStaleCommitsNoPinnedTimestampMdFilesLatest() throws Exception {
metadataFilename = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
12,
Expand Down

0 comments on commit 30ed15d

Please sign in to comment.