[server] Use chunk manifest schema id for writer when performing updates to chunked records (linkedin#1101)

* [server] Fix A/A partial update on chunked batch-pushed keys

This resolves the ingestion issues we've seen for chunked records when a batch push is combined with write compute.

---------

Co-authored-by: Jialin Liu <[email protected]>
ZacAttack and sixpluszero authored Aug 12, 2024
1 parent 60b13a4 commit 03a4172
Showing 9 changed files with 284 additions and 57 deletions.
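
The heart of the change: when an incoming partial update targets a value written by a batch push (so no replication metadata exists yet), the writer schema id can no longer be assumed to sit in the first bytes of the stored value, because a value re-assembled from chunks carries no schema-id prefix. The diff below therefore takes the schema id from the chunked value manifest when one is available and only falls back to parsing the value bytes otherwise. A minimal sketch of that decision, using the types that appear in the diff (the standalone helper itself is illustrative, not code from the commit):

import java.nio.ByteBuffer;

import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer;
import com.linkedin.davinci.store.record.ValueRecord;

// Illustrative helper, not part of the commit: resolve the writer schema id for an
// old value that has no RMD yet.
static int resolveWriterSchemaId(ByteBuffer oldValueBytes, ChunkedValueManifestContainer oldValueManifest) {
  if (oldValueManifest != null && oldValueManifest.getManifest() != null) {
    // Chunked value: the re-assembled bytes carry no schema-id prefix, so use the manifest.
    return oldValueManifest.getManifest().getSchemaId();
  }
  // Non-chunked value: the schema id is prepended to the serialized bytes.
  return ValueRecord.parseSchemaId(oldValueBytes.array());
}
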
@@ -483,7 +483,8 @@ protected void processMessageAndMaybeProduceToKafka(
writeTimestamp,
sourceOffset,
kafkaClusterId,
kafkaClusterId);
kafkaClusterId,
valueManifestContainer);
getHostLevelIngestionStats()
.recordIngestionActiveActiveUpdateLatency(LatencyUtils.getElapsedTimeFromNSToMS(beforeDCRTimestampInNs));
break;
@@ -13,6 +13,7 @@
import com.linkedin.davinci.schema.merge.ValueAndRmd;
import com.linkedin.davinci.serializer.avro.MapOrderPreservingSerDeFactory;
import com.linkedin.davinci.serializer.avro.fast.MapOrderPreservingFastSerDeFactory;
import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.venice.annotation.Threadsafe;
import com.linkedin.venice.exceptions.VeniceException;
@@ -227,15 +228,16 @@ public MergeConflictResult update(
}

public MergeConflictResult update(
Lazy<ByteBuffer> oldValueBytesProvider,
Lazy<ByteBuffer> oldValueBytes,
RmdWithValueSchemaId rmdWithValueSchemaId,
ByteBuffer updateBytes,
final int incomingValueSchemaId,
final int incomingUpdateProtocolVersion,
final long updateOperationTimestamp,
final long newValueSourceOffset,
final int newValueSourceBrokerID,
final int newValueColoID) {
final int newValueColoID,
ChunkedValueManifestContainer oldValueManifest) {
final SchemaEntry supersetValueSchemaEntry = storeSchemaCache.getSupersetSchema();
if (supersetValueSchemaEntry == null) {
throw new IllegalStateException("Expect to get superset value schema for store: " + storeName);
@@ -249,8 +251,11 @@ public MergeConflictResult update(
if (ignoreNewUpdate(updateOperationTimestamp, writeComputeRecord, rmdWithValueSchemaId)) {
return MergeConflictResult.getIgnoredResult();
}
ValueAndRmd<GenericRecord> oldValueAndRmd =
prepareValueAndRmdForUpdate(oldValueBytesProvider.get(), rmdWithValueSchemaId, supersetValueSchemaEntry);
ValueAndRmd<GenericRecord> oldValueAndRmd = prepareValueAndRmdForUpdate(
oldValueBytes.get(),
rmdWithValueSchemaId,
supersetValueSchemaEntry,
oldValueManifest);

int oldValueSchemaID = oldValueAndRmd.getValueSchemaId();
if (oldValueSchemaID == -1) {
@@ -616,32 +621,41 @@ private GenericRecord deserializeWriteComputeBytes(
private ValueAndRmd<GenericRecord> prepareValueAndRmdForUpdate(
ByteBuffer oldValueBytes,
RmdWithValueSchemaId rmdWithValueSchemaId,
SchemaEntry readerValueSchemaEntry) {
SchemaEntry readerValueSchemaSchemaEntry,
ChunkedValueManifestContainer oldValueManifest) {

if (rmdWithValueSchemaId == null) {
GenericRecord newValue;
if (oldValueBytes == null) {
// Value and RMD both never existed
newValue = AvroSchemaUtils.createGenericRecord(readerValueSchemaEntry.getSchema());
newValue = AvroSchemaUtils.createGenericRecord(readerValueSchemaSchemaEntry.getSchema());
} else {
/**
* RMD does not exist. This means the value is written in Batch phase and does not have RMD associated. In this
* case, the value must be retrieved from storage engine, and is prepended with schema ID.
* RMD does not exist. This means the value is written in Batch phase and does not have RMD associated. Records
* should have the schema id in the first few bytes unless they are assembled from a chunked value. In order
* to provide coverage for both cases, we just utilize the supersetSchema entry which should be the latest
* schema (and therefore should not drop fields after the update).
*/
int schemaId = ValueRecord.parseSchemaId(oldValueBytes.array());
newValue =
deserializerCacheForFullValue.get(schemaId, readerValueSchemaEntry.getId()).deserialize(oldValueBytes);

int schemaId;
if (oldValueManifest != null && oldValueManifest.getManifest() != null) {
schemaId = oldValueManifest.getManifest().getSchemaId();
} else {
schemaId = ValueRecord.parseSchemaId(oldValueBytes.array());
}
newValue = deserializerCacheForFullValue.get(schemaId, readerValueSchemaSchemaEntry.getId())
.deserialize(oldValueBytes);
}
GenericRecord newRmd = newRmdCreator.apply(readerValueSchemaEntry.getId());
GenericRecord newRmd = newRmdCreator.apply(readerValueSchemaSchemaEntry.getId());
newRmd.put(TIMESTAMP_FIELD_POS, createPerFieldTimestampRecord(newRmd.getSchema(), 0L, newValue));
newRmd.put(REPLICATION_CHECKPOINT_VECTOR_FIELD_POS, new ArrayList<Long>());
return new ValueAndRmd<>(Lazy.of(() -> newValue), newRmd);
}

int oldValueWriterSchemaId = rmdWithValueSchemaId.getValueSchemaId();
return createOldValueAndRmd(
readerValueSchemaEntry.getSchema(),
readerValueSchemaEntry.getId(),
readerValueSchemaSchemaEntry.getSchema(),
readerValueSchemaSchemaEntry.getId(),
oldValueWriterSchemaId,
Lazy.of(() -> oldValueBytes),
rmdWithValueSchemaId.getRmdRecord());
@@ -8,8 +8,10 @@
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.TOP_LEVEL_TS_FIELD_NAME;

import com.linkedin.davinci.replication.RmdWithValueSchemaId;
import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer;
import com.linkedin.davinci.utils.IndexedHashMap;
import com.linkedin.venice.schema.rmd.RmdConstants;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.writer.update.UpdateBuilderImpl;
@@ -56,7 +58,8 @@ public void testRegularFieldUpdateIgnored(boolean rmdStartsWithFieldLevelTs, boo
updateTs,
1L,
0,
0);
0,
null);
Assert.assertTrue(result.isUpdateIgnored());
}

@@ -101,7 +104,8 @@ public void testCollectionFieldSetFieldUpdateIgnored(
updateTs,
1L,
0,
updateColoId);
updateColoId,
null);
Assert.assertTrue(result.isUpdateIgnored());
}

@@ -161,7 +165,8 @@ public void testCollectionFieldAddElementsUpdateIgnoredOnFieldLevelTs(
updateTs,
1L,
0,
-2);
-2,
null);
Assert.assertTrue(result.isUpdateIgnored());
}

@@ -219,7 +224,8 @@ public void testCollectionFieldRemoveElementsUpdateIgnoredOnFieldLevelTs(boolean
updateTs,
1L,
0,
-2);
-2,
null);
Assert.assertTrue(result.isUpdateIgnored());
}

@@ -258,7 +264,8 @@ public void testMapFieldAddEntriesUpdateIgnoredOnValueLevelTs(boolean updateTsSm
updateTs,
1L,
0,
-2);
-2,
null);
Assert.assertTrue(result.isUpdateIgnored());
}

@@ -329,7 +336,8 @@ public void testSetCollectionFieldOnFieldValueTs() {
2L,
1L,
0,
0);
0,
null);

GenericRecord updatedValueRecord = deserializeValueRecord(result.getNewValue());
Assert.assertEquals(
@@ -395,7 +403,8 @@ public void testSetCollectionFieldOnFieldValueTs() {
10L,
1L,
0,
0);
0,
null);

GenericRecord newUpdatedValueRecord = deserializeValueRecord(result.getNewValue());
Assert.assertNull(newUpdatedValueRecord.get(NULLABLE_STRING_ARRAY_FIELD_NAME));
@@ -416,6 +425,48 @@ public void testSetCollectionFieldOnFieldValueTs() {
Assert.assertEquals(updatedNullableMapTsRecord.get(ACTIVE_ELEM_TS_FIELD_NAME), Collections.emptyList());
Assert.assertEquals(updatedNullableMapTsRecord.get(DELETED_ELEM_FIELD_NAME), Collections.emptyList());
Assert.assertEquals(updatedNullableMapTsRecord.get(DELETED_ELEM_TS_FIELD_NAME), Collections.emptyList());

// Set nullable collection field to NULL value by updating it in a large enough TS.
partialUpdateRecord =
new UpdateBuilderImpl(schemaSet.getUpdateSchema()).setNewFieldValue(NULLABLE_STRING_MAP_FIELD_NAME, null)
.setNewFieldValue(NULLABLE_STRING_ARRAY_FIELD_NAME, null)
.build();
ChunkedValueManifest chunkedValueManifest = new ChunkedValueManifest();
chunkedValueManifest.setSchemaId(schemaSet.getValueSchemaId());
ChunkedValueManifestContainer container = new ChunkedValueManifestContainer();
container.setManifest(chunkedValueManifest);
result = mergeConflictResolver.update(
Lazy.of(() -> serializeValueRecord(updatedValueRecord)),
null,
// new RmdWithValueSchemaId(schemaSet.getValueSchemaId(), RMD_VERSION_ID, updateRmdRecord),
serializeUpdateRecord(partialUpdateRecord),
schemaSet.getValueSchemaId(),
schemaSet.getUpdateSchemaProtocolVersion(),
10L,
1L,
0,
0,
container);

newUpdatedValueRecord = deserializeValueRecord(result.getNewValue());
Assert.assertNull(newUpdatedValueRecord.get(NULLABLE_STRING_ARRAY_FIELD_NAME));
Assert.assertNull(newUpdatedValueRecord.get(NULLABLE_STRING_MAP_FIELD_NAME));
newUpdatedRmdRecord = result.getRmdRecord();
newUpdatedRmdTsRecord = (GenericRecord) newUpdatedRmdRecord.get(RmdConstants.TIMESTAMP_FIELD_NAME);

updatedNullableListTsRecord = (GenericRecord) newUpdatedRmdTsRecord.get(NULLABLE_STRING_ARRAY_FIELD_NAME);
Assert.assertEquals(updatedNullableListTsRecord.get(TOP_LEVEL_TS_FIELD_NAME), 10L);
Assert.assertEquals(updatedNullableListTsRecord.get(PUT_ONLY_PART_LENGTH_FIELD_NAME), 0);
Assert.assertEquals(updatedNullableListTsRecord.get(ACTIVE_ELEM_TS_FIELD_NAME), Collections.emptyList());
Assert.assertEquals(updatedNullableListTsRecord.get(DELETED_ELEM_FIELD_NAME), Collections.emptyList());
Assert.assertEquals(updatedNullableListTsRecord.get(DELETED_ELEM_TS_FIELD_NAME), Collections.emptyList());

updatedNullableMapTsRecord = (GenericRecord) newUpdatedRmdTsRecord.get(NULLABLE_STRING_MAP_FIELD_NAME);
Assert.assertEquals(updatedNullableMapTsRecord.get(TOP_LEVEL_TS_FIELD_NAME), 10L);
Assert.assertEquals(updatedNullableMapTsRecord.get(PUT_ONLY_PART_LENGTH_FIELD_NAME), 0);
Assert.assertEquals(updatedNullableMapTsRecord.get(ACTIVE_ELEM_TS_FIELD_NAME), Collections.emptyList());
Assert.assertEquals(updatedNullableMapTsRecord.get(DELETED_ELEM_FIELD_NAME), Collections.emptyList());
Assert.assertEquals(updatedNullableMapTsRecord.get(DELETED_ELEM_TS_FIELD_NAME), Collections.emptyList());
}

@Test
@@ -474,7 +525,8 @@ public void testAddToCollectionFieldOnFieldValueTs() {
2L,
1L,
0,
0);
0,
null);

GenericRecord updatedValueRecord = deserializeValueRecord(result.getNewValue());
Assert.assertEquals(
@@ -552,7 +604,8 @@ public void testRemoveFromCollectionFieldOnFieldLevelTs() {
3L,
1L,
0,
0);
0,
null);
GenericRecord updatedValueRecord = deserializeValueRecord(result.getNewValue());
Assert.assertEquals(updatedValueRecord.get(NULLABLE_STRING_ARRAY_FIELD_NAME), Collections.emptyList());
IndexedHashMap<Utf8, Utf8> expectedNullableMap = new IndexedHashMap<>();
@@ -762,7 +815,8 @@ private MergeConflictResult updateNullableCollection(
timestamp,
1L,
0,
0);
0,
null);
}

private void validateNullableCollectionUpdateResult(
@@ -89,7 +89,8 @@ public void testUpdateIgnoredFieldUpdate() {
valueLevelTimestamp - 1, // Slightly lower than existing timestamp. Thus update should be ignored.
1,
1,
1);
1,
null);
Assert.assertEquals(mergeConflictResult, MergeConflictResult.getIgnoredResult());
Assert.assertTrue(
((List<?>) rmdRecord.get(RmdConstants.REPLICATION_CHECKPOINT_VECTOR_FIELD_NAME)).isEmpty(),
@@ -107,8 +108,6 @@ public void testWholeFieldUpdate() {
oldValueRecord.put("age", 30);
oldValueRecord.put("name", "Kafka");
oldValueRecord.put("intArray", Arrays.asList(1, 2, 3));
ByteBuffer oldValueBytes =
ByteBuffer.wrap(MapOrderPreservingSerDeFactory.getSerializer(personSchemaV2).serialize(oldValueRecord));

// Set up Write Compute request.
Schema writeComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(personSchemaV2);
@@ -146,15 +145,16 @@ ByteBuffer writeComputeBytes1 = ByteBuffer.wrap(
ByteBuffer writeComputeBytes1 = ByteBuffer.wrap(
MapOrderPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldPartialUpdateRecord1));
MergeConflictResult mergeConflictResult = mergeConflictResolver.update(
Lazy.of(() -> oldValueBytes),
Lazy.of(() -> null),
rmdWithValueSchemaId,
writeComputeBytes1,
incomingValueSchemaId,
incomingWriteComputeSchemaId,
valueLevelTimestamp + 1,
1,
1,
1);
1,
null);

GenericRecord updateFieldPartialUpdateRecord2 = AvroSchemaUtils.createGenericRecord(writeComputeSchema);
updateFieldPartialUpdateRecord2.put("intArray", Arrays.asList(10, 20, 30, 40));
Expand All @@ -171,7 +171,8 @@ public void testWholeFieldUpdate() {
valueLevelTimestamp + 2,
2,
0,
0);
0,
null);

// Validate updated replication metadata.
Assert.assertFalse(mergeConflictResult.isUpdateIgnored());
Expand Down Expand Up @@ -276,7 +277,8 @@ public void testCollectionMerge() {
valueLevelTimestamp + 1, // Slightly higher than existing timestamp. Thus update is NOT ignored.
1,
1,
newColoID);
newColoID,
null);

// Validate updated replication metadata.
Assert.assertNotEquals(mergeConflictResult, MergeConflictResult.getIgnoredResult());