Skip to content

Commit

Permalink
Re-factor some methods
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Aug 2, 2024
1 parent 61f8c26 commit 755f393
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -181,12 +183,19 @@ public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long c
return remoteSegmentMetadata;
}

/**
* Initializes the remote segment metadata to a specific timestamp.
*
* @param timestamp The timestamp to initialize the remote segment metadata to.
* @return The RemoteSegmentMetadata object corresponding to the specified timestamp, or null if no metadata file is found for that timestamp.
* @throws IOException If an I/O error occurs while reading the metadata file.
*/
public RemoteSegmentMetadata initializeToSpecificTimestamp(long timestamp) throws IOException {
List<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
);
Set<String> lockedMetadataFiles = getImplicitLockedFiles(metadataFiles, Set.of(timestamp));
Set<String> lockedMetadataFiles = getPinnedTimestampLockedFiles(metadataFiles, Set.of(timestamp));
if (lockedMetadataFiles.isEmpty()) {
return null;
}
Expand Down Expand Up @@ -824,7 +833,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException

// ToDo: fetch pinned timestamps along with last fetch status of pinned timestamps. If stale, return.
Set<Long> pinnedTimestamps = new HashSet<>();
Set<String> implicitLockedFiles = getImplicitLockedFiles(metadataFilesEligibleToDelete, pinnedTimestamps);
Set<String> implicitLockedFiles = getPinnedTimestampLockedFiles(metadataFilesEligibleToDelete, pinnedTimestamps);
final Set<String> allLockFiles = new HashSet<>(implicitLockedFiles);

try {
Expand Down Expand Up @@ -899,56 +908,77 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
logger.debug("deletedSegmentFiles={}", deletedSegmentFiles);
}

// Visible for testing
Set<String> getImplicitLockedFiles(List<String> metadataFiles, Set<Long> pinnedTimstampSet) {
/**
* Determines and returns a set of metadata files that match provided pinned timestamps.
*
* This method identifies metadata files that are considered implicitly locked due to their timestamps
* matching or being the closest preceding timestamp to the pinned timestamps. It uses a caching mechanism
* to improve performance for previously processed timestamps.
*
* @param metadataFiles A list of metadata file names. Expected to be sorted in descending order of timestamp.
* @param pinnedTimestampSet A set of timestamps representing pinned points in time.
* @return A set of metadata file names that are implicitly locked based on the pinned timestamps.
*
* @implSpec The method performs the following steps:
* 1. Validates input parameters.
* 2. Updates the cache (metadataFilePinnedTimestampMap) to remove outdated entries.
* 3. Processes cached entries and identifies new timestamps to process.
* 4. For new timestamps, iterates through metadata files to find matching or closest preceding files.
* 5. Updates the cache with newly processed timestamps and their corresponding metadata files.
*
* @implNote The method currently sorts the metadata files, but this may be unnecessary if files are
* already sorted when fetched from the remote store.
*/
Set<String> getPinnedTimestampLockedFiles(List<String> metadataFiles, Set<Long> pinnedTimestampSet) {
Set<String> implicitLockedFiles = new HashSet<>();

if (metadataFiles == null || metadataFiles.isEmpty() || pinnedTimstampSet == null) {
if (metadataFiles == null || metadataFiles.isEmpty() || pinnedTimestampSet == null) {
return implicitLockedFiles;
}
// Remove entries for timestamps that are no longer pinned
metadataFilePinnedTimestampMap.keySet().retainAll(pinnedTimstampSet);

// If metadata file is already known for given timestamp, fetch it from cache
for (Long pinnedTimestamp : pinnedTimstampSet) {
if (metadataFilePinnedTimestampMap.containsKey(pinnedTimestamp)) {
implicitLockedFiles.add(metadataFilePinnedTimestampMap.get(pinnedTimestamp));
// Remove entries for timestamps that are no longer pinned
metadataFilePinnedTimestampMap.keySet().retainAll(pinnedTimestampSet);

// Add cached entries and collect new timestamps
Set<Long> newPinnedTimestamps = new TreeSet<>(Collections.reverseOrder());
for (Long pinnedTimestamp : pinnedTimestampSet) {
String cachedFile = metadataFilePinnedTimestampMap.get(pinnedTimestamp);
if (cachedFile != null) {
implicitLockedFiles.add(cachedFile);
} else {
newPinnedTimestamps.add(pinnedTimestamp);
}
}

// Remove already cached key and sort the new pinned timestamps in descending order
List<Long> pinnedTimestamps = new ArrayList<>(pinnedTimstampSet);
pinnedTimestamps.removeAll(metadataFilePinnedTimestampMap.keySet());
if (pinnedTimstampSet.isEmpty()) {
if (newPinnedTimestamps.isEmpty()) {
return implicitLockedFiles;
}
pinnedTimestamps.sort(Collections.reverseOrder(Long::compare));

// Sort metadata files in descending order
// Sort metadata files in descending order of timestamp
// ToDo: Do we really need this? Files fetched from remote store are already lexicographically sorted.
metadataFiles.sort(String::compareTo);

int cursor = 0;
Iterator<Long> timestampIterator = newPinnedTimestamps.iterator();
Long currentPinnedTimestamp = timestampIterator.next();
long prevMdTimestamp = Long.MAX_VALUE;
for (int i = 0; i < metadataFiles.size(); i++) {
String metadataFileName = metadataFiles.get(i);

for (String metadataFileName : metadataFiles) {
long currentMdTimestamp = MetadataFilenameUtils.getTimestamp(metadataFileName.split(SEPARATOR));
long pinnedTimestamp = pinnedTimestamps.get(cursor);
if (currentMdTimestamp <= pinnedTimestamp && prevMdTimestamp > pinnedTimestamp) {

while (currentMdTimestamp <= currentPinnedTimestamp && prevMdTimestamp > currentPinnedTimestamp) {
implicitLockedFiles.add(metadataFileName);
// Do not cache entry for latest metadata file as the next metadata can also match the same pinned timestamp
if (prevMdTimestamp != Long.MAX_VALUE) {
metadataFilePinnedTimestampMap.put(pinnedTimestamps.get(cursor), metadataFileName);
metadataFilePinnedTimestampMap.put(currentPinnedTimestamp, metadataFileName);
}
cursor++;
if (cursor >= pinnedTimestamps.size()) {
break;
if (timestampIterator.hasNext() == false) {
return implicitLockedFiles;
}
i--;
} else {
prevMdTimestamp = currentMdTimestamp;
currentPinnedTimestamp = timestampIterator.next();
}
prevMdTimestamp = currentMdTimestamp;
}

return implicitLockedFiles;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1144,35 +1144,35 @@ public void testMetadataFileNameOrder() {
assertEquals(14, count);
}

public void testGetImplicitLockedFilesWithEmptyMetadataFiles() {
public void testGetPinnedTimestampLockedFilesWithEmptyMetadataFiles() {
List<String> metadataFiles = Collections.emptyList();
Set<Long> pinnedTimestampSet = new HashSet<>(Arrays.asList(1L, 2L, 3L));
Set<String> implicitLockedFiles = remoteSegmentStoreDirectory.getImplicitLockedFiles(metadataFiles, pinnedTimestampSet);
Set<String> implicitLockedFiles = remoteSegmentStoreDirectory.getPinnedTimestampLockedFiles(metadataFiles, pinnedTimestampSet);
assertTrue(implicitLockedFiles.isEmpty());
}

public void testGetImplicitLockedFilesWithNoPinnedTimestamps() {
public void testGetPinnedTimestampLockedFilesWithNoPinnedTimestamps() {
List<String> metadataFiles = Arrays.asList("file1.txt", "file2.txt", "file3.txt");
Set<Long> pinnedTimestampSet = Collections.emptySet();
Set<String> implicitLockedFiles = remoteSegmentStoreDirectory.getImplicitLockedFiles(metadataFiles, pinnedTimestampSet);
Set<String> implicitLockedFiles = remoteSegmentStoreDirectory.getPinnedTimestampLockedFiles(metadataFiles, pinnedTimestampSet);
assertTrue(implicitLockedFiles.isEmpty());
}

public void testGetImplicitLockedFilesWithNullMetadataFiles() {
public void testGetPinnedTimestampLockedFilesWithNullMetadataFiles() {
List<String> metadataFiles = null;
Set<Long> pinnedTimestampSet = new HashSet<>(Arrays.asList(1L, 2L, 3L));
Set<String> implicitLockedFiles = remoteSegmentStoreDirectory.getImplicitLockedFiles(metadataFiles, pinnedTimestampSet);
Set<String> implicitLockedFiles = remoteSegmentStoreDirectory.getPinnedTimestampLockedFiles(metadataFiles, pinnedTimestampSet);
assertTrue(implicitLockedFiles.isEmpty());
}

public void testGetImplicitLockedFilesWithNullPinnedTimestampSet() {
public void testGetPinnedTimestampLockedFilesWithNullPinnedTimestampSet() {
List<String> metadataFiles = Arrays.asList("file1.txt", "file2.txt", "file3.txt");
Set<Long> pinnedTimestampSet = null;
Set<String> implicitLockedFiles = remoteSegmentStoreDirectory.getImplicitLockedFiles(metadataFiles, pinnedTimestampSet);
Set<String> implicitLockedFiles = remoteSegmentStoreDirectory.getPinnedTimestampLockedFiles(metadataFiles, pinnedTimestampSet);
assertTrue(implicitLockedFiles.isEmpty());
}

private Tuple<Map<Long, String>, Set<String>> testGetImplicitLockedFilesWithPinnedTimestamps(
private Tuple<Map<Long, String>, Set<String>> testGetPinnedTimestampLockedFilesWithPinnedTimestamps(
List<Long> metadataFileTimestamps,
Set<Long> pinnedTimetamps
) {
Expand All @@ -1183,15 +1183,15 @@ private Tuple<Map<Long, String>, Set<String>> testGetImplicitLockedFilesWithPinn
}
return new Tuple<>(
metadataFiles,
remoteSegmentStoreDirectory.getImplicitLockedFiles(new ArrayList<>(metadataFiles.values()), pinnedTimetamps)
remoteSegmentStoreDirectory.getPinnedTimestampLockedFiles(new ArrayList<>(metadataFiles.values()), pinnedTimetamps)
);
}

public void testGetImplicitLockedFilesWithPinnedTimestamps() {
public void testGetPinnedTimestampLockedFilesWithPinnedTimestamps() {
// Pinned timestamps 800, 900
// Metadata with timestamp 990
// No metadata matches the timestamp
Tuple<Map<Long, String>, Set<String>> metadataAndLocks = testGetImplicitLockedFilesWithPinnedTimestamps(
Tuple<Map<Long, String>, Set<String>> metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps(
List.of(990L),
Set.of(800L, 900L)
);
Expand All @@ -1205,7 +1205,7 @@ public void testGetImplicitLockedFilesWithPinnedTimestamps() {
// Pinned timestamps 800, 900, 1000
// Metadata with timestamp 990
// Metadata timestamp 990 <= Pinned Timestamp 1000
metadataAndLocks = testGetImplicitLockedFilesWithPinnedTimestamps(List.of(990L), Set.of(800L, 900L, 1000L));
metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps(List.of(990L), Set.of(800L, 900L, 1000L));
metadataFiles = metadataAndLocks.v1();
implicitLockedFiles = metadataAndLocks.v2();

Expand All @@ -1218,7 +1218,7 @@ public void testGetImplicitLockedFilesWithPinnedTimestamps() {
// Pinned timestamps 800, 900, 1000
// Metadata with timestamp 990, 995
// Metadata timestamp 995 <= Pinned Timestamp 1000
metadataAndLocks = testGetImplicitLockedFilesWithPinnedTimestamps(List.of(990L, 995L), Set.of(800L, 900L, 1000L));
metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps(List.of(990L, 995L), Set.of(800L, 900L, 1000L));
metadataFiles = metadataAndLocks.v1();
implicitLockedFiles = metadataAndLocks.v2();

Expand All @@ -1231,7 +1231,7 @@ public void testGetImplicitLockedFilesWithPinnedTimestamps() {
// Pinned timestamps 800, 900, 1000
// Metadata with timestamp 990, 995, 1000
// Metadata timestamp 1000 <= Pinned Timestamp 1000
metadataAndLocks = testGetImplicitLockedFilesWithPinnedTimestamps(List.of(990L, 995L, 1000L), Set.of(800L, 900L, 1000L));
metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps(List.of(990L, 995L, 1000L), Set.of(800L, 900L, 1000L));
metadataFiles = metadataAndLocks.v1();
implicitLockedFiles = metadataAndLocks.v2();

Expand All @@ -1245,7 +1245,7 @@ public void testGetImplicitLockedFilesWithPinnedTimestamps() {
// Metadata with timestamp 990, 995, 1000, 1001
// Metadata timestamp 1000 <= Pinned Timestamp 1000
// Metadata timestamp 1001 <= Pinned Timestamp 2000
metadataAndLocks = testGetImplicitLockedFilesWithPinnedTimestamps(
metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps(
List.of(990L, 995L, 1000L, 1001L),
Set.of(800L, 900L, 1000L, 2000L)
);
Expand All @@ -1267,7 +1267,7 @@ public void testGetImplicitLockedFilesWithPinnedTimestamps() {
// Metadata timestamp 1001 <= Pinned Timestamp 3000
// Metadata timestamp 1001 <= Pinned Timestamp 4000
// Metadata timestamp 1001 <= Pinned Timestamp 5000
metadataAndLocks = testGetImplicitLockedFilesWithPinnedTimestamps(
metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps(
List.of(990L, 995L, 1000L, 1001L),
Set.of(800L, 900L, 1000L, 2000L, 3000L, 4000L, 5000L)
);
Expand All @@ -1289,7 +1289,7 @@ public void testGetImplicitLockedFilesWithPinnedTimestamps() {
// Metadata timestamp 2300 <= Pinned Timestamp 3000
// Metadata timestamp 2300 <= Pinned Timestamp 4000
// Metadata timestamp 2300 <= Pinned Timestamp 5000
metadataAndLocks = testGetImplicitLockedFilesWithPinnedTimestamps(
metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps(
List.of(990L, 995L, 1000L, 1001L, 1900L, 2300L),
Set.of(800L, 900L, 1000L, 2000L, 3000L, 4000L, 5000L)
);
Expand All @@ -1312,7 +1312,7 @@ public void testGetImplicitLockedFilesWithPinnedTimestamps() {
// Metadata timestamp 2300 <= Pinned Timestamp 3000
// Metadata timestamp 2300 <= Pinned Timestamp 4000
// Metadata timestamp 2300 <= Pinned Timestamp 5000
metadataAndLocks = testGetImplicitLockedFilesWithPinnedTimestamps(
metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps(
List.of(990L, 995L, 1000L, 1001L, 1900L, 2300L),
Set.of(2000L, 3000L, 4000L, 5000L)
);
Expand All @@ -1333,7 +1333,7 @@ public void testGetImplicitLockedFilesWithPinnedTimestamps() {
// Metadata timestamp 3000 <= Pinned Timestamp 3000
// Metadata timestamp 3001 <= Pinned Timestamp 4000
// Metadata timestamp 3001 <= Pinned Timestamp 5000
metadataAndLocks = testGetImplicitLockedFilesWithPinnedTimestamps(
metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps(
List.of(1001L, 1900L, 2300L, 3000L, 3001L, 5500L, 6000L),
Set.of(2000L, 3000L, 4000L, 5000L)
);
Expand All @@ -1358,7 +1358,7 @@ public void testGetImplicitLockedFilesWithPinnedTimestamps() {
// Metadata timestamp 3001 <= Pinned Timestamp 5000
// Metadata timestamp 6000 <= Pinned Timestamp 6000
// Metadata timestamp 6000 <= Pinned Timestamp 7000
metadataAndLocks = testGetImplicitLockedFilesWithPinnedTimestamps(
metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps(
List.of(2300L, 3000L, 3001L, 5500L, 6000L),
Set.of(4000L, 5000L, 6000L, 7000L)
);
Expand Down

0 comments on commit 755f393

Please sign in to comment.