Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1276] improvment(core): Optimize logic about dropping old version of data in KvGcCollector #2918

Merged
merged 8 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand All @@ -44,6 +44,15 @@ public final class KvGarbageCollector implements Closeable {
private final KvBackend kvBackend;
private final Config config;
private final EntityKeyEncoder<byte[]> entityKeyEncoder;
private static final byte[] LAST_COLLECT_COMMIT_ID_KEY =
Bytes.concat(
new byte[] {0x1D, 0x00, 0x03}, "last_collect_commit_id".getBytes(StandardCharsets.UTF_8));

// Keep the last collect commit id to avoid collecting the same data multiple times, the first
// time the commit is 1 (minimum), and assuming we have collected the data with transaction id
// (1, 100], then the second time we collect the data and current tx_id is 200,
// then the current transaction id range is (100, 200] and so on.
byte[] commitIdHasBeenCollected;
private long frequencyInMinutes;

private static final String TIME_STAMP_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
Expand Down Expand Up @@ -139,24 +148,30 @@ private void collectAndRemoveOldVersionData() throws IOException {
long transactionIdToDelete = deleteTimeLine << 18;
LOG.info("Start to remove data which is older than {}", transactionIdToDelete);
byte[] startKey = TransactionalKvBackendImpl.generateCommitKey(transactionIdToDelete);
byte[] endKey = endOfTransactionId();
commitIdHasBeenCollected = kvBackend.get(LAST_COLLECT_COMMIT_ID_KEY);
if (commitIdHasBeenCollected == null) {
commitIdHasBeenCollected = endOfTransactionId();
}

long lastGCId = getTransactionId(getBinaryTransactionId(commitIdHasBeenCollected));
LOG.info(
"Start to collect data which is modified between '{}({})' (exclusive) and '{}({})' (inclusive)",
lastGCId,
lastGCId == 1 ? lastGCId : DateFormatUtils.format(lastGCId >> 18, TIME_STAMP_FORMAT),
transactionIdToDelete,
DateFormatUtils.format(deleteTimeLine, TIME_STAMP_FORMAT));

// Get all commit marks
// TODO(yuqi), Use multi-thread to scan the data in case of the data is too large.
List<Pair<byte[], byte[]>> kvs =
kvBackend.scan(
new KvRange.KvRangeBuilder()
.start(startKey)
.end(endKey)
.end(commitIdHasBeenCollected)
.startInclusive(true)
.endInclusive(false)
.build());

// Why should we reverse the order? Because we need to delete the data from the oldest data to
// the latest ones. kvs is sorted by transaction id in ascending order (Keys with bigger
// transaction id
// is smaller than keys with smaller transaction id). So we need to reverse the order.
Collections.sort(kvs, (o1, o2) -> Bytes.wrap(o2.getKey()).compareTo(o1.getKey()));
for (Pair<byte[], byte[]> kv : kvs) {
List<byte[]> keysInTheTransaction = SerializationUtils.deserialize(kv.getValue());
byte[] transactionId = getBinaryTransactionId(kv.getKey());
Expand All @@ -174,6 +189,9 @@ private void collectAndRemoveOldVersionData() throws IOException {

// Value has deleted mark, we can remove it.
if (null == TransactionalKvBackendImpl.getRealValue(rawValue)) {
// Delete the key of all versions.
removeAllVersionsOfKey(rawKey, key, false);

LogHelper logHelper = decodeKey(key, transactionId);
LOG.info(
"Physically delete key that has marked deleted: name identifier: '{}', entity type: '{}', createTime: '{}({})', key: '{}'",
Expand All @@ -200,6 +218,9 @@ private void collectAndRemoveOldVersionData() throws IOException {
.limit(1)
.build());
if (!newVersionOfKey.isEmpty()) {
// Have a new version, we can safely remove all old versions.
removeAllVersionsOfKey(rawKey, key, false);

// Has a newer version, we can remove it.
LogHelper logHelper = decodeKey(key, transactionId);
byte[] newVersionKey = newVersionOfKey.get(0).getKey();
Expand Down Expand Up @@ -232,6 +253,71 @@ private void collectAndRemoveOldVersionData() throws IOException {
kvBackend.delete(kv.getKey());
}
}

commitIdHasBeenCollected = kvs.isEmpty() ? startKey : kvs.get(0).getKey();
kvBackend.put(LAST_COLLECT_COMMIT_ID_KEY, commitIdHasBeenCollected, true);
yuqi1129 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Remove all versions of the key.
*
* @param rawKey raw key, it contains the transaction id.
* @param key key, it's the real key and does not contain the transaction id
* @param includeStart whether include the start key.
* @throws IOException if an I/O exception occurs during deletion.
*/
private void removeAllVersionsOfKey(byte[] rawKey, byte[] key, boolean includeStart)
throws IOException {
List<Pair<byte[], byte[]>> kvs =
kvBackend.scan(
new KvRange.KvRangeBuilder()
.start(rawKey)
.end(generateKey(key, 1))
.startInclusive(includeStart)
.endInclusive(false)
.build());

for (Pair<byte[], byte[]> kv : kvs) {
// Delete real data.
kvBackend.delete(kv.getKey());

LogHelper logHelper = decodeKey(kv.getKey());
LOG.info(
"Physically delete key that has marked deleted: name identifier: '{}', entity type: '{}', createTime: '{}({})', key: '{}'",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line might be longer than 100 characters?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that spotless does not work for it, I 'll make the necessary changes.

logHelper.identifier,
logHelper.type,
logHelper.createTimeAsString,
logHelper.createTimeInMs,
Bytes.wrap(key));

// Try to delete commit id if the all keys in the transaction id have been dropped.
byte[] transactionId = getBinaryTransactionId(kv.getKey());
byte[] transactionKey = generateCommitKey(transactionId);
byte[] transactionValue = kvBackend.get(transactionKey);

List<byte[]> keysInTheTransaction = SerializationUtils.deserialize(transactionValue);

boolean allDropped = true;
for (byte[] keyInTheTransaction : keysInTheTransaction) {
if (kvBackend.get(generateKey(keyInTheTransaction, transactionId)) != null) {
// There is still a key in the transaction, we cannot delete the commit mark.
allDropped = false;
break;
}
}

// Try to delete the commit mark.
if (allDropped) {
long timestamp = TransactionalKvBackendImpl.getTransactionId(transactionId) >> 18;
LOG.info(
"Physically delete commit mark: {}, createTime: '{}({})', key: '{}'",
Bytes.wrap(kv.getKey()),
DateFormatUtils.format(timestamp, TIME_STAMP_FORMAT),
timestamp,
Bytes.wrap(kv.getKey()));
kvBackend.delete(transactionKey);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless the "delete" is succeed should we print out the log. So this line should move above before the log line.

}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will be happened here if any of the storage operation is failed (scan, get, delete)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If any steps fails, it throws an exceptions and we would repeated do it in the next time, considering the following scenario:
-1 Scan transactions id
-2 For each transaction ID, check the data in the transaction.
-3 Drop the data if it needs to be deleted( deleted or with a newer version)
-4 Remove the transaction marks.
-5 done.

For any failures from steps 1 to 5, do collectAndRemoveOldVersionData again will solve it as the value of commitIdHasBeenCollected will not move forward if anything unexpected happens.

}

static class LogHelper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import static com.datastrato.gravitino.storage.kv.TestKvEntityStorage.createFilesetEntity;
import static com.datastrato.gravitino.storage.kv.TestKvEntityStorage.createSchemaEntity;
import static com.datastrato.gravitino.storage.kv.TestKvEntityStorage.createTableEntity;
import static com.datastrato.gravitino.storage.kv.TransactionalKvBackendImpl.getBinaryTransactionId;
import static com.datastrato.gravitino.storage.kv.TransactionalKvBackendImpl.getTransactionId;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Configs;
Expand Down Expand Up @@ -409,4 +411,96 @@ void testRemoveWithGCCollector2() throws IOException, InterruptedException {
TableEntity.class));
}
}

@Test
void testIncrementalGC() throws Exception {
Config config = getConfig();
Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);

try (EntityStore store = EntityStoreFactory.createEntityStore(config)) {
store.initialize(config);

if (!(store instanceof KvEntityStore)) {
return;
}
KvEntityStore kvEntityStore = (KvEntityStore) store;

store.setSerDe(EntitySerDeFactory.createEntitySerDe(config.get(Configs.ENTITY_SERDE)));
AuditInfo auditInfo =
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();

BaseMetalake metalake1 = createBaseMakeLake(1L, "metalake1", auditInfo);
BaseMetalake metalake2 = createBaseMakeLake(2L, "metalake2", auditInfo);
BaseMetalake metalake3 = createBaseMakeLake(3L, "metalake3", auditInfo);

for (int i = 0; i < 10; i++) {
store.put(metalake1);
store.put(metalake2);
store.put(metalake3);

store.delete(NameIdentifier.of("metalake1"), Entity.EntityType.METALAKE);
store.delete(NameIdentifier.of("metalake2"), Entity.EntityType.METALAKE);
store.delete(NameIdentifier.of("metalake3"), Entity.EntityType.METALAKE);

Thread.sleep(10);
}

store.put(metalake1);
store.put(metalake2);
store.put(metalake3);

Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(1000L);
Thread.sleep(1500);

// Scan raw key-value data from storage to confirm the data is deleted
kvEntityStore.kvGarbageCollector.collectAndClean();
List<Pair<byte[], byte[]>> allData =
kvEntityStore.backend.scan(
new KvRange.KvRangeBuilder()
.start("_".getBytes())
.end("z".getBytes())
.startInclusive(false)
.endInclusive(false)
.build());

Assertions.assertEquals(3, allData.size());

long transactionId =
getTransactionId(
getBinaryTransactionId(kvEntityStore.kvGarbageCollector.commitIdHasBeenCollected));
Assertions.assertNotEquals(1, transactionId);

for (int i = 0; i < 10; i++) {
store.delete(NameIdentifier.of("metalake1"), Entity.EntityType.METALAKE);
store.delete(NameIdentifier.of("metalake2"), Entity.EntityType.METALAKE);
store.delete(NameIdentifier.of("metalake3"), Entity.EntityType.METALAKE);
store.put(metalake1);
store.put(metalake2);
store.put(metalake3);
Thread.sleep(10);
}
store.delete(NameIdentifier.of("metalake1"), Entity.EntityType.METALAKE);
store.delete(NameIdentifier.of("metalake2"), Entity.EntityType.METALAKE);
store.delete(NameIdentifier.of("metalake3"), Entity.EntityType.METALAKE);

Thread.sleep(1500);
kvEntityStore.kvGarbageCollector.collectAndClean();

allData =
kvEntityStore.backend.scan(
new KvRange.KvRangeBuilder()
.start("_".getBytes())
.end("z".getBytes())
.startInclusive(false)
.endInclusive(false)
.build());

Assertions.assertTrue(allData.isEmpty());

long transactionIdV2 =
getTransactionId(
getBinaryTransactionId(kvEntityStore.kvGarbageCollector.commitIdHasBeenCollected));
Assertions.assertTrue(transactionIdV2 > transactionId);
}
}
}
1 change: 1 addition & 0 deletions rfc/rfc-3/Transaction-implementation-on-kv.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,6 @@ Scan and range query are almost the same as that of read process, for more detai
- Keys that start with 0x'1D0000' store the contents of id-name mapping. for more please refer to class `KvNameMappingService`.
- Keys that start with 0x'1D0001' store the data of current timestamp which is used for generating transaction id, for more please refer to class `TransactionIdGeneratorImpl`.
- Keys that start with 0x'1D0002' store the information of storage layout version. For more please refer to `KvEntityStore#initStorageVersionInfo`
- Keys that start with 0x'1D0003' store tha transaction id that was used by `KvGarbageCollector` last time.
- Keys that start with 0x'1E' store transaction marks which mark the transaction is committed or not.
- Other key spaces are used to store gravitino entities like `metalakes`,`catalogs`, `scheams`, `tables` and so on. it usually starts with from 0x'20'(space) to 0x'7F'(delete). For more please refer to class `KvEntityStoreImpl`.
Loading