[server][dvc] Delete deprecated chunks when processing partial update (#535)

This PR aims to clean up deprecated value and RMD chunks to avoid storage-engine chunk leaks. When we process a partial update request, we now also look up the old value and RMD manifests and produce DELETE messages for the old value and RMD chunks to remove them.
Note:

As discussed offline, this is a short-term solution to reclaim disk space. It does not clean up garbage chunks in the Kafka version topic, since even after log compaction one DELETE message remains per deprecated chunk key. We think this is acceptable at this stage because DELETE messages are small, and we can work on a complete long-term solution later.
With the current implementation, we cannot fully delete the RMD of a key; we can only write an empty RMD for it. (If this is not acceptable, we will need to add new logic to execute a real RocksDB delete on the RMD column family.)
We refactored the chunking get method to store the retrieved value's manifest when the value is chunked. For AASIT (ActiveActiveStoreIngestionTask), this means we will always delete old RMD chunks (the RMD is fetched for every operation) and make a best effort to delete old value chunks (in theory, we always do so for UPDATE). For LFSIT (LeaderFollowerStoreIngestionTask), we will always delete old value chunks for UPDATE operations. A minimal sketch of this lookup-then-cleanup pattern is shown below.
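
The following is a minimal, self-contained sketch of the lookup-then-cleanup pattern described above. It uses simplified stand-ins (`Manifest`, `ManifestContainer`, `readValue`, `deleteDeprecatedChunks`, and toy in-memory maps) rather than the real Venice classes (`ChunkedValueManifest`, `ChunkedValueManifestContainer`, the storage engine, and the writer), so it only illustrates the flow: the read path captures the old manifest in a caller-provided container, and the write path then deletes every chunk that manifest references.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch only: names and storage are stand-ins, not Venice APIs.
 * Read path fills a manifest container when the stored value is chunked;
 * write path then emits a delete for each chunk key the old manifest references.
 */
public class ChunkCleanupSketch {
  // Stand-in for ChunkedValueManifest: just the chunk keys it references.
  static class Manifest {
    final List<String> chunkKeys = new ArrayList<>();
  }

  // Stand-in for ChunkedValueManifestContainer.
  static class ManifestContainer {
    private Manifest manifest;
    void setManifest(Manifest m) { this.manifest = m; }
    Manifest getManifest() { return manifest; }
  }

  // Toy storage: top-level keys map to manifests, chunk keys map to chunk payloads.
  static final Map<String, Manifest> manifestStore = new HashMap<>();
  static final Map<String, byte[]> chunkStore = new HashMap<>();

  // Read path: if the value is chunked, record its manifest in the caller-provided container.
  static byte[] readValue(String key, ManifestContainer container) {
    Manifest m = manifestStore.get(key);
    if (m != null && container != null) {
      container.setManifest(m);
    }
    return null; // chunk reassembly omitted; only the manifest capture matters here
  }

  // Write path: after producing the updated value, drop every chunk the old manifest references.
  static void deleteDeprecatedChunks(Manifest oldManifest) {
    if (oldManifest == null) {
      return; // old value was not chunked, nothing to clean up
    }
    for (String chunkKey : oldManifest.chunkKeys) {
      chunkStore.remove(chunkKey); // in Venice this would be one DELETE message per chunk key
    }
  }

  public static void main(String[] args) {
    Manifest old = new Manifest();
    old.chunkKeys.add("k_chunk_0");
    old.chunkKeys.add("k_chunk_1");
    manifestStore.put("k", old);
    chunkStore.put("k_chunk_0", new byte[1024]);
    chunkStore.put("k_chunk_1", new byte[1024]);

    ManifestContainer container = new ManifestContainer();
    readValue("k", container);                       // lookup captures the soon-deprecated manifest
    deleteDeprecatedChunks(container.getManifest()); // cleanup drops the stale chunks it references

    System.out.println("Remaining chunks: " + chunkStore.size()); // prints 0
  }
}
```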
sixpluszero authored Jul 20, 2023
1 parent c00010a commit a0a365f
Showing 24 changed files with 1,221 additions and 466 deletions.
@@ -596,7 +596,8 @@ protected <T> T bufferAndAssembleRecordChangeEvent(
null,
readerSchemaId,
deserializerCache,
compressor);
compressor,
null);
} catch (Exception ex) {
// We might get an exception if we haven't persisted all the chunks for a given key. This
// can actually happen if the client seeks to the middle of a chunked record either by
@@ -3,6 +3,7 @@
import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.LEADER;
import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.STANDBY;
import static com.linkedin.venice.VeniceConstants.REWIND_TIME_DECIDED_BY_SERVER;
import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
@@ -13,7 +14,7 @@
import com.linkedin.davinci.replication.merge.RmdSerDe;
import com.linkedin.davinci.replication.merge.StringAnnotatedStoreSchemaCache;
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.storage.chunking.ChunkingUtils;
import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer;
import com.linkedin.davinci.storage.chunking.RawBytesChunkingAdapter;
import com.linkedin.davinci.storage.chunking.SingleGetChunkingAdapter;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
@@ -42,6 +43,7 @@
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.schema.rmd.RmdUtils;
import com.linkedin.venice.serialization.RawBytesStoreDeserializerCache;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.Time;
@@ -50,7 +52,6 @@
import com.linkedin.venice.writer.DeleteMetadata;
import com.linkedin.venice.writer.LeaderMetadataWrapper;
import com.linkedin.venice.writer.PutMetadata;
import com.linkedin.venice.writer.VeniceWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -116,6 +117,7 @@ public ActiveActiveStoreIngestionTask(
cacheBackend);

this.rmdProtocolVersionID = version.getRmdVersionId();

this.aggVersionedIngestionStats = versionedIngestionStats;
int knownKafkaClusterNumber = serverConfig.getKafkaClusterIdToUrlMap().size();
int consumerPoolSizePerKafkaCluster = serverConfig.getConsumerPoolSizePerKafkaCluster();
@@ -304,20 +306,39 @@ RmdWithValueSchemaId getReplicationMetadataAndSchemaId(
return new RmdWithValueSchemaId(
cachedRecord.getValueSchemaId(),
getRmdProtocolVersionID(),
cachedRecord.getReplicationMetadataRecord());
cachedRecord.getReplicationMetadataRecord(),
cachedRecord.getRmdManifest());
}
ChunkedValueManifestContainer rmdManifestContainer = new ChunkedValueManifestContainer();
byte[] replicationMetadataWithValueSchemaBytes =
getRmdWithValueSchemaByteBufferFromStorage(subPartition, key, currentTimeForMetricsMs);
getRmdWithValueSchemaByteBufferFromStorage(subPartition, key, rmdManifestContainer, currentTimeForMetricsMs);
if (replicationMetadataWithValueSchemaBytes == null) {
return null; // No RMD for this key
}
return rmdSerDe.deserializeValueSchemaIdPrependedRmdBytes(replicationMetadataWithValueSchemaBytes);
RmdWithValueSchemaId rmdWithValueSchemaId = new RmdWithValueSchemaId();
// Get old RMD manifest value from RMD Manifest container object.
rmdWithValueSchemaId.setRmdManifest(rmdManifestContainer.getManifest());
getRmdSerDe()
.deserializeValueSchemaIdPrependedRmdBytes(replicationMetadataWithValueSchemaBytes, rmdWithValueSchemaId);
return rmdWithValueSchemaId;
}

public RmdSerDe getRmdSerDe() {
return rmdSerDe;
}

byte[] getRmdWithValueSchemaByteBufferFromStorage(int subPartition, byte[] key, long currentTimeForMetricsMs) {
/**
* This method tries to retrieve the RMD bytes with prepended value schema ID from storage engine. It will also store
* RMD manifest into passed-in {@link ChunkedValueManifestContainer} container object if current RMD value is chunked.
*/
byte[] getRmdWithValueSchemaByteBufferFromStorage(
int subPartition,
byte[] key,
ChunkedValueManifestContainer rmdManifestContainer,
long currentTimeForMetricsMs) {
final long lookupStartTimeInNS = System.nanoTime();
ValueRecord result =
SingleGetChunkingAdapter.getReplicationMetadata(getStorageEngine(), subPartition, key, isChunked(), null);
ValueRecord result = SingleGetChunkingAdapter
.getReplicationMetadata(getStorageEngine(), subPartition, key, isChunked(), null, rmdManifestContainer);
getHostLevelIngestionStats().recordIngestionReplicationMetadataLookUpLatency(
LatencyUtils.getLatencyInMS(lookupStartTimeInNS),
currentTimeForMetricsMs);
@@ -327,7 +348,7 @@ byte[] getRmdWithValueSchemaByteBufferFromStorage(int subPartition, byte[] key,
return result.serialize();
}

// This function may modify the original record in KME and it is unsafe to use the payload from KME directly after
// This function may modify the original record in KME, it is unsafe to use the payload from KME directly after
// this function.
protected void processMessageAndMaybeProduceToKafka(
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord,
@@ -380,12 +401,13 @@ protected void processMessageAndMaybeProduceToKafka(
throw new VeniceMessageException(
consumerTaskId + " : Invalid/Unrecognized operation type submitted: " + kafkaValue.messageType);
}

final ChunkedValueManifestContainer valueManifestContainer = new ChunkedValueManifestContainer();
Lazy<ByteBuffer> oldValueProvider = Lazy.of(
() -> getValueBytesForKey(
partitionConsumptionState,
keyBytes,
consumerRecord.getTopicPartition(),
valueManifestContainer,
currentTimeForMetricsMs));

final RmdWithValueSchemaId rmdWithValueSchemaID =
@@ -488,7 +510,9 @@ protected void processMessageAndMaybeProduceToKafka(
subPartition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
beforeProcessingRecordTimestampNs,
valueManifestContainer.getManifest(),
rmdWithValueSchemaID == null ? null : rmdWithValueSchemaID.getRmdManifest());
}
}

@@ -547,6 +571,7 @@ private ByteBuffer getValueBytesForKey(
PartitionConsumptionState partitionConsumptionState,
byte[] key,
PubSubTopicPartition topicPartition,
ChunkedValueManifestContainer valueManifestContainer,
long currentTimeForMetricsMs) {
ByteBuffer originalValue = null;
// Find the existing value. If a value for this key is found from the transient map then use that value, otherwise
@@ -568,14 +593,18 @@
null,
schemaRepository.getSupersetOrLatestValueSchema(storeName).getId(),
RawBytesStoreDeserializerCache.getInstance(),
compressor.get());
compressor.get(),
valueManifestContainer);
hostLevelIngestionStats.recordIngestionValueBytesLookUpLatency(
LatencyUtils.getLatencyInMS(lookupStartTimeInNS),
currentTimeForMetricsMs);
} else {
hostLevelIngestionStats.recordIngestionValueBytesCacheHitCount(currentTimeForMetricsMs);
// construct originalValue from this transient record only if it's not null.
if (transientRecord.getValue() != null) {
if (valueManifestContainer != null) {
valueManifestContainer.setManifest(transientRecord.getValueManifest());
}
originalValue = ByteBuffer
.wrap(transientRecord.getValue(), transientRecord.getValueOffset(), transientRecord.getValueLen());
}
@@ -604,7 +633,9 @@ private void producePutOrDeleteToKafka(
int subPartition,
String kafkaUrl,
int kafkaClusterId,
long beforeProcessingRecordTimestampNs) {
long beforeProcessingRecordTimestampNs,
ChunkedValueManifest oldValueManifest,
ChunkedValueManifest oldRmdManifest) {

final ByteBuffer updatedValueBytes = maybeCompressData(
consumerRecord.getTopicPartition().getPartitionNumber(),
@@ -615,13 +646,18 @@
GenericRecord rmdRecord = mergeConflictResult.getRmdRecord();
final ByteBuffer updatedRmdBytes =
rmdSerDe.serializeRmdRecord(mergeConflictResult.getValueSchemaId(), mergeConflictResult.getRmdRecord());

// finally produce and update the transient record map.
if (updatedValueBytes == null) {
hostLevelIngestionStats.recordTombstoneCreatedDCR();
aggVersionedIngestionStats.recordTombStoneCreationDCR(storeName, versionNumber);
partitionConsumptionState
.setTransientRecord(kafkaClusterId, consumerRecord.getOffset(), key, valueSchemaId, rmdRecord);
partitionConsumptionState.setTransientRecord(
kafkaClusterId,
consumerRecord.getOffset(),
key,
valueSchemaId,
rmdRecord,
oldValueManifest,
oldRmdManifest);
Delete deletePayload = new Delete();
deletePayload.schemaId = valueSchemaId;
deletePayload.replicationMetadataVersionId = rmdProtocolVersionID;
@@ -632,7 +668,10 @@
key,
callback,
sourceTopicOffset,
new DeleteMetadata(valueSchemaId, rmdProtocolVersionID, updatedRmdBytes));
APP_DEFAULT_LOGICAL_TS,
new DeleteMetadata(valueSchemaId, rmdProtocolVersionID, updatedRmdBytes),
oldValueManifest,
oldRmdManifest);
LeaderProducedRecordContext leaderProducedRecordContext =
LeaderProducedRecordContext.newDeleteRecord(kafkaClusterId, consumerRecord.getOffset(), key, deletePayload);
produceToLocalKafka(
@@ -654,7 +693,9 @@
updatedValueBytes.position(),
valueLen,
valueSchemaId,
rmdRecord);
rmdRecord,
oldValueManifest,
oldRmdManifest);

Put updatedPut = new Put();
updatedPut.putValue = ByteUtils
@@ -663,23 +704,18 @@
updatedPut.replicationMetadataVersionId = rmdProtocolVersionID;
updatedPut.replicationMetadataPayload = updatedRmdBytes;

byte[] updatedKeyBytes = key;
if (isChunked) {
// Since data is not chunked in RT but chunked in VT, creating the key for the small record case or CVM to be
// used to persist on disk after producing to Kafka.
updatedKeyBytes = ChunkingUtils.KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeNonChunkedKey(key);
}
BiConsumer<ChunkAwareCallback, LeaderMetadataWrapper> produceToTopicFunction = getProduceToTopicFunction(
key,
updatedValueBytes,
updatedRmdBytes,
oldValueManifest,
oldRmdManifest,
valueSchemaId,
mergeConflictResult.doesResultReuseInput());
produceToLocalKafka(
consumerRecord,
partitionConsumptionState,
LeaderProducedRecordContext
.newPutRecord(kafkaClusterId, consumerRecord.getOffset(), updatedKeyBytes, updatedPut),
LeaderProducedRecordContext.newPutRecord(kafkaClusterId, consumerRecord.getOffset(), key, updatedPut),
produceToTopicFunction,
subPartition,
kafkaUrl,
@@ -1345,6 +1381,8 @@ protected BiConsumer<ChunkAwareCallback, LeaderMetadataWrapper> getProduceToTopicFunction(
byte[] key,
ByteBuffer updatedValueBytes,
ByteBuffer updatedRmdBytes,
ChunkedValueManifest oldValueManifest,
ChunkedValueManifest oldRmdManifest,
int valueSchemaId,
boolean resultReuseInput) {
return (callback, leaderMetadataWrapper) -> {
@@ -1364,8 +1402,10 @@ protected BiConsumer<ChunkAwareCallback, LeaderMetadataWrapper> getProduceToTopicFunction(
valueSchemaId,
callback,
leaderMetadataWrapper,
VeniceWriter.APP_DEFAULT_LOGICAL_TS,
new PutMetadata(getRmdProtocolVersionID(), updatedRmdBytes));
APP_DEFAULT_LOGICAL_TS,
new PutMetadata(getRmdProtocolVersionID(), updatedRmdBytes),
oldValueManifest,
oldRmdManifest);
};
}

