From a0a365f104863b675604ec37578fe587cbb7bd4b Mon Sep 17 00:00:00 2001
From: Jialin Liu
Date: Thu, 20 Jul 2023 13:04:16 -0700
Subject: [PATCH] [server][dvc] Delete deprecated chunks when processing partial update (#535)

This PR cleans up deprecated value and RMD chunks to avoid storage engine chunk leaks. When we process a partial update request, we also look up the old value and RMD manifests and produce DELETE messages for the old value and RMD chunks to remove them.

Note: As discussed offline, this is a short-term solution to clean up disk space; it will not clean up garbage chunks in the Kafka version topic, because even after log compaction there will still be one DELETE message left per deprecated chunk key. We think this is acceptable at this stage, since a DELETE message is small, and we can work on a complete long-term solution later. With the current implementation, we cannot fully delete the RMD of a key; we can only write an empty RMD for it. (If this is not acceptable, we will need a new implementation that performs a real RocksDB delete on the RMD column family.)

We refactor the chunking get method to store the retrieved value's manifest when the value is chunked. As a result, AASIT will always delete old RMD chunks (the RMD is fetched for every operation) and will make a best effort to delete the old value's chunks (in theory, it will always do so for UPDATE). LFSIT will always delete old value chunks for the UPDATE operation.
---
 .../consumer/VeniceChangelogConsumerImpl.java | 3 +-
 .../ActiveActiveStoreIngestionTask.java | 96 +++--
 .../LeaderFollowerStoreIngestionTask.java | 65 ++--
 .../consumer/LeaderProducedRecordContext.java | 4 +
 .../consumer/LeaderProducerCallback.java | 52 ++-
 .../consumer/PartitionConsumptionState.java | 41 +-
 .../kafka/consumer/StoreIngestionTask.java | 1 -
 .../replication/RmdWithValueSchemaId.java | 43 +-
 .../davinci/replication/merge/RmdSerDe.java | 13 +-
 .../chunking/AbstractAvroChunkingAdapter.java | 6 +-
 .../ChunkedValueManifestContainer.java | 19 +
 .../storage/chunking/ChunkingUtils.java | 50 ++-
 .../chunking/SingleGetChunkingAdapter.java | 12 +-
 .../ActiveActiveStoreIngestionTaskTest.java | 28 +-
 .../consumer/LeaderProducerCallbackTest.java | 35 ++
 .../PartitionConsumptionStateTest.java | 8 +-
 .../replication/merge/RmdSerDeTest.java | 4 +-
 .../storage/chunking/ChunkingTest.java | 3 +-
 .../venice/writer/ChunkAwareCallback.java | 12 +-
 .../linkedin/venice/writer/VeniceWriter.java | 176 ++++++++-
 .../venice/writer/WriterChunkingHelper.java | 2 +-
 .../venice/writer/VeniceWriterUnitTest.java | 368 ++++++++++++++++++
 .../venice/endToEnd/PartialUpdateTest.java | 329 ++++++++++++++--
 .../venice/writer/VeniceWriterTest.java | 317 ---------------
 24 files changed, 1221 insertions(+), 466 deletions(-)
 create mode 100644 clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkedValueManifestContainer.java

diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java index d1a8aa81e0..c9e16d91d5 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java @@ -596,7 +596,8 @@ protected T bufferAndAssembleRecordChangeEvent( null, readerSchemaId, deserializerCache, - compressor); + compressor, + null); }
catch (Exception ex) { // We might get an exception if we haven't persisted all the chunks for a given key. This // can actually happen if the client seeks to the middle of a chunked record either by diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java index 4545cf9888..75e582ea25 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java @@ -3,6 +3,7 @@ import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.LEADER; import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.STANDBY; import static com.linkedin.venice.VeniceConstants.REWIND_TIME_DECIDED_BY_SERVER; +import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.davinci.config.VeniceStoreVersionConfig; @@ -13,7 +14,7 @@ import com.linkedin.davinci.replication.merge.RmdSerDe; import com.linkedin.davinci.replication.merge.StringAnnotatedStoreSchemaCache; import com.linkedin.davinci.stats.AggVersionedIngestionStats; -import com.linkedin.davinci.storage.chunking.ChunkingUtils; +import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer; import com.linkedin.davinci.storage.chunking.RawBytesChunkingAdapter; import com.linkedin.davinci.storage.chunking.SingleGetChunkingAdapter; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; @@ -42,6 +43,7 @@ import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.schema.rmd.RmdUtils; import com.linkedin.venice.serialization.RawBytesStoreDeserializerCache; +import com.linkedin.venice.storage.protocol.ChunkedValueManifest; import com.linkedin.venice.utils.ByteUtils; import com.linkedin.venice.utils.LatencyUtils; import com.linkedin.venice.utils.Time; @@ -50,7 +52,6 @@ import com.linkedin.venice.writer.DeleteMetadata; import com.linkedin.venice.writer.LeaderMetadataWrapper; import com.linkedin.venice.writer.PutMetadata; -import com.linkedin.venice.writer.VeniceWriter; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -116,6 +117,7 @@ public ActiveActiveStoreIngestionTask( cacheBackend); this.rmdProtocolVersionID = version.getRmdVersionId(); + this.aggVersionedIngestionStats = versionedIngestionStats; int knownKafkaClusterNumber = serverConfig.getKafkaClusterIdToUrlMap().size(); int consumerPoolSizePerKafkaCluster = serverConfig.getConsumerPoolSizePerKafkaCluster(); @@ -304,20 +306,39 @@ RmdWithValueSchemaId getReplicationMetadataAndSchemaId( return new RmdWithValueSchemaId( cachedRecord.getValueSchemaId(), getRmdProtocolVersionID(), - cachedRecord.getReplicationMetadataRecord()); + cachedRecord.getReplicationMetadataRecord(), + cachedRecord.getRmdManifest()); } + ChunkedValueManifestContainer rmdManifestContainer = new ChunkedValueManifestContainer(); byte[] replicationMetadataWithValueSchemaBytes = - getRmdWithValueSchemaByteBufferFromStorage(subPartition, key, currentTimeForMetricsMs); + getRmdWithValueSchemaByteBufferFromStorage(subPartition, key, rmdManifestContainer, currentTimeForMetricsMs); if (replicationMetadataWithValueSchemaBytes == null) { return null; // No RMD for this key } - return 
rmdSerDe.deserializeValueSchemaIdPrependedRmdBytes(replicationMetadataWithValueSchemaBytes); + RmdWithValueSchemaId rmdWithValueSchemaId = new RmdWithValueSchemaId(); + // Get old RMD manifest value from RMD Manifest container object. + rmdWithValueSchemaId.setRmdManifest(rmdManifestContainer.getManifest()); + getRmdSerDe() + .deserializeValueSchemaIdPrependedRmdBytes(replicationMetadataWithValueSchemaBytes, rmdWithValueSchemaId); + return rmdWithValueSchemaId; + } + + public RmdSerDe getRmdSerDe() { + return rmdSerDe; } - byte[] getRmdWithValueSchemaByteBufferFromStorage(int subPartition, byte[] key, long currentTimeForMetricsMs) { + /** + * This method tries to retrieve the RMD bytes with prepended value schema ID from storage engine. It will also store + * RMD manifest into passed-in {@link ChunkedValueManifestContainer} container object if current RMD value is chunked. + */ + byte[] getRmdWithValueSchemaByteBufferFromStorage( + int subPartition, + byte[] key, + ChunkedValueManifestContainer rmdManifestContainer, + long currentTimeForMetricsMs) { final long lookupStartTimeInNS = System.nanoTime(); - ValueRecord result = - SingleGetChunkingAdapter.getReplicationMetadata(getStorageEngine(), subPartition, key, isChunked(), null); + ValueRecord result = SingleGetChunkingAdapter + .getReplicationMetadata(getStorageEngine(), subPartition, key, isChunked(), null, rmdManifestContainer); getHostLevelIngestionStats().recordIngestionReplicationMetadataLookUpLatency( LatencyUtils.getLatencyInMS(lookupStartTimeInNS), currentTimeForMetricsMs); @@ -327,7 +348,7 @@ byte[] getRmdWithValueSchemaByteBufferFromStorage(int subPartition, byte[] key, return result.serialize(); } - // This function may modify the original record in KME and it is unsafe to use the payload from KME directly after + // This function may modify the original record in KME, it is unsafe to use the payload from KME directly after // this function. protected void processMessageAndMaybeProduceToKafka( PubSubMessage consumerRecord, @@ -380,12 +401,13 @@ protected void processMessageAndMaybeProduceToKafka( throw new VeniceMessageException( consumerTaskId + " : Invalid/Unrecognized operation type submitted: " + kafkaValue.messageType); } - + final ChunkedValueManifestContainer valueManifestContainer = new ChunkedValueManifestContainer(); Lazy oldValueProvider = Lazy.of( () -> getValueBytesForKey( partitionConsumptionState, keyBytes, consumerRecord.getTopicPartition(), + valueManifestContainer, currentTimeForMetricsMs)); final RmdWithValueSchemaId rmdWithValueSchemaID = @@ -488,7 +510,9 @@ protected void processMessageAndMaybeProduceToKafka( subPartition, kafkaUrl, kafkaClusterId, - beforeProcessingRecordTimestampNs); + beforeProcessingRecordTimestampNs, + valueManifestContainer.getManifest(), + rmdWithValueSchemaID == null ? null : rmdWithValueSchemaID.getRmdManifest()); } } @@ -547,6 +571,7 @@ private ByteBuffer getValueBytesForKey( PartitionConsumptionState partitionConsumptionState, byte[] key, PubSubTopicPartition topicPartition, + ChunkedValueManifestContainer valueManifestContainer, long currentTimeForMetricsMs) { ByteBuffer originalValue = null; // Find the existing value. 
If a value for this key is found from the transient map then use that value, otherwise @@ -568,7 +593,8 @@ private ByteBuffer getValueBytesForKey( null, schemaRepository.getSupersetOrLatestValueSchema(storeName).getId(), RawBytesStoreDeserializerCache.getInstance(), - compressor.get()); + compressor.get(), + valueManifestContainer); hostLevelIngestionStats.recordIngestionValueBytesLookUpLatency( LatencyUtils.getLatencyInMS(lookupStartTimeInNS), currentTimeForMetricsMs); @@ -576,6 +602,9 @@ private ByteBuffer getValueBytesForKey( hostLevelIngestionStats.recordIngestionValueBytesCacheHitCount(currentTimeForMetricsMs); // construct originalValue from this transient record only if it's not null. if (transientRecord.getValue() != null) { + if (valueManifestContainer != null) { + valueManifestContainer.setManifest(transientRecord.getValueManifest()); + } originalValue = ByteBuffer .wrap(transientRecord.getValue(), transientRecord.getValueOffset(), transientRecord.getValueLen()); } @@ -604,7 +633,9 @@ private void producePutOrDeleteToKafka( int subPartition, String kafkaUrl, int kafkaClusterId, - long beforeProcessingRecordTimestampNs) { + long beforeProcessingRecordTimestampNs, + ChunkedValueManifest oldValueManifest, + ChunkedValueManifest oldRmdManifest) { final ByteBuffer updatedValueBytes = maybeCompressData( consumerRecord.getTopicPartition().getPartitionNumber(), @@ -615,13 +646,18 @@ private void producePutOrDeleteToKafka( GenericRecord rmdRecord = mergeConflictResult.getRmdRecord(); final ByteBuffer updatedRmdBytes = rmdSerDe.serializeRmdRecord(mergeConflictResult.getValueSchemaId(), mergeConflictResult.getRmdRecord()); - // finally produce and update the transient record map. if (updatedValueBytes == null) { hostLevelIngestionStats.recordTombstoneCreatedDCR(); aggVersionedIngestionStats.recordTombStoneCreationDCR(storeName, versionNumber); - partitionConsumptionState - .setTransientRecord(kafkaClusterId, consumerRecord.getOffset(), key, valueSchemaId, rmdRecord); + partitionConsumptionState.setTransientRecord( + kafkaClusterId, + consumerRecord.getOffset(), + key, + valueSchemaId, + rmdRecord, + oldValueManifest, + oldRmdManifest); Delete deletePayload = new Delete(); deletePayload.schemaId = valueSchemaId; deletePayload.replicationMetadataVersionId = rmdProtocolVersionID; @@ -632,7 +668,10 @@ private void producePutOrDeleteToKafka( key, callback, sourceTopicOffset, - new DeleteMetadata(valueSchemaId, rmdProtocolVersionID, updatedRmdBytes)); + APP_DEFAULT_LOGICAL_TS, + new DeleteMetadata(valueSchemaId, rmdProtocolVersionID, updatedRmdBytes), + oldValueManifest, + oldRmdManifest); LeaderProducedRecordContext leaderProducedRecordContext = LeaderProducedRecordContext.newDeleteRecord(kafkaClusterId, consumerRecord.getOffset(), key, deletePayload); produceToLocalKafka( @@ -654,7 +693,9 @@ private void producePutOrDeleteToKafka( updatedValueBytes.position(), valueLen, valueSchemaId, - rmdRecord); + rmdRecord, + oldValueManifest, + oldRmdManifest); Put updatedPut = new Put(); updatedPut.putValue = ByteUtils @@ -663,23 +704,18 @@ private void producePutOrDeleteToKafka( updatedPut.replicationMetadataVersionId = rmdProtocolVersionID; updatedPut.replicationMetadataPayload = updatedRmdBytes; - byte[] updatedKeyBytes = key; - if (isChunked) { - // Since data is not chunked in RT but chunked in VT, creating the key for the small record case or CVM to be - // used to persist on disk after producing to Kafka. 
- updatedKeyBytes = ChunkingUtils.KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeNonChunkedKey(key); - } BiConsumer produceToTopicFunction = getProduceToTopicFunction( key, updatedValueBytes, updatedRmdBytes, + oldValueManifest, + oldRmdManifest, valueSchemaId, mergeConflictResult.doesResultReuseInput()); produceToLocalKafka( consumerRecord, partitionConsumptionState, - LeaderProducedRecordContext - .newPutRecord(kafkaClusterId, consumerRecord.getOffset(), updatedKeyBytes, updatedPut), + LeaderProducedRecordContext.newPutRecord(kafkaClusterId, consumerRecord.getOffset(), key, updatedPut), produceToTopicFunction, subPartition, kafkaUrl, @@ -1345,6 +1381,8 @@ protected BiConsumer getProduceToTopi byte[] key, ByteBuffer updatedValueBytes, ByteBuffer updatedRmdBytes, + ChunkedValueManifest oldValueManifest, + ChunkedValueManifest oldRmdManifest, int valueSchemaId, boolean resultReuseInput) { return (callback, leaderMetadataWrapper) -> { @@ -1364,8 +1402,10 @@ protected BiConsumer getProduceToTopi valueSchemaId, callback, leaderMetadataWrapper, - VeniceWriter.APP_DEFAULT_LOGICAL_TS, - new PutMetadata(getRmdProtocolVersionID(), updatedRmdBytes)); + APP_DEFAULT_LOGICAL_TS, + new PutMetadata(getRmdProtocolVersionID(), updatedRmdBytes), + oldValueManifest, + oldRmdManifest); }; } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index fd3fda2372..817270bf55 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -7,14 +7,15 @@ import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.PAUSE_TRANSITION_FROM_STANDBY_TO_LEADER; import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.STANDBY; import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.END_OF_PUSH; +import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS; import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import com.linkedin.davinci.config.VeniceStoreVersionConfig; import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel; +import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer; import com.linkedin.davinci.storage.chunking.ChunkingAdapter; -import com.linkedin.davinci.storage.chunking.ChunkingUtils; import com.linkedin.davinci.storage.chunking.GenericRecordChunkingAdapter; import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; @@ -51,6 +52,7 @@ import com.linkedin.venice.schema.merge.MergeRecordHelper; import com.linkedin.venice.serialization.AvroStoreDeserializerCache; import com.linkedin.venice.stats.StatsErrorCode; +import com.linkedin.venice.storage.protocol.ChunkedValueManifest; import com.linkedin.venice.utils.ByteUtils; import com.linkedin.venice.utils.LatencyUtils; import com.linkedin.venice.utils.PartitionUtils; @@ -2665,6 +2667,8 @@ protected void processMessageAndMaybeProduceToKafka( putValue.position(), putValue.remaining(), put.schemaId, + null, + null, // Since we did not perform read, it is not possible to delete old value chunks here. 
null); } @@ -2733,7 +2737,8 @@ protected void processMessageAndMaybeProduceToKafka( * For WC enabled stores update the transient record map with the latest {key,null} for similar reason as mentioned in PUT above. */ if (isWriteComputationEnabled && partitionConsumptionState.isEndOfPushReceived()) { - partitionConsumptionState.setTransientRecord(kafkaClusterId, consumerRecord.getOffset(), keyBytes, -1, null); + partitionConsumptionState + .setTransientRecord(kafkaClusterId, consumerRecord.getOffset(), keyBytes, -1, null, null, null); } leaderProducedRecordContext = LeaderProducedRecordContext.newDeleteRecord(kafkaClusterId, consumerRecord.getOffset(), keyBytes, null); @@ -2770,20 +2775,20 @@ protected void processMessageAndMaybeProduceToKafka( } /** - * 1. Currently we support chunking only for messages produced on VT topic during batch part of the ingestion + * 1. Currently, we support chunking only for messages produced on VT topic during batch part of the ingestion * for hybrid stores. Chunking is NOT supported for messages produced to RT topics during streaming ingestion. * * So the assumption here is that the PUT/UPDATE messages stored in transientRecord should always be a full value - * (non chunked). Decoding should succeed using the the simplified API + * (non chunked). Decoding should succeed using the simplified API * {@link ChunkingAdapter#constructValue} * * 2. We always use the latest value schema to deserialize stored value bytes. - * 3. We always use the write compute schema with an ID combination of latest value schema ID + update schema ID + * 3. We always use the partial update schema with an ID combination of the latest value schema ID + update schema ID * to deserialize the incoming Update request payload bytes. * * The reason for 2 and 3 is that we depend on the fact that the latest value schema must be a superset schema * that contains all value fields that ever existed in a store value schema. So, always using a superset schema - * as the reader schema avoids data loss where the serialized bytes contain data for a field , however, the + * as the reader schema avoids data loss where the serialized bytes contain data for a field, however, the * deserialized record does not contain that field because the reader schema does not contain that field. */ private void handleUpdateRequest( @@ -2809,15 +2814,17 @@ private void handleUpdateRequest( readerValueSchemaId = supersetSchemaEntry.getId(); readerUpdateProtocolVersion = update.updateSchemaId; } - + ChunkedValueManifestContainer valueManifestContainer = new ChunkedValueManifestContainer(); final GenericRecord currValue = readStoredValueRecord( partitionConsumptionState, keyBytes, readerValueSchemaId, - consumerRecord.getTopicPartition()); + consumerRecord.getTopicPartition(), + valueManifestContainer); - // Apply Write Compute. 
final byte[] updatedValueBytes; + final ChunkedValueManifest oldValueManifest = valueManifestContainer.getManifest(); + try { long writeComputeStartTimeInNS = System.nanoTime(); // Leader nodes are the only ones which process UPDATES, so it's valid to always compress and not call @@ -2856,6 +2863,8 @@ private void handleUpdateRequest( 0, updatedValueBytes.length, readerValueSchemaId, + null, + oldValueManifest, null); ByteBuffer updateValueWithSchemaId = @@ -2865,25 +2874,27 @@ private void handleUpdateRequest( updatedPut.putValue = updateValueWithSchemaId; updatedPut.schemaId = readerValueSchemaId; - byte[] updatedKeyBytes = keyBytes; - if (isChunked) { - // Samza VeniceWriter doesn't handle chunking config properly. It reads chunking config - // from user's input instead of getting it from store's metadata repo. This causes SN - // to der-se of keys a couple of times. - updatedKeyBytes = ChunkingUtils.KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeNonChunkedKey(keyBytes); - } - LeaderProducedRecordContext leaderProducedRecordContext = LeaderProducedRecordContext - .newPutRecord(kafkaClusterId, consumerRecord.getOffset(), updatedKeyBytes, updatedPut); + LeaderProducedRecordContext leaderProducedRecordContext = + LeaderProducedRecordContext.newPutRecord(kafkaClusterId, consumerRecord.getOffset(), keyBytes, updatedPut); - BiConsumer produce = + BiConsumer produceFunction = (callback, leaderMetadataWrapper) -> veniceWriter.get() - .put(keyBytes, updatedValueBytes, readerValueSchemaId, callback, leaderMetadataWrapper); + .put( + keyBytes, + updatedValueBytes, + readerValueSchemaId, + callback, + leaderMetadataWrapper, + APP_DEFAULT_LOGICAL_TS, + null, + oldValueManifest, + null); produceToLocalKafka( consumerRecord, partitionConsumptionState, leaderProducedRecordContext, - produce, + produceFunction, subPartition, kafkaUrl, kafkaClusterId, @@ -2900,7 +2911,8 @@ private GenericRecord readStoredValueRecord( PartitionConsumptionState partitionConsumptionState, byte[] keyBytes, int readerValueSchemaID, - PubSubTopicPartition topicPartition) { + PubSubTopicPartition topicPartition, + ChunkedValueManifestContainer manifestContainer) { final GenericRecord currValue; PartitionConsumptionState.TransientRecord transientRecord = partitionConsumptionState.getTransientRecord(keyBytes); if (transientRecord == null) { @@ -2916,7 +2928,8 @@ private GenericRecord readStoredValueRecord( null, readerValueSchemaID, storeDeserializerCache, - compressor.get()); + compressor.get(), + manifestContainer); hostLevelIngestionStats.recordWriteComputeLookUpLatency(LatencyUtils.getLatencyInMS(lookupStartTimeInNS)); } catch (Exception e) { writeComputeFailureCode = StatsErrorCode.WRITE_COMPUTE_DESERIALIZATION_FAILURE.code; @@ -2937,6 +2950,10 @@ private GenericRecord readStoredValueRecord( writeComputeFailureCode = StatsErrorCode.WRITE_COMPUTE_DESERIALIZATION_FAILURE.code; throw e; } + if (manifestContainer != null) { + manifestContainer.setManifest(transientRecord.getValueManifest()); + } + } else { currValue = null; } @@ -3121,7 +3138,7 @@ protected Lazy> getVeniceWriter() { } // test method - protected void addPartititionConsumptionState(Integer partition, PartitionConsumptionState pcs) { + protected void addPartitionConsumptionState(Integer partition, PartitionConsumptionState pcs) { partitionConsumptionStateMap.put(partition, pcs); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducedRecordContext.java 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducedRecordContext.java index 5cd3ed747f..4cff79e933 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducedRecordContext.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducedRecordContext.java @@ -103,6 +103,10 @@ public static LeaderProducedRecordContext newChunkPutRecord(byte[] keyBytes, Put return new LeaderProducedRecordContext(NO_UPSTREAM, NO_UPSTREAM, PUT, keyBytes, valueUnion); } + public static LeaderProducedRecordContext newChunkDeleteRecord(byte[] keyBytes, Delete valueUnion) { + return new LeaderProducedRecordContext(NO_UPSTREAM, NO_UPSTREAM, DELETE, keyBytes, valueUnion); + } + public static LeaderProducedRecordContext newPutRecordWithFuture( int consumedKafkaClusterId, long consumedOffset, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.java index 1ffa3e4953..2aa601895b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.java @@ -4,6 +4,7 @@ import com.linkedin.davinci.store.record.ValueRecord; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.kafka.protocol.Delete; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.Put; import com.linkedin.venice.message.KafkaKey; @@ -16,6 +17,7 @@ import com.linkedin.venice.utils.LatencyUtils; import com.linkedin.venice.utils.RedundantExceptionFilter; import com.linkedin.venice.writer.ChunkAwareCallback; +import com.linkedin.venice.writer.VeniceWriter; import java.nio.ByteBuffer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -42,7 +44,7 @@ class LeaderProducerCallback implements ChunkAwareCallback { /** * The mutable fields below are determined by the {@link com.linkedin.venice.writer.VeniceWriter}, * which populates them via: - * {@link ChunkAwareCallback#setChunkingInfo(byte[], ByteBuffer[], ChunkedValueManifest, ByteBuffer[], ChunkedValueManifest)} + * {@link ChunkAwareCallback#setChunkingInfo(byte[], ByteBuffer[], ChunkedValueManifest, ByteBuffer[], ChunkedValueManifest, ChunkedValueManifest, ChunkedValueManifest)} */ private byte[] key = null; private ChunkedValueManifest chunkedValueManifest = null; @@ -50,6 +52,9 @@ class LeaderProducerCallback implements ChunkAwareCallback { protected ChunkedValueManifest chunkedRmdManifest = null; private ByteBuffer[] rmdChunks = null; + protected ChunkedValueManifest oldValueManifest = null; + protected ChunkedValueManifest oldRmdManifest = null; + public LeaderProducerCallback( LeaderFollowerStoreIngestionTask ingestionTask, PubSubMessage sourceConsumerRecord, @@ -136,21 +141,20 @@ public void onCompletion(PubSubProduceResult produceResult, Exception e) { currentTimeForMetricsMs); } } - + // update the keyBytes for the ProducedRecord in case it was changed due to isChunkingEnabled flag in + // VeniceWriter. + if (key != null) { + leaderProducedRecordContext.setKeyBytes(key); + } int producedRecordNum = 0; int producedRecordSize = 0; // produce to drainer buffer service for further processing. 
try { /** * queue the leaderProducedRecordContext to drainer service as is in case the value was not chunked. - * Otherwise queue the chunks and manifest individually to drainer service. + * Otherwise, queue the chunks and manifest individually to drainer service. */ if (chunkedValueManifest == null) { - // update the keyBytes for the ProducedRecord in case it was changed due to isChunkingEnabled flag in - // VeniceWriter. - if (key != null) { - leaderProducedRecordContext.setKeyBytes(key); - } leaderProducedRecordContext.setProducedOffset(produceResult.getOffset()); ingestionTask.produceToStoreBufferService( sourceConsumerRecord, @@ -200,6 +204,8 @@ public void onCompletion(PubSubProduceResult produceResult, Exception e) { producedRecordNum++; producedRecordSize += key.length + manifest.remaining(); } + produceDeprecatedChunkDeletionToStoreBufferService(oldValueManifest, currentTimeForMetricsMs); + produceDeprecatedChunkDeletionToStoreBufferService(oldRmdManifest, currentTimeForMetricsMs); recordProducerStats(producedRecordSize, producedRecordNum); } catch (Exception oe) { @@ -233,12 +239,16 @@ public void setChunkingInfo( ByteBuffer[] valueChunks, ChunkedValueManifest chunkedValueManifest, ByteBuffer[] rmdChunks, - ChunkedValueManifest chunkedRmdManifest) { + ChunkedValueManifest chunkedRmdManifest, + ChunkedValueManifest oldValueManifest, + ChunkedValueManifest oldRmdManifest) { this.key = key; this.chunkedValueManifest = chunkedValueManifest; this.valueChunks = valueChunks; this.chunkedRmdManifest = chunkedRmdManifest; this.rmdChunks = rmdChunks; + this.oldValueManifest = oldValueManifest; + this.oldRmdManifest = oldRmdManifest; } private void recordProducerStats(int producedRecordSize, int producedRecordNum) { @@ -296,4 +306,28 @@ private long produceChunksToStoreBufferService( } return totalChunkSize; } + + void produceDeprecatedChunkDeletionToStoreBufferService(ChunkedValueManifest manifest, long currentTimeForMetricsMs) + throws InterruptedException { + if (manifest == null) { + return; + } + for (int i = 0; i < manifest.keysWithChunkIdSuffix.size(); i++) { + ByteBuffer chunkKey = manifest.keysWithChunkIdSuffix.get(i); + Delete chunkDelete = new Delete(); + chunkDelete.schemaId = AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion(); + chunkDelete.replicationMetadataVersionId = VeniceWriter.VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID; + chunkDelete.replicationMetadataPayload = EMPTY_BYTE_BUFFER; + LeaderProducedRecordContext producedRecordForChunk = + LeaderProducedRecordContext.newChunkDeleteRecord(ByteUtils.extractByteArray(chunkKey), chunkDelete); + producedRecordForChunk.setProducedOffset(-1); + ingestionTask.produceToStoreBufferService( + sourceConsumerRecord, + producedRecordForChunk, + subPartition, + kafkaUrl, + beforeProcessingRecordTimestampNs, + currentTimeForMetricsMs); + } + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java index 26036d2080..a509ac0c0a 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java @@ -12,6 +12,7 @@ import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import 
com.linkedin.venice.storage.protocol.ChunkedValueManifest; import com.linkedin.venice.utils.PartitionUtils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import java.nio.ByteBuffer; @@ -463,7 +464,9 @@ public void setTransientRecord( long kafkaConsumedOffset, byte[] key, int valueSchemaId, - GenericRecord replicationMetadataRecord) { + GenericRecord replicationMetadataRecord, + ChunkedValueManifest valueManifest, + ChunkedValueManifest rmdManifest) { setTransientRecord( kafkaClusterId, kafkaConsumedOffset, @@ -472,7 +475,9 @@ public void setTransientRecord( -1, -1, valueSchemaId, - replicationMetadataRecord); + replicationMetadataRecord, + valueManifest, + rmdManifest); } public void setTransientRecord( @@ -483,9 +488,18 @@ public void setTransientRecord( int valueOffset, int valueLen, int valueSchemaId, - GenericRecord replicationMetadataRecord) { - TransientRecord transientRecord = - new TransientRecord(value, valueOffset, valueLen, valueSchemaId, kafkaClusterId, kafkaConsumedOffset); + GenericRecord replicationMetadataRecord, + ChunkedValueManifest valueManifest, + ChunkedValueManifest rmdManifest) { + TransientRecord transientRecord = new TransientRecord( + value, + valueOffset, + valueLen, + valueSchemaId, + kafkaClusterId, + kafkaConsumedOffset, + valueManifest, + rmdManifest); if (replicationMetadataRecord != null) { transientRecord.setReplicationMetadataRecord(replicationMetadataRecord); } @@ -563,19 +577,34 @@ public static class TransientRecord { private final long kafkaConsumedOffset; private GenericRecord replicationMetadataRecord; + private ChunkedValueManifest valueManifest; + private ChunkedValueManifest rmdManifest; + TransientRecord( byte[] value, int valueOffset, int valueLen, int valueSchemaId, int kafkaClusterId, - long kafkaConsumedOffset) { + long kafkaConsumedOffset, + ChunkedValueManifest valueManifest, + ChunkedValueManifest rmdManifest) { this.value = value; this.valueOffset = valueOffset; this.valueLen = valueLen; this.valueSchemaId = valueSchemaId; this.kafkaClusterId = kafkaClusterId; this.kafkaConsumedOffset = kafkaConsumedOffset; + this.valueManifest = valueManifest; + this.rmdManifest = rmdManifest; + } + + public ChunkedValueManifest getRmdManifest() { + return rmdManifest; + } + + public ChunkedValueManifest getValueManifest() { + return valueManifest; } public void setReplicationMetadataRecord(GenericRecord replicationMetadataRecord) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 6bf7a2f85c..73727d1347 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -2924,7 +2924,6 @@ private int processKafkaDataMessage( MessageType messageType = (leaderProducedRecordContext == null ? MessageType.valueOf(kafkaValue) : leaderProducedRecordContext.getMessageType()); - switch (messageType) { case PUT: // If single-threaded, we can re-use (and clobber) the same Put instance. // TODO: explore GC tuning later. 
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/RmdWithValueSchemaId.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/RmdWithValueSchemaId.java index 5351be6033..b4ed0c22d0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/RmdWithValueSchemaId.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/RmdWithValueSchemaId.java @@ -1,5 +1,6 @@ package com.linkedin.davinci.replication; +import com.linkedin.venice.storage.protocol.ChunkedValueManifest; import org.apache.avro.generic.GenericRecord; @@ -10,9 +11,22 @@ * 3. Value schema ID used to generate the RMD schema. */ public class RmdWithValueSchemaId { - private final int valueSchemaID; - private final int rmdProtocolVersionID; - private final GenericRecord rmdRecord; + private int valueSchemaID; + private int rmdProtocolVersionID; + private GenericRecord rmdRecord; + + private ChunkedValueManifest rmdManifest; + + public RmdWithValueSchemaId( + int valueSchemaID, + int rmdProtocolVersionID, + GenericRecord rmdRecord, + ChunkedValueManifest rmdManifest) { + this.valueSchemaID = valueSchemaID; + this.rmdRecord = rmdRecord; + this.rmdProtocolVersionID = rmdProtocolVersionID; + this.rmdManifest = rmdManifest; + } public RmdWithValueSchemaId(int valueSchemaID, int rmdProtocolVersionID, GenericRecord rmdRecord) { this.valueSchemaID = valueSchemaID; @@ -20,6 +34,25 @@ public RmdWithValueSchemaId(int valueSchemaID, int rmdProtocolVersionID, Generic this.rmdProtocolVersionID = rmdProtocolVersionID; } + public RmdWithValueSchemaId() { + } + + public void setValueSchemaID(int valueSchemaID) { + this.valueSchemaID = valueSchemaID; + } + + public void setRmdProtocolVersionID(int rmdProtocolVersionID) { + this.rmdProtocolVersionID = rmdProtocolVersionID; + } + + public void setRmdRecord(GenericRecord rmdRecord) { + this.rmdRecord = rmdRecord; + } + + public void setRmdManifest(ChunkedValueManifest rmdManifest) { + this.rmdManifest = rmdManifest; + } + public GenericRecord getRmdRecord() { return rmdRecord; } @@ -31,4 +64,8 @@ public int getValueSchemaId() { public int getRmdProtocolVersionID() { return rmdProtocolVersionID; } + + public ChunkedValueManifest getRmdManifest() { + return rmdManifest; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/RmdSerDe.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/RmdSerDe.java index 9e57272a41..96011d8477 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/RmdSerDe.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/RmdSerDe.java @@ -44,11 +44,12 @@ public RmdSerDe(StringAnnotatedStoreSchemaCache annotatedStoreSchemaCache, int r } /** - * @param valueSchemaIdPrependedBytes The raw bytes with value schema ID prepended. - * @return A {@link RmdWithValueSchemaId} object composed by extracting the value schema ID from the - * * header of the replication metadata. + * This method takes in the RMD bytes with prepended value schema ID and a {@link RmdWithValueSchemaId} container object. + * It will deserialize the RMD bytes into RMD record and fill the passed-in container. 
*/ - public RmdWithValueSchemaId deserializeValueSchemaIdPrependedRmdBytes(byte[] valueSchemaIdPrependedBytes) { + public void deserializeValueSchemaIdPrependedRmdBytes( + byte[] valueSchemaIdPrependedBytes, + RmdWithValueSchemaId rmdWithValueSchemaId) { Validate.notNull(valueSchemaIdPrependedBytes); ByteBuffer rmdWithValueSchemaID = ByteBuffer.wrap(valueSchemaIdPrependedBytes); final int valueSchemaId = rmdWithValueSchemaID.getInt(); @@ -58,7 +59,9 @@ public RmdWithValueSchemaId deserializeValueSchemaIdPrependedRmdBytes(byte[] val rmdWithValueSchemaID.position(), rmdWithValueSchemaID.remaining()); GenericRecord rmdRecord = getRmdDeserializer(valueSchemaId, valueSchemaId).deserialize(binaryDecoder); - return new RmdWithValueSchemaId(valueSchemaId, rmdVersionId, rmdRecord); + rmdWithValueSchemaId.setValueSchemaID(valueSchemaId); + rmdWithValueSchemaId.setRmdProtocolVersionID(rmdVersionId); + rmdWithValueSchemaId.setRmdRecord(rmdRecord); } /** diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/AbstractAvroChunkingAdapter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/AbstractAvroChunkingAdapter.java index d170984d1c..d9ef529740 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/AbstractAvroChunkingAdapter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/AbstractAvroChunkingAdapter.java @@ -106,7 +106,8 @@ public T get( ReadResponse response, int readerSchemaId, StoreDeserializerCache storeDeserializerCache, - VeniceCompressor compressor) { + VeniceCompressor compressor, + ChunkedValueManifestContainer manifestContainer) { if (isChunked) { key = ChunkingUtils.KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeNonChunkedKey(key); } @@ -121,7 +122,8 @@ public T get( readerSchemaId, storeDeserializerCache, compressor, - false); + false, + manifestContainer); } public T get( diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkedValueManifestContainer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkedValueManifestContainer.java new file mode 100644 index 0000000000..22cc693c3c --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkedValueManifestContainer.java @@ -0,0 +1,19 @@ +package com.linkedin.davinci.storage.chunking; + +import com.linkedin.venice.storage.protocol.ChunkedValueManifest; + + +public class ChunkedValueManifestContainer { + private ChunkedValueManifest manifest; + + public ChunkedValueManifestContainer() { + } + + public void setManifest(ChunkedValueManifest manifest) { + this.manifest = manifest; + } + + public ChunkedValueManifest getManifest() { + return manifest; + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkingUtils.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkingUtils.java index 7ba5cc32ae..dd5dd8297b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkingUtils.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkingUtils.java @@ -73,7 +73,7 @@ static VALUE getFromStorage( int partition, ByteBuffer keyBuffer, ReadResponse response) { - return getFromStorage(adapter, store, partition, keyBuffer, response, null, null, -1, null, null, false); + return getFromStorage(adapter, store, partition, keyBuffer, response, null, null, -1, 
null, null, false, null); } static VALUE getReplicationMetadataFromStorage( @@ -81,8 +81,21 @@ static VALUE getReplicationMetadataFromStorag AbstractStorageEngine store, int partition, ByteBuffer keyBuffer, - ReadResponse response) { - return getFromStorage(adapter, store, partition, keyBuffer, response, null, null, -1, null, null, true); + ReadResponse response, + ChunkedValueManifestContainer manifestContainer) { + return getFromStorage( + adapter, + store, + partition, + keyBuffer, + response, + null, + null, + -1, + null, + null, + true, + manifestContainer); } static VALUE getFromStorage( @@ -115,7 +128,8 @@ static VALUE getFromStorage( readerSchemaId, storeDeserializerCache, compressor, - false); + false, + null); } static void getFromStorageByPartialKey( @@ -205,7 +219,8 @@ static VALUE getFromStorage( int readerSchemaID, StoreDeserializerCache storeDeserializerCache, VeniceCompressor compressor, - boolean isRmdValue) { + boolean isRmdValue, + ChunkedValueManifestContainer manifestContainer) { long databaseLookupStartTimeInNS = (response != null) ? System.nanoTime() : 0; byte[] value = isRmdValue ? store.getReplicationMetadata(partition, keyBuffer.array()) : store.get(partition, keyBuffer); @@ -223,7 +238,24 @@ static VALUE getFromStorage( readerSchemaID, storeDeserializerCache, compressor, - isRmdValue); + isRmdValue, + manifestContainer); + } + + public static ChunkedValueManifest getChunkValueManifestFromStorage( + byte[] key, + int partition, + boolean isRmd, + AbstractStorageEngine store) { + byte[] value = isRmd ? store.getReplicationMetadata(partition, key) : store.get(partition, key); + if (value == null) { + return null; + } + int writerSchemaId = ValueRecord.parseSchemaId(value); + if (writerSchemaId > 0) { + return null; + } + return CHUNKED_VALUE_MANIFEST_SERIALIZER.deserialize(value, writerSchemaId); } /** @@ -251,7 +283,8 @@ private static VALUE getFromStorage( int readerSchemaId, StoreDeserializerCache storeDeserializerCache, VeniceCompressor compressor, - boolean isRmdValue) { + boolean isRmdValue, + ChunkedValueManifestContainer manifestContainer) { if (value == null) { return null; @@ -281,6 +314,9 @@ private static VALUE getFromStorage( // End of initial sanity checks. We have a chunked value, so we need to fetch all chunks ChunkedValueManifest chunkedValueManifest = CHUNKED_VALUE_MANIFEST_SERIALIZER.deserialize(value, writerSchemaId); + if (manifestContainer != null) { + manifestContainer.setManifest(chunkedValueManifest); + } CHUNKS_CONTAINER assembledValueContainer = adapter.constructChunksContainer(chunkedValueManifest); int actualSize = 0; diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/SingleGetChunkingAdapter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/SingleGetChunkingAdapter.java index d3b79dc86b..944c39416b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/SingleGetChunkingAdapter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/SingleGetChunkingAdapter.java @@ -56,11 +56,17 @@ public static ValueRecord getReplicationMetadata( int partition, byte[] key, boolean isChunked, - ReadResponse response) { + ReadResponse response, + ChunkedValueManifestContainer manifestContainer) { ByteBuffer keyBuffer = isChunked ? 
ChunkingUtils.KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeNonChunkedKeyAsByteBuffer(key) : ByteBuffer.wrap(key); - return ChunkingUtils - .getReplicationMetadataFromStorage(SINGLE_GET_CHUNKING_ADAPTER, store, partition, keyBuffer, response); + return ChunkingUtils.getReplicationMetadataFromStorage( + SINGLE_GET_CHUNKING_ADAPTER, + store, + partition, + keyBuffer, + response, + manifestContainer); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java index 5665f7356f..c2d488fa4e 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java @@ -23,6 +23,7 @@ import com.linkedin.davinci.stats.AggVersionedIngestionStats; import com.linkedin.davinci.stats.HostLevelIngestionStats; import com.linkedin.davinci.storage.StorageEngineRepository; +import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer; import com.linkedin.davinci.storage.chunking.ChunkingUtils; import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.blackhole.BlackHoleStorageEngine; @@ -165,7 +166,7 @@ public void testisReadyToServeAnnouncedWithRTLag() { when(badPartitionConsumptionState.hasLagCaughtUp()).thenReturn(true); // short circuit isReadyToServe when(badPartitionConsumptionState.isEndOfPushReceived()).thenReturn(false); - ingestionTask.addPartititionConsumptionState(1, badPartitionConsumptionState); + ingestionTask.addPartitionConsumptionState(1, badPartitionConsumptionState); Assert.assertTrue(ingestionTask.isReadyToServeAnnouncedWithRTLag()); @@ -173,11 +174,11 @@ public void testisReadyToServeAnnouncedWithRTLag() { when(goodPartitionConsumptionState.hasLagCaughtUp()).thenReturn(true); when(goodPartitionConsumptionState.isEndOfPushReceived()).thenReturn(true); when(goodPartitionConsumptionState.isWaitingForReplicationLag()).thenReturn(false); - ingestionTask.addPartititionConsumptionState(1, goodPartitionConsumptionState); + ingestionTask.addPartitionConsumptionState(1, goodPartitionConsumptionState); Assert.assertFalse(ingestionTask.isReadyToServeAnnouncedWithRTLag()); - ingestionTask.addPartititionConsumptionState(2, badPartitionConsumptionState); + ingestionTask.addPartitionConsumptionState(2, badPartitionConsumptionState); Assert.assertTrue(ingestionTask.isReadyToServeAnnouncedWithRTLag()); } @@ -201,7 +202,8 @@ public void testLeaderCanSendValueChunksIntoDrainer() when(ingestionTask.getKafkaVersionTopic()).thenReturn(testTopic); when(ingestionTask.createProducerCallback(any(), any(), any(), anyInt(), anyString(), anyLong())) .thenCallRealMethod(); - when(ingestionTask.getProduceToTopicFunction(any(), any(), any(), anyInt(), anyBoolean())).thenCallRealMethod(); + when(ingestionTask.getProduceToTopicFunction(any(), any(), any(), any(), any(), anyInt(), anyBoolean())) + .thenCallRealMethod(); when(ingestionTask.getRmdProtocolVersionID()).thenReturn(rmdProtocolVersionID); doCallRealMethod().when(ingestionTask) .produceToLocalKafka(any(), any(), any(), any(), anyInt(), anyString(), anyInt(), anyLong()); @@ -277,6 +279,8 @@ public void testLeaderCanSendValueChunksIntoDrainer() updatedKeyBytes, updatedValueBytes, updatedRmdBytes, + null, + null, valueSchemaId, resultReuseInput), subPartition, @@ 
-376,13 +380,15 @@ public void testReadingChunkedRmdFromStorage() { when(ingestionTask.getStorageEngine()).thenReturn(storageEngine); when(ingestionTask.getSchemaRepo()).thenReturn(schemaRepository); when(ingestionTask.getServerConfig()).thenReturn(serverConfig); - when(ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(anyInt(), any(), anyLong())).thenCallRealMethod(); + when(ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(anyInt(), any(), any(), anyLong())) + .thenCallRealMethod(); when(ingestionTask.isChunked()).thenReturn(true); when(ingestionTask.getHostLevelIngestionStats()).thenReturn(mock(HostLevelIngestionStats.class)); - + ChunkedValueManifestContainer container = new ChunkedValueManifestContainer(); when(storageEngine.getReplicationMetadata(subPartition, topLevelKey1)).thenReturn(expectedNonChunkedValue); - byte[] result = ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(subPartition, key1, 0L); + byte[] result = ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(subPartition, key1, container, 0L); Assert.assertNotNull(result); + Assert.assertNull(container.getManifest()); Assert.assertEquals(result, expectedNonChunkedValue); /** @@ -414,8 +420,10 @@ public void testReadingChunkedRmdFromStorage() { when(storageEngine.getReplicationMetadata(subPartition, topLevelKey2)).thenReturn(chunkedManifestWithSchemaBytes); when(storageEngine.getReplicationMetadata(subPartition, chunkedKey1InKey2)).thenReturn(chunkedValue1); - byte[] result2 = ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(subPartition, key2, 0L); + byte[] result2 = ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(subPartition, key2, container, 0L); Assert.assertNotNull(result2); + Assert.assertNotNull(container.getManifest()); + Assert.assertEquals(container.getManifest().getKeysWithChunkIdSuffix().size(), 1); Assert.assertEquals(result2, expectedChunkedValue1); /** @@ -453,8 +461,10 @@ public void testReadingChunkedRmdFromStorage() { when(storageEngine.getReplicationMetadata(subPartition, topLevelKey3)).thenReturn(chunkedManifestWithSchemaBytes); when(storageEngine.getReplicationMetadata(subPartition, chunkedKey1InKey3)).thenReturn(chunkedValue1); when(storageEngine.getReplicationMetadata(subPartition, chunkedKey2InKey3)).thenReturn(chunkedValue2); - byte[] result3 = ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(subPartition, key3, 0L); + byte[] result3 = ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(subPartition, key3, container, 0L); Assert.assertNotNull(result3); + Assert.assertNotNull(container.getManifest()); + Assert.assertEquals(container.getManifest().getKeysWithChunkIdSuffix().size(), 2); Assert.assertEquals(result3, expectedChunkedValue2); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallbackTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallbackTest.java index 5b4fb41ba4..61ed2cae97 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallbackTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallbackTest.java @@ -1,7 +1,13 @@ package com.linkedin.davinci.kafka.consumer; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static 
org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; @@ -10,8 +16,11 @@ import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.storage.protocol.ChunkedValueManifest; import com.linkedin.venice.utils.InMemoryLogAppender; import com.linkedin.venice.utils.Utils; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.LogManager; @@ -84,4 +93,30 @@ public void testOnCompletionWithNonNullException() { inMemoryLogAppender.stop(); } } + + @Test + public void testLeaderProducerCallbackProduceDeprecatedChunkDeletion() throws InterruptedException { + LeaderFollowerStoreIngestionTask storeIngestionTask = mock(LeaderFollowerStoreIngestionTask.class); + PubSubMessage sourceConsumerRecord = mock(PubSubMessage.class); + PartitionConsumptionState partitionConsumptionState = mock(PartitionConsumptionState.class); + LeaderProducedRecordContext leaderProducedRecordContext = mock(LeaderProducedRecordContext.class); + LeaderProducerCallback leaderProducerCallback = new LeaderProducerCallback( + storeIngestionTask, + sourceConsumerRecord, + partitionConsumptionState, + leaderProducedRecordContext, + 0, + "url", + 0); + + ChunkedValueManifest manifest = new ChunkedValueManifest(); + manifest.keysWithChunkIdSuffix = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + manifest.keysWithChunkIdSuffix.add(ByteBuffer.wrap(new byte[] { 0xa, 0xb })); + } + leaderProducerCallback.produceDeprecatedChunkDeletionToStoreBufferService(manifest, 0); + verify(storeIngestionTask, times(10)) + .produceToStoreBufferService(any(), any(), anyInt(), anyString(), anyLong(), anyLong()); + + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java index a9c0264b3c..808e5cb150 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java @@ -29,7 +29,7 @@ public void testTransientRecordMap() { Schema aaSchema = RmdSchemaGenerator.generateMetadataSchema(schema, 1); GenericRecord record = new GenericData.Record(aaSchema); // Test removal succeeds if the key is specified with same kafkaConsumedOffset - pcs.setTransientRecord(-1, 1, key1, 5, record); + pcs.setTransientRecord(-1, 1, key1, 5, record, null, null); PartitionConsumptionState.TransientRecord tr1 = pcs.getTransientRecord(key2); Assert.assertEquals(tr1.getValue(), null); Assert.assertEquals(tr1.getValueLen(), -1); @@ -43,10 +43,10 @@ public void testTransientRecordMap() { Assert.assertEquals(pcs.getTransientRecordMapSize(), 0); // Test removal fails if the key is specified with same kafkaConsumedOffset - pcs.setTransientRecord(-1, 1, key1, value1, 100, value1.length, 5, null); - pcs.setTransientRecord(-1, 2, key3, 5, null); + pcs.setTransientRecord(-1, 1, key1, value1, 100, value1.length, 5, null, null, null); + pcs.setTransientRecord(-1, 2, key3, 5, null, null, null); Assert.assertEquals(pcs.getTransientRecordMapSize(), 2); - pcs.setTransientRecord(-1, 
3, key1, value2, 100, value2.length, 5, null); + pcs.setTransientRecord(-1, 3, key1, value2, 100, value2.length, 5, null, null, null); tr2 = pcs.mayRemoveTransientRecord(-1, 1, key1); Assert.assertNotNull(tr2); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/RmdSerDeTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/RmdSerDeTest.java index 75ba9bd8cb..aa4c22a8e3 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/RmdSerDeTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/RmdSerDeTest.java @@ -64,8 +64,8 @@ public void testSerDeRmd() { rmdAndValueSchemaIDBytes.put(rmdBytes.array()); // Deserialize all bytes and expect to get value schema ID and RMD record back. - RmdWithValueSchemaId rmdAndValueID = - rmdSerDe.deserializeValueSchemaIdPrependedRmdBytes(rmdAndValueSchemaIDBytes.array()); + RmdWithValueSchemaId rmdAndValueID = new RmdWithValueSchemaId(); + rmdSerDe.deserializeValueSchemaIdPrependedRmdBytes(rmdAndValueSchemaIDBytes.array(), rmdAndValueID); Assert.assertEquals(rmdAndValueID.getValueSchemaId(), valueSchemaID); Assert.assertEquals(rmdAndValueID.getRmdRecord(), rmd); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/chunking/ChunkingTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/chunking/ChunkingTest.java index 0902621462..29772db722 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/chunking/ChunkingTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/chunking/ChunkingTest.java @@ -231,7 +231,8 @@ private void runTest( null, readerSchemaId, storeDeserializerCache, - compressor)); + compressor, + null)); } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/ChunkAwareCallback.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/ChunkAwareCallback.java index 7c19f5993a..9fb5c21481 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/ChunkAwareCallback.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/ChunkAwareCallback.java @@ -8,7 +8,7 @@ /** * The {@link VeniceWriter}, upon detecting an instance of this class being passed to it, will always call - * {@link #setChunkingInfo(byte[], ByteBuffer[], ChunkedValueManifest, ByteBuffer[], ChunkedValueManifest)} whenever + * {@link #setChunkingInfo(byte[], ByteBuffer[], ChunkedValueManifest, ByteBuffer[], ChunkedValueManifest, ChunkedValueManifest, ChunkedValueManifest)} whenever * processing a {@link MessageType#PUT}, whether it is chunked or not. 
*/ public interface ChunkAwareCallback extends PubSubProducerCallback { @@ -19,12 +19,18 @@ public interface ChunkAwareCallback extends PubSubProducerCallback { * * @param key A byte[] corresponding to the top-level key written to Kafka, potentially including a chunking suffix * @param valueChunks An array of {@link ByteBuffer} where the backing array has sufficient headroom to prepend Venice's header - * @param chunkedValueManifest The {@link ChunkedValueManifest} of the chunked value + * @param chunkedValueManifest The {@link ChunkedValueManifest} of the new chunked value + * @param rmdChunks An array of {@link ByteBuffer} where the backing array has sufficient headroom to prepend Venice's header + * @param chunkedRmdManifest The {@link ChunkedValueManifest} of the new chunked RMD + * @param oldValueManifest The {@link ChunkedValueManifest} of the previous chunked value + * @param oldRmdManifest The {@link ChunkedValueManifest} of the previous chunked RMD */ void setChunkingInfo( byte[] key, ByteBuffer[] valueChunks, ChunkedValueManifest chunkedValueManifest, ByteBuffer[] rmdChunks, - ChunkedValueManifest chunkedRmdManifest); + ChunkedValueManifest chunkedRmdManifest, + ChunkedValueManifest oldValueManifest, + ChunkedValueManifest oldRmdManifest); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java index f659791423..ba37f751e9 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java @@ -508,6 +508,45 @@ public Future delete(K key, PubSubProducerCallback callback return delete(key, callback, DEFAULT_LEADER_METADATA_WRAPPER, APP_DEFAULT_LOGICAL_TS, deleteMetadata); } + /** + * This method produces a DELETE request to a deprecated chunk key. + */ + public void deleteDeprecatedChunk( + byte[] serializedKey, + int partition, + PubSubProducerCallback callback, + LeaderMetadataWrapper leaderMetadataWrapper, + DeleteMetadata deleteMetadata) { + + KafkaKey kafkaKey = new KafkaKey(MessageType.DELETE, serializedKey); + Delete delete = new Delete(); + delete.schemaId = AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion(); + if (deleteMetadata == null) { + delete.replicationMetadataVersionId = VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID; + delete.replicationMetadataPayload = EMPTY_BYTE_BUFFER; + } else { + delete.replicationMetadataVersionId = deleteMetadata.getRmdVersionId(); + delete.replicationMetadataPayload = deleteMetadata.getRmdPayload(); + } + sendMessage( + producerMetadata -> kafkaKey, + MessageType.DELETE, + delete, + partition, + callback, + leaderMetadataWrapper, + APP_DEFAULT_LOGICAL_TS); + } + + private Future delete( + K key, + PubSubProducerCallback callback, + LeaderMetadataWrapper leaderMetadataWrapper, + long logicalTs, + DeleteMetadata deleteMetadata) { + return delete(key, callback, leaderMetadataWrapper, logicalTs, deleteMetadata, null, null); + } + /** * Execute a standard "delete" on the key. 
* @@ -516,20 +555,22 @@ public Future delete(K key, PubSubProducerCallback callback * @param leaderMetadataWrapper - The leader Metadata of this message in the source topic: * -1: VeniceWriter is sending this message in a Samza app to the real-time topic; or it's * sending the message in VPJ plugin to the version topic; - * >=0: Leader replica consumes a delete message from real-time topic, VeniceWriter in leader + * >=0: Leader replica consumes a DELETE message from real-time topic, VeniceWriter in leader * is sending this message to version topic with extra info: offset in the real-time topic. * @param logicalTs - An timestamp field to indicate when this record was produced from apps point of view. * @param deleteMetadata - a DeleteMetadata containing replication metadata related fields (can be null). - * @return a java.util.concurrent.Future Future for the RecordMetadata that will be assigned to this + * @return a java.util.concurrent.Future. Future for the RecordMetadata that will be assigned to this * record. Invoking java.util.concurrent.Future's get() on this future will block until the associated request * completes and then return the metadata for the record or throw any exception that occurred while sending the record. */ - private Future delete( + public Future delete( K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, - DeleteMetadata deleteMetadata) { + DeleteMetadata deleteMetadata, + ChunkedValueManifest oldValueManifest, + ChunkedValueManifest oldRmdManifest) { byte[] serializedKey = keySerializer.serialize(topicName, key); int partition = getPartition(serializedKey); @@ -546,7 +587,8 @@ private Future delete( } if (callback instanceof ChunkAwareCallback) { - ((ChunkAwareCallback) callback).setChunkingInfo(serializedKey, null, null, null, null); + ((ChunkAwareCallback) callback) + .setChunkingInfo(serializedKey, null, null, null, null, oldValueManifest, oldRmdManifest); } KafkaKey kafkaKey = new KafkaKey(MessageType.DELETE, serializedKey); @@ -562,7 +604,7 @@ private Future delete( delete.replicationMetadataPayload = deleteMetadata.getRmdPayload(); } - return sendMessage( + Future produceResultFuture = sendMessage( producerMetadata -> kafkaKey, MessageType.DELETE, delete, @@ -570,6 +612,23 @@ private Future delete( callback, leaderMetadataWrapper, logicalTs); + PubSubProducerCallback chunkCallback = callback == null ? null : new ErrorPropagationCallback(callback); + DeleteMetadata deleteMetadataForOldChunk = + new DeleteMetadata(delete.schemaId, delete.replicationMetadataVersionId, VeniceWriter.EMPTY_BYTE_BUFFER); + deleteDeprecatedChunksFromManifest( + oldValueManifest, + partition, + chunkCallback, + leaderMetadataWrapper, + deleteMetadataForOldChunk); + deleteDeprecatedChunksFromManifest( + oldRmdManifest, + partition, + chunkCallback, + leaderMetadataWrapper, + deleteMetadataForOldChunk); + + return produceResultFuture; } /** @@ -579,7 +638,7 @@ private Future delete( * @param value - The value to be associated with the given key * @param valueSchemaId - value schema id for the given value * @param callback - Callback function invoked by Kafka producer after sending the message - * @return a java.util.concurrent.Future Future for the RecordMetadata that will be assigned to this + * @return a java.util.concurrent.Future. Future for the RecordMetadata that will be assigned to this * record. 
Invoking java.util.concurrent.Future's get() on this future will block until the associated request * completes and then return the metadata for the record or throw any exception that occurred while sending the record. */ @@ -644,6 +703,37 @@ public Future put( return put(key, value, valueSchemaId, callback, leaderMetadataWrapper, APP_DEFAULT_LOGICAL_TS, null); } + public Future put( + K key, + V value, + int valueSchemaId, + PubSubProducerCallback callback, + LeaderMetadataWrapper leaderMetadataWrapper, + ChunkedValueManifest oldValueManifest, + ChunkedValueManifest oldRmdManifest) { + return put( + key, + value, + valueSchemaId, + callback, + leaderMetadataWrapper, + APP_DEFAULT_LOGICAL_TS, + null, + oldValueManifest, + oldRmdManifest); + } + + public Future put( + K key, + V value, + int valueSchemaId, + PubSubProducerCallback callback, + LeaderMetadataWrapper leaderMetadataWrapper, + long logicalTs, + PutMetadata putMetadata) { + return put(key, value, valueSchemaId, callback, leaderMetadataWrapper, logicalTs, putMetadata, null, null); + } + /** * Execute a standard "put" on the key. * @@ -671,7 +761,9 @@ public Future put( PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, - PutMetadata putMetadata) { + PutMetadata putMetadata, + ChunkedValueManifest oldValueManifest, + ChunkedValueManifest oldRmdManifest) { byte[] serializedKey = keySerializer.serialize(topicName, key); byte[] serializedValue = valueSerializer.serialize(topicName, value); int partition = getPartition(serializedKey); @@ -689,7 +781,9 @@ public Future put( partition, leaderMetadataWrapper, logicalTs, - putMetadata); + putMetadata, + oldValueManifest, + oldRmdManifest); } else { throw new RecordTooLargeException( "This record exceeds the maximum size. " @@ -702,7 +796,8 @@ public Future put( } if (callback instanceof ChunkAwareCallback) { - ((ChunkAwareCallback) callback).setChunkingInfo(serializedKey, null, null, null, null); + ((ChunkAwareCallback) callback) + .setChunkingInfo(serializedKey, null, null, null, null, oldValueManifest, oldRmdManifest); } KafkaKey kafkaKey = new KafkaKey(MessageType.PUT, serializedKey); @@ -719,7 +814,7 @@ public Future put( putPayload.replicationMetadataVersionId = putMetadata.getRmdVersionId(); putPayload.replicationMetadataPayload = putMetadata.getRmdPayload(); } - return sendMessage( + Future produceResultFuture = sendMessage( producerMetadata -> kafkaKey, MessageType.PUT, putPayload, @@ -727,6 +822,18 @@ public Future put( callback, leaderMetadataWrapper, logicalTs); + DeleteMetadata deleteMetadata = + new DeleteMetadata(valueSchemaId, putPayload.replicationMetadataVersionId, VeniceWriter.EMPTY_BYTE_BUFFER); + PubSubProducerCallback chunkCallback = callback == null ? 
null : new ErrorPropagationCallback(callback); + deleteDeprecatedChunksFromManifest( + oldValueManifest, + partition, + chunkCallback, + leaderMetadataWrapper, + deleteMetadata); + deleteDeprecatedChunksFromManifest(oldRmdManifest, partition, chunkCallback, leaderMetadataWrapper, deleteMetadata); + + return produceResultFuture; } /** @@ -752,7 +859,7 @@ public Future put( getKafkaMessageEnvelopeProvider(kafkaMessageEnvelope, leaderMetadataWrapper); if (callback instanceof ChunkAwareCallback) { - ((ChunkAwareCallback) callback).setChunkingInfo(serializedKey, null, null, null, null); + ((ChunkAwareCallback) callback).setChunkingInfo(serializedKey, null, null, null, null, null, null); } return sendMessage(producerMetadata -> kafkaKey, kafkaMessageEnvelopeProvider, upstreamPartition, callback, false); @@ -787,7 +894,7 @@ public Future delete( if (callback instanceof ChunkAwareCallback) { byte[] serializedKey = kafkaKey.getKey(); - ((ChunkAwareCallback) callback).setChunkingInfo(serializedKey, null, null, null, null); + ((ChunkAwareCallback) callback).setChunkingInfo(serializedKey, null, null, null, null, null, null); } return sendMessage(producerMetadata -> kafkaKey, kafkaMessageEnvelopeProvider, upstreamPartition, callback, false); @@ -1178,7 +1285,9 @@ private Future putLargeValue( int partition, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, - PutMetadata putMetadata) { + PutMetadata putMetadata, + ChunkedValueManifest oldValueManifest, + ChunkedValueManifest oldRmdManifest) { int replicationMetadataPayloadSize = putMetadata == null ? 0 : putMetadata.getSerializedSize(); final Supplier reportSizeGenerator = () -> getSizeReport(serializedKey.length, serializedValue.length, replicationMetadataPayloadSize); @@ -1249,12 +1358,14 @@ private Future putLargeValue( valueChunksAndManifest.getPayloadChunks(), valueChunksAndManifest.getChunkedValueManifest(), rmdChunksAndManifest.getPayloadChunks(), - rmdChunksAndManifest.getChunkedValueManifest()); + rmdChunksAndManifest.getChunkedValueManifest(), + oldValueManifest, + oldRmdManifest); } // We only return the last future (the one for the manifest) and assume that once this one is finished, // all the chunks should also be finished, since they were sent first, and ordering should be guaranteed. - return sendMessage( + Future manifestProduceFuture = sendMessage( manifestKeyProvider, MessageType.PUT, putManifestsPayload, @@ -1262,6 +1373,39 @@ private Future putLargeValue( callback, leaderMetadataWrapper, logicalTs); + + DeleteMetadata deleteMetadata = new DeleteMetadata( + valueSchemaId, + putManifestsPayload.replicationMetadataVersionId, + VeniceWriter.EMPTY_BYTE_BUFFER); + deleteDeprecatedChunksFromManifest( + oldValueManifest, + partition, + chunkCallback, + leaderMetadataWrapper, + deleteMetadata); + deleteDeprecatedChunksFromManifest(oldRmdManifest, partition, chunkCallback, leaderMetadataWrapper, deleteMetadata); + + return manifestProduceFuture; + } + + /** + * This method iterates over a {@link ChunkedValueManifest} object's chunk key list and issues a DELETE request for each + * chunk.
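+ * Passing a null manifest is a no-op, so callers do not need to check whether the previous value or RMD was chunked.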
+ */ + private void deleteDeprecatedChunksFromManifest( + ChunkedValueManifest manifest, + int partition, + PubSubProducerCallback chunkCallback, + LeaderMetadataWrapper leaderMetadataWrapper, + DeleteMetadata deleteMetadata) { + if (manifest == null) { + return; + } + for (int i = 0; i < manifest.keysWithChunkIdSuffix.size(); i++) { + byte[] chunkKeyBytes = manifest.keysWithChunkIdSuffix.get(i).array(); + deleteDeprecatedChunk(chunkKeyBytes, partition, chunkCallback, leaderMetadataWrapper, deleteMetadata); + } } private String getSizeReport(int serializedKeySize, int serializedValueSize, int replicationMetadataPayloadSize) { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java index ce717a5924..49ff957a67 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/WriterChunkingHelper.java @@ -22,7 +22,7 @@ * This class is a helper class that contains writer side chunking logics. */ public class WriterChunkingHelper { - private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); + public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); /** * This method chunks payload and send each chunk out. diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java index 56ab6db0ff..defc4ae71b 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java @@ -1,19 +1,42 @@ package com.linkedin.venice.writer; +import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS; +import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES; +import static com.linkedin.venice.writer.VeniceWriter.VENICE_DEFAULT_LOGICAL_TS; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.linkedin.venice.kafka.protocol.Delete; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.kafka.protocol.ProducerMetadata; +import com.linkedin.venice.kafka.protocol.Put; +import com.linkedin.venice.kafka.protocol.enums.MessageType; +import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.pubsub.api.PubSubProducerAdapter; +import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer; import com.linkedin.venice.serialization.VeniceKafkaSerializer; +import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; +import com.linkedin.venice.serialization.avro.ChunkedValueManifestSerializer; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; +import com.linkedin.venice.storage.protocol.ChunkId; +import com.linkedin.venice.storage.protocol.ChunkedKeySuffix; +import com.linkedin.venice.storage.protocol.ChunkedValueManifest; import com.linkedin.venice.utils.DataProviderUtils; +import 
com.linkedin.venice.utils.SystemTime; import com.linkedin.venice.utils.VeniceProperties; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; import org.mockito.ArgumentCaptor; import org.testng.Assert; import org.testng.annotations.Test; @@ -60,4 +83,349 @@ public void testTargetPartitionIsSameForAllOperationsWithTheSameKey(boolean isCh Assert.assertEquals(putPartitionArgumentCaptor.getValue(), deletePartitionArgumentCaptor.getValue()); Assert.assertEquals(putPartitionArgumentCaptor.getValue(), updatePartitionArgumentCaptor.getValue()); } + + @Test + public void testDeleteDeprecatedChunk() throws ExecutionException, InterruptedException, TimeoutException { + PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class); + Future mockedFuture = mock(Future.class); + when(mockedProducer.getNumberOfPartitions(any())).thenReturn(1); + when(mockedProducer.getNumberOfPartitions(any(), anyInt(), any())).thenReturn(1); + when(mockedProducer.sendMessage(any(), any(), any(), any(), any(), any())).thenReturn(mockedFuture); + String stringSchema = "\"string\""; + VeniceKafkaSerializer serializer = new VeniceAvroKafkaSerializer(stringSchema); + String testTopic = "test"; + VeniceWriterOptions veniceWriterOptions = new VeniceWriterOptions.Builder(testTopic).setKeySerializer(serializer) + .setValueSerializer(serializer) + .setWriteComputeSerializer(serializer) + .setPartitioner(new DefaultVenicePartitioner()) + .setTime(SystemTime.INSTANCE) + .setChunkingEnabled(true) + .setRmdChunkingEnabled(true) + .build(); + VeniceWriter writer = + new VeniceWriter(veniceWriterOptions, VeniceProperties.empty(), mockedProducer); + byte[] serializedKeyBytes = new byte[] { 0xa, 0xb }; + writer.deleteDeprecatedChunk(serializedKeyBytes, 0, null, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, null); + writer.deleteDeprecatedChunk( + serializedKeyBytes, + 0, + null, + VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, + new DeleteMetadata( + AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion(), + 1, + WriterChunkingHelper.EMPTY_BYTE_BUFFER)); + + ArgumentCaptor keyArgumentCaptor = ArgumentCaptor.forClass(KafkaKey.class); + ArgumentCaptor kmeArgumentCaptor = ArgumentCaptor.forClass(KafkaMessageEnvelope.class); + verify(mockedProducer, atLeast(3)) + .sendMessage(any(), any(), keyArgumentCaptor.capture(), kmeArgumentCaptor.capture(), any(), any()); + Assert.assertEquals(kmeArgumentCaptor.getAllValues().size(), 3); + KafkaMessageEnvelope actualValue1 = kmeArgumentCaptor.getAllValues().get(1); + Assert.assertEquals(actualValue1.messageType, MessageType.DELETE.getValue()); + Assert.assertEquals(((Delete) actualValue1.payloadUnion).schemaId, -10); + Assert.assertEquals(((Delete) actualValue1.payloadUnion).replicationMetadataVersionId, -1); + Assert.assertEquals( + ((Delete) actualValue1.payloadUnion).replicationMetadataPayload, + WriterChunkingHelper.EMPTY_BYTE_BUFFER); + KafkaMessageEnvelope actualValue2 = kmeArgumentCaptor.getAllValues().get(2); + Assert.assertEquals(actualValue2.messageType, MessageType.DELETE.getValue()); + Assert.assertEquals(((Delete) actualValue2.payloadUnion).schemaId, -10); + Assert.assertEquals(((Delete) actualValue2.payloadUnion).replicationMetadataVersionId, 1); + Assert.assertEquals( + ((Delete) actualValue2.payloadUnion).replicationMetadataPayload, + WriterChunkingHelper.EMPTY_BYTE_BUFFER); + } + + 
@Test(timeOut = 10000) + public void testReplicationMetadataChunking() throws ExecutionException, InterruptedException, TimeoutException { + PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class); + Future mockedFuture = mock(Future.class); + when(mockedProducer.getNumberOfPartitions(any())).thenReturn(1); + when(mockedProducer.getNumberOfPartitions(any(), anyInt(), any())).thenReturn(1); + when(mockedProducer.sendMessage(any(), any(), any(), any(), any(), any())).thenReturn(mockedFuture); + String stringSchema = "\"string\""; + VeniceKafkaSerializer serializer = new VeniceAvroKafkaSerializer(stringSchema); + String testTopic = "test"; + VeniceWriterOptions veniceWriterOptions = new VeniceWriterOptions.Builder(testTopic).setKeySerializer(serializer) + .setValueSerializer(serializer) + .setWriteComputeSerializer(serializer) + .setPartitioner(new DefaultVenicePartitioner()) + .setTime(SystemTime.INSTANCE) + .setChunkingEnabled(true) + .setRmdChunkingEnabled(true) + .build(); + VeniceWriter writer = + new VeniceWriter(veniceWriterOptions, VeniceProperties.empty(), mockedProducer); + + ByteBuffer replicationMetadata = ByteBuffer.wrap(new byte[] { 0xa, 0xb }); + PutMetadata putMetadata = new PutMetadata(1, replicationMetadata); + + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 50000; i++) { + stringBuilder.append("abcdefghabcdefghabcdefghabcdefgh"); + } + String valueString = stringBuilder.toString(); + + writer.put( + Integer.toString(1), + valueString, + 1, + null, + VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, + APP_DEFAULT_LOGICAL_TS, + putMetadata); + ArgumentCaptor keyArgumentCaptor = ArgumentCaptor.forClass(KafkaKey.class); + ArgumentCaptor kmeArgumentCaptor = ArgumentCaptor.forClass(KafkaMessageEnvelope.class); + verify(mockedProducer, atLeast(2)) + .sendMessage(any(), any(), keyArgumentCaptor.capture(), kmeArgumentCaptor.capture(), any(), any()); + KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer(); + byte[] serializedKey = serializer.serialize(testTopic, Integer.toString(1)); + byte[] serializedValue = serializer.serialize(testTopic, valueString); + byte[] serializedRmd = replicationMetadata.array(); + int availableMessageSize = DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES - serializedKey.length; + + // The order should be SOS, valueChunk1, valueChunk2, replicationMetadataChunk1, manifest for value and RMD. + Assert.assertEquals(kmeArgumentCaptor.getAllValues().size(), 5); + + // Verify value of the 1st chunk. + KafkaMessageEnvelope actualValue1 = kmeArgumentCaptor.getAllValues().get(1); + Assert.assertEquals(actualValue1.messageType, MessageType.PUT.getValue()); + Assert.assertEquals(((Put) actualValue1.payloadUnion).schemaId, -10); + Assert.assertEquals(((Put) actualValue1.payloadUnion).replicationMetadataVersionId, -1); + Assert.assertEquals(((Put) actualValue1.payloadUnion).replicationMetadataPayload, ByteBuffer.allocate(0)); + Assert.assertEquals(((Put) actualValue1.payloadUnion).putValue.array().length, availableMessageSize + 4); + Assert.assertEquals(actualValue1.producerMetadata.logicalTimestamp, VENICE_DEFAULT_LOGICAL_TS); + + // Verify value of the 2nd chunk. 
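+ // It should contain the remainder of the serialized value plus 4 bytes of headroom reserved for Venice's header.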
+ KafkaMessageEnvelope actualValue2 = kmeArgumentCaptor.getAllValues().get(2); + Assert.assertEquals(actualValue2.messageType, MessageType.PUT.getValue()); + Assert.assertEquals(((Put) actualValue2.payloadUnion).schemaId, -10); + Assert.assertEquals(((Put) actualValue2.payloadUnion).replicationMetadataVersionId, -1); + Assert.assertEquals(((Put) actualValue2.payloadUnion).replicationMetadataPayload, ByteBuffer.allocate(0)); + Assert.assertEquals( + ((Put) actualValue2.payloadUnion).putValue.array().length, + (serializedValue.length - availableMessageSize) + 4); + Assert.assertEquals(actualValue2.producerMetadata.logicalTimestamp, VENICE_DEFAULT_LOGICAL_TS); + + ChunkedValueManifestSerializer chunkedValueManifestSerializer = new ChunkedValueManifestSerializer(true); + + final ChunkedValueManifest chunkedValueManifest = new ChunkedValueManifest(); + chunkedValueManifest.schemaId = 1; + chunkedValueManifest.keysWithChunkIdSuffix = new ArrayList<>(2); + chunkedValueManifest.size = serializedValue.length; + + // Verify key of the 1st value chunk. + ChunkedKeySuffix chunkedKeySuffix = new ChunkedKeySuffix(); + chunkedKeySuffix.isChunk = true; + chunkedKeySuffix.chunkId = new ChunkId(); + chunkedKeySuffix.chunkId.chunkIndex = 0; + ProducerMetadata producerMetadata = actualValue1.producerMetadata; + chunkedKeySuffix.chunkId.producerGUID = producerMetadata.producerGUID; + chunkedKeySuffix.chunkId.segmentNumber = producerMetadata.segmentNumber; + chunkedKeySuffix.chunkId.messageSequenceNumber = producerMetadata.messageSequenceNumber; + + ByteBuffer keyWithSuffix = keyWithChunkingSuffixSerializer.serializeChunkedKey(serializedKey, chunkedKeySuffix); + chunkedValueManifest.keysWithChunkIdSuffix.add(keyWithSuffix); + KafkaKey expectedKey1 = new KafkaKey(MessageType.PUT, keyWithSuffix.array()); + KafkaKey actualKey1 = keyArgumentCaptor.getAllValues().get(1); + Assert.assertEquals(actualKey1.getKey(), expectedKey1.getKey()); + + // Verify key of the 2nd value chunk. + chunkedKeySuffix.chunkId.chunkIndex = 1; + keyWithSuffix = keyWithChunkingSuffixSerializer.serializeChunkedKey(serializedKey, chunkedKeySuffix); + chunkedValueManifest.keysWithChunkIdSuffix.add(keyWithSuffix); + KafkaKey expectedKey2 = new KafkaKey(MessageType.PUT, keyWithSuffix.array()); + KafkaKey actualKey2 = keyArgumentCaptor.getAllValues().get(2); + Assert.assertEquals(actualKey2.getKey(), expectedKey2.getKey()); + + // Check value of the 1st RMD chunk. + KafkaMessageEnvelope actualValue3 = kmeArgumentCaptor.getAllValues().get(3); + Assert.assertEquals(actualValue3.messageType, MessageType.PUT.getValue()); + Assert.assertEquals(((Put) actualValue3.payloadUnion).schemaId, -10); + Assert.assertEquals(((Put) actualValue3.payloadUnion).replicationMetadataVersionId, -1); + Assert.assertEquals(((Put) actualValue3.payloadUnion).putValue, ByteBuffer.allocate(0)); + Assert.assertEquals( + ((Put) actualValue3.payloadUnion).replicationMetadataPayload.array().length, + serializedRmd.length + 4); + Assert.assertEquals(actualValue3.producerMetadata.logicalTimestamp, VENICE_DEFAULT_LOGICAL_TS); + + // Check key of the 1st RMD chunk. 
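+ // Rebuild the expected RMD manifest locally so it can be compared against the manifest message's replicationMetadataPayload below.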
+ ChunkedValueManifest chunkedRmdManifest = new ChunkedValueManifest(); + chunkedRmdManifest.schemaId = 1; + chunkedRmdManifest.keysWithChunkIdSuffix = new ArrayList<>(1); + chunkedRmdManifest.size = serializedRmd.length; + chunkedKeySuffix = new ChunkedKeySuffix(); + chunkedKeySuffix.isChunk = true; + chunkedKeySuffix.chunkId = new ChunkId(); + producerMetadata = actualValue3.producerMetadata; + chunkedKeySuffix.chunkId.producerGUID = producerMetadata.producerGUID; + chunkedKeySuffix.chunkId.segmentNumber = producerMetadata.segmentNumber; + chunkedKeySuffix.chunkId.messageSequenceNumber = producerMetadata.messageSequenceNumber; + // The chunkIndex of the first RMD should be the number of value chunks so that key space of value chunk and RMD + // chunk will not collide. + chunkedKeySuffix.chunkId.chunkIndex = 2; + keyWithSuffix = keyWithChunkingSuffixSerializer.serializeChunkedKey(serializedKey, chunkedKeySuffix); + chunkedRmdManifest.keysWithChunkIdSuffix.add(keyWithSuffix); + KafkaKey expectedKey3 = new KafkaKey(MessageType.PUT, keyWithSuffix.array()); + KafkaKey actualKey3 = keyArgumentCaptor.getAllValues().get(3); + Assert.assertEquals(actualKey3.getKey(), expectedKey3.getKey()); + + // Check key of the manifest. + byte[] topLevelKey = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serializedKey); + KafkaKey expectedKey4 = new KafkaKey(MessageType.PUT, topLevelKey); + KafkaKey actualKey4 = keyArgumentCaptor.getAllValues().get(4); + Assert.assertEquals(actualKey4.getKey(), expectedKey4.getKey()); + + // Check manifest for both value and rmd. + KafkaMessageEnvelope actualValue4 = kmeArgumentCaptor.getAllValues().get(4); + Assert.assertEquals(actualValue4.messageType, MessageType.PUT.getValue()); + Assert.assertEquals( + ((Put) actualValue4.payloadUnion).schemaId, + AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()); + Assert.assertEquals(((Put) actualValue4.payloadUnion).replicationMetadataVersionId, putMetadata.getRmdVersionId()); + Assert.assertEquals( + ((Put) actualValue4.payloadUnion).replicationMetadataPayload, + ByteBuffer.wrap(chunkedValueManifestSerializer.serialize(testTopic, chunkedRmdManifest))); + Assert.assertEquals( + ((Put) actualValue4.payloadUnion).putValue, + ByteBuffer.wrap(chunkedValueManifestSerializer.serialize(testTopic, chunkedValueManifest))); + Assert.assertEquals(actualValue4.producerMetadata.logicalTimestamp, APP_DEFAULT_LOGICAL_TS); + + } + + @Test + public void testReplicationMetadataWrittenCorrectly() + throws InterruptedException, ExecutionException, TimeoutException { + PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class); + Future mockedFuture = mock(Future.class); + when(mockedProducer.getNumberOfPartitions(any())).thenReturn(1); + when(mockedProducer.getNumberOfPartitions(any(), anyInt(), any())).thenReturn(1); + when(mockedProducer.sendMessage(any(), any(), any(), any(), any(), any())).thenReturn(mockedFuture); + Properties writerProperties = new Properties(); + String stringSchema = "\"string\""; + VeniceKafkaSerializer serializer = new VeniceAvroKafkaSerializer(stringSchema); + String testTopic = "test"; + VeniceWriterOptions veniceWriterOptions = new VeniceWriterOptions.Builder(testTopic).setKeySerializer(serializer) + .setValueSerializer(serializer) + .setWriteComputeSerializer(serializer) + .setPartitioner(new DefaultVenicePartitioner()) + .setTime(SystemTime.INSTANCE) + .build(); + VeniceWriter writer = + new VeniceWriter(veniceWriterOptions, new VeniceProperties(writerProperties), 
mockedProducer); + + // verify the new veniceWriter API's are able to encode the A/A metadat info correctly. + long ctime = System.currentTimeMillis(); + ByteBuffer replicationMetadata = ByteBuffer.wrap(new byte[] { 0xa, 0xb }); + PutMetadata putMetadata = new PutMetadata(1, replicationMetadata); + DeleteMetadata deleteMetadata = new DeleteMetadata(1, 1, replicationMetadata); + + writer.put( + Integer.toString(1), + Integer.toString(1), + 1, + null, + VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, + ctime, + null); + writer.put( + Integer.toString(2), + Integer.toString(2), + 1, + null, + VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, + APP_DEFAULT_LOGICAL_TS, + putMetadata); + writer.update(Integer.toString(3), Integer.toString(2), 1, 1, null, ctime); + writer.delete(Integer.toString(4), null, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, ctime); + writer.delete(Integer.toString(5), null, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, deleteMetadata); + writer.put(Integer.toString(6), Integer.toString(1), 1, null, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER); + + ArgumentCaptor kmeArgumentCaptor = ArgumentCaptor.forClass(KafkaMessageEnvelope.class); + verify(mockedProducer, atLeast(2)).sendMessage(any(), any(), any(), kmeArgumentCaptor.capture(), any(), any()); + + // first one will be control message SOS, there should not be any aa metadata. + KafkaMessageEnvelope value0 = kmeArgumentCaptor.getAllValues().get(0); + Assert.assertEquals(value0.producerMetadata.logicalTimestamp, VENICE_DEFAULT_LOGICAL_TS); + + // verify timestamp is encoded correctly. + KafkaMessageEnvelope value1 = kmeArgumentCaptor.getAllValues().get(1); + KafkaMessageEnvelope value3 = kmeArgumentCaptor.getAllValues().get(3); + KafkaMessageEnvelope value4 = kmeArgumentCaptor.getAllValues().get(4); + for (KafkaMessageEnvelope kme: Arrays.asList(value1, value3, value4)) { + Assert.assertEquals(kme.producerMetadata.logicalTimestamp, ctime); + } + + // verify default values for replicationMetadata are written correctly + Put put = (Put) value1.payloadUnion; + Assert.assertEquals(put.schemaId, 1); + Assert.assertEquals(put.replicationMetadataVersionId, VeniceWriter.VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID); + Assert.assertEquals(put.replicationMetadataPayload, ByteBuffer.wrap(new byte[0])); + + Delete delete = (Delete) value4.payloadUnion; + Assert.assertEquals(delete.schemaId, VeniceWriter.VENICE_DEFAULT_VALUE_SCHEMA_ID); + Assert.assertEquals(delete.replicationMetadataVersionId, VeniceWriter.VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID); + Assert.assertEquals(delete.replicationMetadataPayload, ByteBuffer.wrap(new byte[0])); + + // verify replicationMetadata is encoded correctly for Put. + KafkaMessageEnvelope value2 = kmeArgumentCaptor.getAllValues().get(2); + Assert.assertEquals(value2.messageType, MessageType.PUT.getValue()); + put = (Put) value2.payloadUnion; + Assert.assertEquals(put.schemaId, 1); + Assert.assertEquals(put.replicationMetadataVersionId, 1); + Assert.assertEquals(put.replicationMetadataPayload, ByteBuffer.wrap(new byte[] { 0xa, 0xb })); + Assert.assertEquals(value2.producerMetadata.logicalTimestamp, APP_DEFAULT_LOGICAL_TS); + + // verify replicationMetadata is encoded correctly for Delete. 
+ KafkaMessageEnvelope value5 = kmeArgumentCaptor.getAllValues().get(5); + Assert.assertEquals(value5.messageType, MessageType.DELETE.getValue()); + delete = (Delete) value5.payloadUnion; + Assert.assertEquals(delete.schemaId, 1); + Assert.assertEquals(delete.replicationMetadataVersionId, 1); + Assert.assertEquals(delete.replicationMetadataPayload, ByteBuffer.wrap(new byte[] { 0xa, 0xb })); + Assert.assertEquals(value5.producerMetadata.logicalTimestamp, APP_DEFAULT_LOGICAL_TS); + + // verify default logical_ts is encoded correctly + KafkaMessageEnvelope value6 = kmeArgumentCaptor.getAllValues().get(6); + Assert.assertEquals(value6.messageType, MessageType.PUT.getValue()); + Assert.assertEquals(value6.producerMetadata.logicalTimestamp, APP_DEFAULT_LOGICAL_TS); + } + + @Test + public void testCloseSegmentBasedOnElapsedTime() throws InterruptedException, ExecutionException, TimeoutException { + PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class); + Future mockedFuture = mock(Future.class); + when(mockedProducer.getNumberOfPartitions(any())).thenReturn(1); + when(mockedProducer.getNumberOfPartitions(any(), anyInt(), any())).thenReturn(1); + when(mockedProducer.sendMessage(any(), any(), any(), any(), any(), any())).thenReturn(mockedFuture); + Properties writerProperties = new Properties(); + writerProperties.put(VeniceWriter.MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS, 0); + String stringSchema = "\"string\""; + VeniceKafkaSerializer serializer = new VeniceAvroKafkaSerializer(stringSchema); + String testTopic = "test"; + VeniceWriterOptions veniceWriterOptions = new VeniceWriterOptions.Builder(testTopic).setKeySerializer(serializer) + .setValueSerializer(serializer) + .setWriteComputeSerializer(serializer) + .setPartitioner(new DefaultVenicePartitioner()) + .setTime(SystemTime.INSTANCE) + .build(); + VeniceWriter writer = + new VeniceWriter(veniceWriterOptions, new VeniceProperties(writerProperties), mockedProducer); + for (int i = 0; i < 1000; i++) { + writer.put(Integer.toString(i), Integer.toString(i), 1, null); + } + ArgumentCaptor kmeArgumentCaptor = ArgumentCaptor.forClass(KafkaMessageEnvelope.class); + verify(mockedProducer, atLeast(1000)).sendMessage(any(), any(), any(), kmeArgumentCaptor.capture(), any(), any()); + int segmentNumber = -1; + for (KafkaMessageEnvelope envelope: kmeArgumentCaptor.getAllValues()) { + if (segmentNumber == -1) { + segmentNumber = envelope.producerMetadata.segmentNumber; + } else { + // Segment number should not change since we disabled closing segment based on elapsed time. 
+ Assert.assertEquals(envelope.producerMetadata.segmentNumber, segmentNumber); + } + } + } + } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java index c5c40ea9b6..8d1000aac6 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java @@ -35,6 +35,7 @@ import com.linkedin.davinci.replication.RmdWithValueSchemaId; import com.linkedin.davinci.replication.merge.RmdSerDe; import com.linkedin.davinci.replication.merge.StringAnnotatedStoreSchemaCache; +import com.linkedin.davinci.storage.chunking.ChunkingUtils; import com.linkedin.davinci.storage.chunking.SingleGetChunkingAdapter; import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.record.ValueRecord; @@ -66,6 +67,9 @@ import com.linkedin.venice.schema.rmd.RmdSchemaGenerator; import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter; +import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; +import com.linkedin.venice.serialization.avro.ChunkedValueManifestSerializer; +import com.linkedin.venice.storage.protocol.ChunkedValueManifest; import com.linkedin.venice.utils.DataProviderUtils; import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.TestUtils; @@ -78,6 +82,7 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -87,6 +92,7 @@ import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.Consumer; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -113,6 +119,9 @@ public class PartialUpdateTest { private static final int TEST_TIMEOUT_MS = 120_000; private static final String CLUSTER_NAME = "venice-cluster0"; + private static final ChunkedValueManifestSerializer CHUNKED_VALUE_MANIFEST_SERIALIZER = + new ChunkedValueManifestSerializer(false); + private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper; private VeniceControllerWrapper parentController; private List childDatacenters; @@ -338,6 +347,133 @@ public void testPartialUpdateOnBatchPushedKeys() throws IOException { } } + @Test(timeOut = TEST_TIMEOUT_MS * 4) + public void testNonAAPartialUpdateChunkDeletion() throws IOException { + final String storeName = Utils.getUniqueString("partialUpdateChunking"); + String parentControllerUrl = parentController.getControllerUrl(); + String keySchemaStr = "{\"type\" : \"string\"}"; + Schema valueSchema = AvroCompatibilityHelper.parse(loadFileAsString("CollectionRecordV1.avsc")); + Schema partialUpdateSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchema); + ReadOnlySchemaRepository schemaRepo = mock(ReadOnlySchemaRepository.class); + when(schemaRepo.getDerivedSchema(storeName, 1, 1)).thenReturn(new DerivedSchemaEntry(1, 1, partialUpdateSchema)); + when(schemaRepo.getValueSchema(storeName, 1)).thenReturn(new SchemaEntry(1, valueSchema)); + + try (ControllerClient parentControllerClient = new 
ControllerClient(CLUSTER_NAME, parentControllerUrl)) { + assertCommand( + parentControllerClient.createNewStore(storeName, "test_owner", keySchemaStr, valueSchema.toString())); + UpdateStoreQueryParams updateStoreParams = + new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA) + .setCompressionStrategy(CompressionStrategy.NO_OP) + .setWriteComputationEnabled(true) + .setActiveActiveReplicationEnabled(false) + .setChunkingEnabled(true) + .setRmdChunkingEnabled(false) + .setHybridRewindSeconds(10L) + .setHybridOffsetLagThreshold(2L); + ControllerResponse updateStoreResponse = + parentControllerClient.retryableRequest(5, c -> c.updateStore(storeName, updateStoreParams)); + assertFalse(updateStoreResponse.isError(), "Update store got error: " + updateStoreResponse.getError()); + + VersionCreationResponse response = parentControllerClient.emptyPush(storeName, "test_push_id", 1000); + assertEquals(response.getVersion(), 1); + assertFalse(response.isError(), "Empty push to parent colo should succeed"); + TestUtils.waitForNonDeterministicPushCompletion( + Version.composeKafkaTopic(storeName, 1), + parentControllerClient, + 30, + TimeUnit.SECONDS); + } + + VeniceClusterWrapper veniceCluster = childDatacenters.get(0).getClusters().get(CLUSTER_NAME); + SystemProducer veniceProducer = getSamzaProducer(veniceCluster, storeName, Version.PushType.STREAM); + + String key = "key1"; + String primitiveFieldName = "name"; + String mapFieldName = "stringMap"; + + // Insert a large number of map entries to trigger value chunking (RMD chunking is disabled for this non-AA store). + int oldUpdateCount = 29; + int singleUpdateEntryCount = 10000; + try (AvroGenericStoreClient storeReader = ClientFactory.getAndStartGenericAvroClient( + ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(veniceCluster.getRandomRouterURL()))) { + for (int i = 0; i < oldUpdateCount; i++) { + producePartialUpdate( + storeName, + veniceProducer, + partialUpdateSchema, + key, + primitiveFieldName, + mapFieldName, + singleUpdateEntryCount, + i); + } + // Verify the value record has been partially updated.
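+ // The primitive field should hold the latest overwrite and the map field should contain every entry added so far.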
+ TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT_MS * 2, TimeUnit.MILLISECONDS, true, () -> { + try { + GenericRecord valueRecord = readValue(storeReader, key); + boolean nullRecord = (valueRecord == null); + assertFalse(nullRecord); + assertEquals(valueRecord.get(primitiveFieldName).toString(), "Tottenham"); // Updated field + Map mapFieldResult = new HashMap<>(); + ((Map) valueRecord.get(mapFieldName)) + .forEach((x, y) -> mapFieldResult.put(x.toString(), y.toString())); + assertEquals(mapFieldResult.size(), updateCount * singleUpdateEntryCount); + } catch (Exception e) { + throw new VeniceException(e); + } + }); + + TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS, true, () -> { + Assert.assertNotNull(valueManifest); + validateChunksFromManifests(kafkaTopic_v1, 0, valueManifest, null, (valueChunkBytes, rmdChunkBytes) -> { + Assert.assertNull(valueChunkBytes); + }, false); + }); + } finally { + veniceProducer.stop(); + } + + } + /** * This integration test performs a few actions to test RMD chunking logic: * (1) Send a bunch of large UPDATE messages to make sure eventually the key's value + RMD size greater than 1MB and @@ -353,10 +489,10 @@ public void testReplicationMetadataChunkingE2E() throws IOException { String keySchemaStr = "{\"type\" : \"string\"}"; Schema valueSchema = AvroCompatibilityHelper.parse(loadFileAsString("CollectionRecordV1.avsc")); Schema rmdSchema = RmdSchemaGenerator.generateMetadataSchema(valueSchema); - Schema writeComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchema); + Schema partialUpdateSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchema); ReadOnlySchemaRepository schemaRepo = mock(ReadOnlySchemaRepository.class); when(schemaRepo.getReplicationMetadataSchema(storeName, 1, 1)).thenReturn(new RmdSchemaEntry(1, 1, rmdSchema)); - when(schemaRepo.getDerivedSchema(storeName, 1, 1)).thenReturn(new DerivedSchemaEntry(1, 1, writeComputeSchema)); + when(schemaRepo.getDerivedSchema(storeName, 1, 1)).thenReturn(new DerivedSchemaEntry(1, 1, partialUpdateSchema)); when(schemaRepo.getValueSchema(storeName, 1)).thenReturn(new SchemaEntry(1, valueSchema)); StringAnnotatedStoreSchemaCache stringAnnotatedStoreSchemaCache = new StringAnnotatedStoreSchemaCache(storeName, schemaRepo); @@ -398,23 +534,60 @@ public void testReplicationMetadataChunkingE2E() throws IOException { String mapFieldName = "stringMap"; // Insert large amount of Map entries to trigger RMD chunking. 
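+ // 29 rounds of 10,000 entries each should grow the key's value + RMD beyond 1MB so that both are chunked before the final update is sent.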
- int updateCount = 30; + int oldUpdateCount = 29; int singleUpdateEntryCount = 10000; try (AvroGenericStoreClient storeReader = ClientFactory.getAndStartGenericAvroClient( ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(veniceCluster.getRandomRouterURL()))) { - Map newEntries = new HashMap<>(); - for (int i = 0; i < updateCount; i++) { - UpdateBuilder updateBuilder = new UpdateBuilderImpl(writeComputeSchema); - updateBuilder.setNewFieldValue(primitiveFieldName, "Tottenham"); - newEntries.clear(); - for (int j = 0; j < singleUpdateEntryCount; j++) { - String idx = String.valueOf(i * singleUpdateEntryCount + j); - newEntries.put("key_" + idx, "value_" + idx); - } - updateBuilder.setEntriesToAddToMapField(mapFieldName, newEntries); - GenericRecord partialUpdateRecord = updateBuilder.build(); - sendStreamingRecord(veniceProducer, storeName, key, partialUpdateRecord, i * 10L + 1); + for (int i = 0; i < oldUpdateCount; i++) { + producePartialUpdate( + storeName, + veniceProducer, + partialUpdateSchema, + key, + primitiveFieldName, + mapFieldName, + singleUpdateEntryCount, + i); } + // Verify the value record has been partially updated. + TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT_MS * 2, TimeUnit.MILLISECONDS, true, () -> { + try { + GenericRecord valueRecord = readValue(storeReader, key); + boolean nullRecord = (valueRecord == null); + assertFalse(nullRecord); + assertEquals(valueRecord.get(primitiveFieldName).toString(), "Tottenham"); // Updated field + Map mapFieldResult = new HashMap<>(); + ((Map) valueRecord.get(mapFieldName)) + .forEach((x, y) -> mapFieldResult.put(x.toString(), y.toString())); + assertEquals(mapFieldResult.size(), oldUpdateCount * singleUpdateEntryCount); + } catch (Exception e) { + throw new VeniceException(e); + } + }); + + String kafkaTopic_v1 = Version.composeKafkaTopic(storeName, 1); + validateValueChunks(kafkaTopic_v1, key, Assert::assertNotNull); + VeniceServerWrapper serverWrapper = multiRegionMultiClusterWrapper.getChildRegions() + .get(0) + .getClusters() + .get("venice-cluster0") + .getVeniceServers() + .get(0); + AbstractStorageEngine storageEngine = + serverWrapper.getVeniceServer().getStorageService().getStorageEngine(kafkaTopic_v1); + ChunkedValueManifest valueManifest = getChunkValueManifest(storageEngine, 0, key, false); + ChunkedValueManifest rmdManifest = getChunkValueManifest(storageEngine, 0, key, true); + + int updateCount = 30; + producePartialUpdate( + storeName, + veniceProducer, + partialUpdateSchema, + key, + primitiveFieldName, + mapFieldName, + singleUpdateEntryCount, + updateCount - 1); // Verify the value record has been partially updated. TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT_MS * 2, TimeUnit.MILLISECONDS, true, () -> { @@ -432,15 +605,24 @@ public void testReplicationMetadataChunkingE2E() throws IOException { } }); // Validate RMD bytes after PUT requests. 
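+ // The per-entry timestamps in the RMD should cover every map entry written so far (updateCount * singleUpdateEntryCount elements).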
- String kafkaTopic = Version.composeKafkaTopic(storeName, 1); - validateRmdData(rmdSerDe, kafkaTopic, key, rmdWithValueSchemaId -> { + validateRmdData(rmdSerDe, kafkaTopic_v1, key, rmdWithValueSchemaId -> { GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get("timestamp"); GenericRecord stringMapTimestampRecord = (GenericRecord) timestampRecord.get("stringMap"); List activeElementsTimestamps = (List) stringMapTimestampRecord.get("activeElementsTimestamps"); assertEquals(activeElementsTimestamps.size(), updateCount * singleUpdateEntryCount); }); - // Perform one time repush to make sure repush can handle RMD chunks data correctly. + TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS, true, () -> { + Assert.assertNotNull(valueManifest); + Assert.assertNotNull(rmdManifest); + validateChunksFromManifests(kafkaTopic_v1, 0, valueManifest, rmdManifest, (valueChunkBytes, rmdChunkBytes) -> { + Assert.assertNull(valueChunkBytes); + Assert.assertNotNull(rmdChunkBytes); + Assert.assertEquals(rmdChunkBytes.length, 4); + }, true); + }); + + // Properties props = IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, "dummyInputPath", storeName); props.setProperty(SOURCE_KAFKA, "true"); @@ -474,8 +656,8 @@ public void testReplicationMetadataChunkingE2E() throws IOException { }); // Validate RMD bytes after PUT requests. - kafkaTopic = Version.composeKafkaTopic(storeName, 2); - validateRmdData(rmdSerDe, kafkaTopic, key, rmdWithValueSchemaId -> { + String kafkaTopic_v2 = Version.composeKafkaTopic(storeName, 2); + validateRmdData(rmdSerDe, kafkaTopic_v2, key, rmdWithValueSchemaId -> { GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get("timestamp"); GenericRecord stringMapTimestampRecord = (GenericRecord) timestampRecord.get("stringMap"); List activeElementsTimestamps = (List) stringMapTimestampRecord.get("activeElementsTimestamps"); @@ -496,7 +678,7 @@ public void testReplicationMetadataChunkingE2E() throws IOException { assertEquals(mapFieldResult.size(), singleUpdateEntryCount); }); - validateRmdData(rmdSerDe, kafkaTopic, key, rmdWithValueSchemaId -> { + validateRmdData(rmdSerDe, kafkaTopic_v2, key, rmdWithValueSchemaId -> { GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get("timestamp"); GenericRecord stringMapTimestampRecord = (GenericRecord) timestampRecord.get("stringMap"); List activeElementsTimestamps = (List) stringMapTimestampRecord.get("activeElementsTimestamps"); @@ -512,7 +694,7 @@ public void testReplicationMetadataChunkingE2E() throws IOException { boolean nullRecord = (valueRecord == null); assertTrue(nullRecord); }); - validateRmdData(rmdSerDe, kafkaTopic, key, rmdWithValueSchemaId -> { + validateRmdData(rmdSerDe, kafkaTopic_v2, key, rmdWithValueSchemaId -> { Assert.assertTrue( rmdWithValueSchemaId.getRmdRecord().get(RmdConstants.TIMESTAMP_FIELD_NAME) instanceof GenericRecord); GenericRecord timestampRecord = @@ -539,12 +721,13 @@ private void validateRmdData( serverWrapper.getVeniceServer().getStorageService().getStorageEngine(kafkaTopic); assertNotNull(storageEngine); ValueRecord result = SingleGetChunkingAdapter - .getReplicationMetadata(storageEngine, 0, serializeStringKeyToByteArray(key), true, null); + .getReplicationMetadata(storageEngine, 0, serializeStringKeyToByteArray(key), true, null, null); // Avoid assertion failure logging massive RMD record. 
boolean nullRmd = (result == null); assertFalse(nullRmd); byte[] value = result.serialize(); - RmdWithValueSchemaId rmdWithValueSchemaId = rmdSerDe.deserializeValueSchemaIdPrependedRmdBytes(value); + RmdWithValueSchemaId rmdWithValueSchemaId = new RmdWithValueSchemaId(); + rmdSerDe.deserializeValueSchemaIdPrependedRmdBytes(value, rmdWithValueSchemaId); rmdDataValidationFlow.accept(rmdWithValueSchemaId); } } @@ -1013,4 +1196,102 @@ private byte[] serializeStringKeyToByteArray(String key) { } return out.toByteArray(); } + + private void producePartialUpdate( + String storeName, + SystemProducer veniceProducer, + Schema partialUpdateSchema, + String key, + String primitiveFieldName, + String mapFieldName, + int singleUpdateEntryCount, + int updateCount) { + UpdateBuilder updateBuilder = new UpdateBuilderImpl(partialUpdateSchema); + updateBuilder.setNewFieldValue(primitiveFieldName, "Tottenham"); + Map newEntries = new HashMap<>(); + for (int j = 0; j < singleUpdateEntryCount; j++) { + String idx = String.valueOf(updateCount * singleUpdateEntryCount + j); + newEntries.put("key_" + idx, "value_" + idx); + } + updateBuilder.setEntriesToAddToMapField(mapFieldName, newEntries); + GenericRecord partialUpdateRecord = updateBuilder.build(); + sendStreamingRecord(veniceProducer, storeName, key, partialUpdateRecord, updateCount * 10L + 1); + } + + private void validateValueChunks(String kafkaTopic, String key, Consumer validationFlow) { + for (VeniceServerWrapper serverWrapper: multiRegionMultiClusterWrapper.getChildRegions() + .get(0) + .getClusters() + .get("venice-cluster0") + .getVeniceServers()) { + AbstractStorageEngine storageEngine = + serverWrapper.getVeniceServer().getStorageService().getStorageEngine(kafkaTopic); + assertNotNull(storageEngine); + + ChunkedValueManifest manifest = getChunkValueManifest(storageEngine, 0, key, false); + Assert.assertNotNull(manifest); + + for (ByteBuffer chunkedKey: manifest.keysWithChunkIdSuffix) { + byte[] chunkValueBytes = storageEngine.get(0, chunkedKey.array()); + validationFlow.accept(chunkValueBytes); + } + } + } + + private void validateChunksFromManifests( + String kafkaTopic, + int partition, + ChunkedValueManifest valueManifest, + ChunkedValueManifest rmdManifest, + BiConsumer validationFlow, + boolean isAAEnabled) { + for (VeniceServerWrapper serverWrapper: multiRegionMultiClusterWrapper.getChildRegions() + .get(0) + .getClusters() + .get("venice-cluster0") + .getVeniceServers()) { + AbstractStorageEngine storageEngine = + serverWrapper.getVeniceServer().getStorageService().getStorageEngine(kafkaTopic); + assertNotNull(storageEngine); + + validateChunkDataFromManifest(storageEngine, partition, valueManifest, validationFlow, isAAEnabled); + validateChunkDataFromManifest(storageEngine, partition, rmdManifest, validationFlow, isAAEnabled); + } + } + + private ChunkedValueManifest getChunkValueManifest( + AbstractStorageEngine storageEngine, + int partition, + String key, + boolean isRmd) { + byte[] serializedKeyBytes = + ChunkingUtils.KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeNonChunkedKey(serializeStringKeyToByteArray(key)); + byte[] manifestValueBytes = isRmd + ? 
storageEngine.getReplicationMetadata(partition, serializedKeyBytes) + : storageEngine.get(partition, serializedKeyBytes); + if (manifestValueBytes == null) { + return null; + } + int schemaId = ValueRecord.parseSchemaId(manifestValueBytes); + Assert.assertEquals(schemaId, AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()); + return CHUNKED_VALUE_MANIFEST_SERIALIZER.deserialize(manifestValueBytes, schemaId); + } + + private void validateChunkDataFromManifest( + AbstractStorageEngine storageEngine, + int partition, + ChunkedValueManifest manifest, + BiConsumer validationFlow, + boolean isAAEnabled) { + if (manifest == null) { + return; + } + for (int i = 0; i < manifest.keysWithChunkIdSuffix.size(); i++) { + byte[] chunkKeyBytes = manifest.keysWithChunkIdSuffix.get(i).array(); + byte[] valueBytes = storageEngine.get(partition, chunkKeyBytes); + byte[] rmdBytes = isAAEnabled ? storageEngine.getReplicationMetadata(partition, chunkKeyBytes) : null; + validationFlow.accept(valueBytes, rmdBytes); + } + } + } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java index b2ffbbbb0a..1c9f8ce377 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java @@ -1,24 +1,13 @@ package com.linkedin.venice.writer; import static com.linkedin.venice.kafka.TopicManager.DEFAULT_KAFKA_OPERATION_TIMEOUT_MS; -import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS; -import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES; -import static com.linkedin.venice.writer.VeniceWriter.VENICE_DEFAULT_LOGICAL_TS; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyInt; -import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import com.linkedin.venice.ConfigKeys; import com.linkedin.venice.integration.utils.PubSubBrokerWrapper; import com.linkedin.venice.integration.utils.ServiceFactory; import com.linkedin.venice.kafka.TopicManager; -import com.linkedin.venice.kafka.protocol.Delete; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.ProducerMetadata; -import com.linkedin.venice.kafka.protocol.Put; import com.linkedin.venice.kafka.protocol.enums.MessageType; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; @@ -28,30 +17,17 @@ import com.linkedin.venice.pubsub.api.PubSubConsumerAdapterFactory; import com.linkedin.venice.pubsub.api.PubSubMessage; import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer; -import com.linkedin.venice.pubsub.api.PubSubProducerAdapter; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; -import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer; -import com.linkedin.venice.serialization.VeniceKafkaSerializer; -import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; -import com.linkedin.venice.serialization.avro.ChunkedValueManifestSerializer; import com.linkedin.venice.serialization.avro.KafkaValueSerializer; import 
com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer; -import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; -import com.linkedin.venice.storage.protocol.ChunkId; -import com.linkedin.venice.storage.protocol.ChunkedKeySuffix; -import com.linkedin.venice.storage.protocol.ChunkedValueManifest; import com.linkedin.venice.utils.IntegrationTestPushUtils; -import com.linkedin.venice.utils.SystemTime; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.pools.LandFillObjectPool; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Properties; @@ -59,8 +35,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.TimeoutException; -import org.mockito.ArgumentCaptor; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -167,295 +141,4 @@ public void testThreadSafetyForPutMessages() throws ExecutionException, Interrup 100, veniceWriter -> veniceWriter.put(new KafkaKey(MessageType.PUT, "blah".getBytes()), "blah".getBytes(), 1, null)); } - - @Test - public void testCloseSegmentBasedOnElapsedTime() throws InterruptedException, ExecutionException, TimeoutException { - PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class); - Future mockedFuture = mock(Future.class); - when(mockedProducer.getNumberOfPartitions(any())).thenReturn(1); - when(mockedProducer.getNumberOfPartitions(any(), anyInt(), any())).thenReturn(1); - when(mockedProducer.sendMessage(any(), any(), any(), any(), any(), any())).thenReturn(mockedFuture); - Properties writerProperties = new Properties(); - writerProperties.put(VeniceWriter.MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS, 0); - String stringSchema = "\"string\""; - VeniceKafkaSerializer serializer = new VeniceAvroKafkaSerializer(stringSchema); - String testTopic = "test"; - VeniceWriterOptions veniceWriterOptions = new VeniceWriterOptions.Builder(testTopic).setKeySerializer(serializer) - .setValueSerializer(serializer) - .setWriteComputeSerializer(serializer) - .setPartitioner(new DefaultVenicePartitioner()) - .setTime(SystemTime.INSTANCE) - .build(); - VeniceWriter writer = - new VeniceWriter(veniceWriterOptions, new VeniceProperties(writerProperties), mockedProducer); - for (int i = 0; i < 1000; i++) { - writer.put(Integer.toString(i), Integer.toString(i), 1, null); - } - ArgumentCaptor kmeArgumentCaptor = ArgumentCaptor.forClass(KafkaMessageEnvelope.class); - verify(mockedProducer, atLeast(1000)).sendMessage(any(), any(), any(), kmeArgumentCaptor.capture(), any(), any()); - int segmentNumber = -1; - for (KafkaMessageEnvelope envelope: kmeArgumentCaptor.getAllValues()) { - if (segmentNumber == -1) { - segmentNumber = envelope.producerMetadata.segmentNumber; - } else { - // Segment number should not change since we disabled closing segment based on elapsed time. 
- Assert.assertEquals(envelope.producerMetadata.segmentNumber, segmentNumber); - } - } - } - - @Test - public void testReplicationMetadataWrittenCorrectly() - throws InterruptedException, ExecutionException, TimeoutException { - PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class); - Future mockedFuture = mock(Future.class); - when(mockedProducer.getNumberOfPartitions(any())).thenReturn(1); - when(mockedProducer.getNumberOfPartitions(any(), anyInt(), any())).thenReturn(1); - when(mockedProducer.sendMessage(any(), any(), any(), any(), any(), any())).thenReturn(mockedFuture); - Properties writerProperties = new Properties(); - String stringSchema = "\"string\""; - VeniceKafkaSerializer serializer = new VeniceAvroKafkaSerializer(stringSchema); - String testTopic = "test"; - VeniceWriterOptions veniceWriterOptions = new VeniceWriterOptions.Builder(testTopic).setKeySerializer(serializer) - .setValueSerializer(serializer) - .setWriteComputeSerializer(serializer) - .setPartitioner(new DefaultVenicePartitioner()) - .setTime(SystemTime.INSTANCE) - .build(); - VeniceWriter writer = - new VeniceWriter(veniceWriterOptions, new VeniceProperties(writerProperties), mockedProducer); - - // verify the new veniceWriter API's are able to encode the A/A metadat info correctly. - long ctime = System.currentTimeMillis(); - ByteBuffer replicationMetadata = ByteBuffer.wrap(new byte[] { 0xa, 0xb }); - PutMetadata putMetadata = new PutMetadata(1, replicationMetadata); - DeleteMetadata deleteMetadata = new DeleteMetadata(1, 1, replicationMetadata); - - writer.put( - Integer.toString(1), - Integer.toString(1), - 1, - null, - VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, - ctime, - null); - writer.put( - Integer.toString(2), - Integer.toString(2), - 1, - null, - VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, - APP_DEFAULT_LOGICAL_TS, - putMetadata); - writer.update(Integer.toString(3), Integer.toString(2), 1, 1, null, ctime); - writer.delete(Integer.toString(4), null, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, ctime); - writer.delete(Integer.toString(5), null, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, deleteMetadata); - writer.put(Integer.toString(6), Integer.toString(1), 1, null, VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER); - - ArgumentCaptor kmeArgumentCaptor = ArgumentCaptor.forClass(KafkaMessageEnvelope.class); - verify(mockedProducer, atLeast(2)).sendMessage(any(), any(), any(), kmeArgumentCaptor.capture(), any(), any()); - - // first one will be control message SOS, there should not be any aa metadata. - KafkaMessageEnvelope value0 = kmeArgumentCaptor.getAllValues().get(0); - Assert.assertEquals(value0.producerMetadata.logicalTimestamp, VENICE_DEFAULT_LOGICAL_TS); - - // verify timestamp is encoded correctly. 
- KafkaMessageEnvelope value1 = kmeArgumentCaptor.getAllValues().get(1); - KafkaMessageEnvelope value3 = kmeArgumentCaptor.getAllValues().get(3); - KafkaMessageEnvelope value4 = kmeArgumentCaptor.getAllValues().get(4); - for (KafkaMessageEnvelope kme: Arrays.asList(value1, value3, value4)) { - Assert.assertEquals(kme.producerMetadata.logicalTimestamp, ctime); - } - - // verify default values for replicationMetadata are written correctly - Put put = (Put) value1.payloadUnion; - Assert.assertEquals(put.schemaId, 1); - Assert.assertEquals(put.replicationMetadataVersionId, VeniceWriter.VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID); - Assert.assertEquals(put.replicationMetadataPayload, ByteBuffer.wrap(new byte[0])); - - Delete delete = (Delete) value4.payloadUnion; - Assert.assertEquals(delete.schemaId, VeniceWriter.VENICE_DEFAULT_VALUE_SCHEMA_ID); - Assert.assertEquals(delete.replicationMetadataVersionId, VeniceWriter.VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID); - Assert.assertEquals(delete.replicationMetadataPayload, ByteBuffer.wrap(new byte[0])); - - // verify replicationMetadata is encoded correctly for Put. - KafkaMessageEnvelope value2 = kmeArgumentCaptor.getAllValues().get(2); - Assert.assertEquals(value2.messageType, MessageType.PUT.getValue()); - put = (Put) value2.payloadUnion; - Assert.assertEquals(put.schemaId, 1); - Assert.assertEquals(put.replicationMetadataVersionId, 1); - Assert.assertEquals(put.replicationMetadataPayload, ByteBuffer.wrap(new byte[] { 0xa, 0xb })); - Assert.assertEquals(value2.producerMetadata.logicalTimestamp, APP_DEFAULT_LOGICAL_TS); - - // verify replicationMetadata is encoded correctly for Delete. - KafkaMessageEnvelope value5 = kmeArgumentCaptor.getAllValues().get(5); - Assert.assertEquals(value5.messageType, MessageType.DELETE.getValue()); - delete = (Delete) value5.payloadUnion; - Assert.assertEquals(delete.schemaId, 1); - Assert.assertEquals(delete.replicationMetadataVersionId, 1); - Assert.assertEquals(delete.replicationMetadataPayload, ByteBuffer.wrap(new byte[] { 0xa, 0xb })); - Assert.assertEquals(value5.producerMetadata.logicalTimestamp, APP_DEFAULT_LOGICAL_TS); - - // verify default logical_ts is encoded correctly - KafkaMessageEnvelope value6 = kmeArgumentCaptor.getAllValues().get(6); - Assert.assertEquals(value6.messageType, MessageType.PUT.getValue()); - Assert.assertEquals(value6.producerMetadata.logicalTimestamp, APP_DEFAULT_LOGICAL_TS); - } - - @Test(timeOut = 10000) - public void testReplicationMetadataChunking() throws ExecutionException, InterruptedException, TimeoutException { - PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class); - Future mockedFuture = mock(Future.class); - when(mockedProducer.getNumberOfPartitions(any())).thenReturn(1); - when(mockedProducer.getNumberOfPartitions(any(), anyInt(), any())).thenReturn(1); - when(mockedProducer.sendMessage(any(), any(), any(), any(), any(), any())).thenReturn(mockedFuture); - String stringSchema = "\"string\""; - VeniceKafkaSerializer serializer = new VeniceAvroKafkaSerializer(stringSchema); - String testTopic = "test"; - VeniceWriterOptions veniceWriterOptions = new VeniceWriterOptions.Builder(testTopic).setKeySerializer(serializer) - .setValueSerializer(serializer) - .setWriteComputeSerializer(serializer) - .setPartitioner(new DefaultVenicePartitioner()) - .setTime(SystemTime.INSTANCE) - .setChunkingEnabled(true) - .setRmdChunkingEnabled(true) - .build(); - VeniceWriter writer = - new VeniceWriter(veniceWriterOptions, VeniceProperties.empty(), mockedProducer); - - 
ByteBuffer replicationMetadata = ByteBuffer.wrap(new byte[] { 0xa, 0xb }); - PutMetadata putMetadata = new PutMetadata(1, replicationMetadata); - - StringBuilder stringBuilder = new StringBuilder(); - for (int i = 0; i < 50000; i++) { - stringBuilder.append("abcdefghabcdefghabcdefghabcdefgh"); - } - String valueString = stringBuilder.toString(); - - writer.put( - Integer.toString(1), - valueString, - 1, - null, - VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER, - APP_DEFAULT_LOGICAL_TS, - putMetadata); - ArgumentCaptor keyArgumentCaptor = ArgumentCaptor.forClass(KafkaKey.class); - ArgumentCaptor kmeArgumentCaptor = ArgumentCaptor.forClass(KafkaMessageEnvelope.class); - verify(mockedProducer, atLeast(2)) - .sendMessage(any(), any(), keyArgumentCaptor.capture(), kmeArgumentCaptor.capture(), any(), any()); - KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer(); - byte[] serializedKey = serializer.serialize(testTopic, Integer.toString(1)); - byte[] serializedValue = serializer.serialize(testTopic, valueString); - byte[] serializedRmd = replicationMetadata.array(); - int availableMessageSize = DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES - serializedKey.length; - - // The order should be SOS, valueChunk1, valueChunk2, replicationMetadataChunk1, manifest for value and RMD. - Assert.assertEquals(kmeArgumentCaptor.getAllValues().size(), 5); - - // Verify value of the 1st chunk. - KafkaMessageEnvelope actualValue1 = kmeArgumentCaptor.getAllValues().get(1); - Assert.assertEquals(actualValue1.messageType, MessageType.PUT.getValue()); - Assert.assertEquals(((Put) actualValue1.payloadUnion).schemaId, -10); - Assert.assertEquals(((Put) actualValue1.payloadUnion).replicationMetadataVersionId, -1); - Assert.assertEquals(((Put) actualValue1.payloadUnion).replicationMetadataPayload, ByteBuffer.allocate(0)); - Assert.assertEquals(((Put) actualValue1.payloadUnion).putValue.array().length, availableMessageSize + 4); - Assert.assertEquals(actualValue1.producerMetadata.logicalTimestamp, VENICE_DEFAULT_LOGICAL_TS); - - // Verify value of the 2nd chunk. - KafkaMessageEnvelope actualValue2 = kmeArgumentCaptor.getAllValues().get(2); - Assert.assertEquals(actualValue2.messageType, MessageType.PUT.getValue()); - Assert.assertEquals(((Put) actualValue2.payloadUnion).schemaId, -10); - Assert.assertEquals(((Put) actualValue2.payloadUnion).replicationMetadataVersionId, -1); - Assert.assertEquals(((Put) actualValue2.payloadUnion).replicationMetadataPayload, ByteBuffer.allocate(0)); - Assert.assertEquals( - ((Put) actualValue2.payloadUnion).putValue.array().length, - (serializedValue.length - availableMessageSize) + 4); - Assert.assertEquals(actualValue2.producerMetadata.logicalTimestamp, VENICE_DEFAULT_LOGICAL_TS); - - ChunkedValueManifestSerializer chunkedValueManifestSerializer = new ChunkedValueManifestSerializer(true); - - final ChunkedValueManifest chunkedValueManifest = new ChunkedValueManifest(); - chunkedValueManifest.schemaId = 1; - chunkedValueManifest.keysWithChunkIdSuffix = new ArrayList<>(2); - chunkedValueManifest.size = serializedValue.length; - - // Verify key of the 1st value chunk. 
- ChunkedKeySuffix chunkedKeySuffix = new ChunkedKeySuffix(); - chunkedKeySuffix.isChunk = true; - chunkedKeySuffix.chunkId = new ChunkId(); - chunkedKeySuffix.chunkId.chunkIndex = 0; - ProducerMetadata producerMetadata = actualValue1.producerMetadata; - chunkedKeySuffix.chunkId.producerGUID = producerMetadata.producerGUID; - chunkedKeySuffix.chunkId.segmentNumber = producerMetadata.segmentNumber; - chunkedKeySuffix.chunkId.messageSequenceNumber = producerMetadata.messageSequenceNumber; - - ByteBuffer keyWithSuffix = keyWithChunkingSuffixSerializer.serializeChunkedKey(serializedKey, chunkedKeySuffix); - chunkedValueManifest.keysWithChunkIdSuffix.add(keyWithSuffix); - KafkaKey expectedKey1 = new KafkaKey(MessageType.PUT, keyWithSuffix.array()); - KafkaKey actualKey1 = keyArgumentCaptor.getAllValues().get(1); - Assert.assertEquals(actualKey1.getKey(), expectedKey1.getKey()); - - // Verify key of the 2nd value chunk. - chunkedKeySuffix.chunkId.chunkIndex = 1; - keyWithSuffix = keyWithChunkingSuffixSerializer.serializeChunkedKey(serializedKey, chunkedKeySuffix); - chunkedValueManifest.keysWithChunkIdSuffix.add(keyWithSuffix); - KafkaKey expectedKey2 = new KafkaKey(MessageType.PUT, keyWithSuffix.array()); - KafkaKey actualKey2 = keyArgumentCaptor.getAllValues().get(2); - Assert.assertEquals(actualKey2.getKey(), expectedKey2.getKey()); - - // Check value of the 1st RMD chunk. - KafkaMessageEnvelope actualValue3 = kmeArgumentCaptor.getAllValues().get(3); - Assert.assertEquals(actualValue3.messageType, MessageType.PUT.getValue()); - Assert.assertEquals(((Put) actualValue3.payloadUnion).schemaId, -10); - Assert.assertEquals(((Put) actualValue3.payloadUnion).replicationMetadataVersionId, -1); - Assert.assertEquals(((Put) actualValue3.payloadUnion).putValue, ByteBuffer.allocate(0)); - Assert.assertEquals( - ((Put) actualValue3.payloadUnion).replicationMetadataPayload.array().length, - serializedRmd.length + 4); - Assert.assertEquals(actualValue3.producerMetadata.logicalTimestamp, VENICE_DEFAULT_LOGICAL_TS); - - // Check key of the 1st RMD chunk. - ChunkedValueManifest chunkedRmdManifest = new ChunkedValueManifest(); - chunkedRmdManifest.schemaId = 1; - chunkedRmdManifest.keysWithChunkIdSuffix = new ArrayList<>(1); - chunkedRmdManifest.size = serializedRmd.length; - chunkedKeySuffix = new ChunkedKeySuffix(); - chunkedKeySuffix.isChunk = true; - chunkedKeySuffix.chunkId = new ChunkId(); - producerMetadata = actualValue3.producerMetadata; - chunkedKeySuffix.chunkId.producerGUID = producerMetadata.producerGUID; - chunkedKeySuffix.chunkId.segmentNumber = producerMetadata.segmentNumber; - chunkedKeySuffix.chunkId.messageSequenceNumber = producerMetadata.messageSequenceNumber; - // The chunkIndex of the first RMD should be the number of value chunks so that key space of value chunk and RMD - // chunk will not collide. - chunkedKeySuffix.chunkId.chunkIndex = 2; - keyWithSuffix = keyWithChunkingSuffixSerializer.serializeChunkedKey(serializedKey, chunkedKeySuffix); - chunkedRmdManifest.keysWithChunkIdSuffix.add(keyWithSuffix); - KafkaKey expectedKey3 = new KafkaKey(MessageType.PUT, keyWithSuffix.array()); - KafkaKey actualKey3 = keyArgumentCaptor.getAllValues().get(3); - Assert.assertEquals(actualKey3.getKey(), expectedKey3.getKey()); - - // Check key of the manifest. 
- byte[] topLevelKey = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serializedKey); - KafkaKey expectedKey4 = new KafkaKey(MessageType.PUT, topLevelKey); - KafkaKey actualKey4 = keyArgumentCaptor.getAllValues().get(4); - Assert.assertEquals(actualKey4.getKey(), expectedKey4.getKey()); - - // Check manifest for both value and rmd. - KafkaMessageEnvelope actualValue4 = kmeArgumentCaptor.getAllValues().get(4); - Assert.assertEquals(actualValue4.messageType, MessageType.PUT.getValue()); - Assert.assertEquals( - ((Put) actualValue4.payloadUnion).schemaId, - AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()); - Assert.assertEquals(((Put) actualValue4.payloadUnion).replicationMetadataVersionId, putMetadata.getRmdVersionId()); - Assert.assertEquals( - ((Put) actualValue4.payloadUnion).replicationMetadataPayload, - ByteBuffer.wrap(chunkedValueManifestSerializer.serialize(testTopic, chunkedRmdManifest))); - Assert.assertEquals( - ((Put) actualValue4.payloadUnion).putValue, - ByteBuffer.wrap(chunkedValueManifestSerializer.serialize(testTopic, chunkedValueManifest))); - Assert.assertEquals(actualValue4.producerMetadata.logicalTimestamp, APP_DEFAULT_LOGICAL_TS); - - } }
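For reference, the integration-test helpers above all work the same way: they load a ChunkedValueManifest for a key and then walk its keysWithChunkIdSuffix list to reach every individual chunk. The standalone sketch below illustrates that manifest-driven iteration for cleanup of deprecated chunks; it is not code from this patch, and DeprecatedChunkCleanupSketch / deleteFn are hypothetical names introduced only for illustration (deleteFn stands in for whatever actually removes or tombstones a single chunk key).

import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;

/**
 * Minimal sketch (hypothetical, not part of this patch): given the chunk keys recorded in an
 * old value's or RMD's ChunkedValueManifest, hand every deprecated chunk key to a delete
 * callback so the stale chunks no longer accumulate.
 */
public final class DeprecatedChunkCleanupSketch {
  /** deleteFn is a placeholder for the operation that removes one chunk by its serialized key. */
  static void deleteDeprecatedChunks(List<ByteBuffer> keysWithChunkIdSuffix, Consumer<byte[]> deleteFn) {
    if (keysWithChunkIdSuffix == null) {
      return; // the old record was not chunked, so there is nothing to clean up
    }
    for (ByteBuffer chunkKey: keysWithChunkIdSuffix) {
      // Copy the bytes out of a duplicate so the shared buffer's position is left untouched.
      ByteBuffer view = chunkKey.duplicate();
      byte[] keyBytes = new byte[view.remaining()];
      view.get(keyBytes);
      deleteFn.accept(keyBytes);
    }
  }
}

The tests above read each chunk back with storageEngine.get(partition, chunkedKey.array()) to assert its presence or absence; the sketch only abstracts the per-chunk-key step behind deleteFn and copies the key bytes defensively instead of assuming an array-backed buffer.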