Skip to content

Commit

Permalink
store non canonical blob sidecars (#7258)
Browse files Browse the repository at this point in the history
* store non canonical blob sidecars
  • Loading branch information
mehdi-aouadi authored Aug 21, 2023
1 parent 7e5e6bc commit bb9c51d
Show file tree
Hide file tree
Showing 20 changed files with 675 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ protected SafeFuture<?> doStart() {
blobSidecarsStorageCountersEnabled,
"blob_sidecar",
pruningTimingsLabelledGauge,
pruningActiveLabelledGauge));
pruningActiveLabelledGauge,
config.isStoreNonCanonicalBlocksEnabled()));
}
final EventChannels eventChannels = serviceConfig.getEventChannels();
chainStorage = ChainStorage.create(database, config.getSpec());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ SafeFuture<List<BlobSidecar>> getBlobSidecarsBySlotAndBlockRoot(

SafeFuture<Optional<BlobSidecar>> getBlobSidecar(SlotAndBlockRootAndBlobIndex key);

SafeFuture<Optional<BlobSidecar>> getNonCanonicalBlobSidecar(SlotAndBlockRootAndBlobIndex key);

/** This method could return non-canonical blob sidecar keys if the slot is not finalized */
SafeFuture<List<SlotAndBlockRootAndBlobIndex>> getBlobSidecarKeys(UInt64 slot);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,143 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti
assertThat(database.getBlobSidecarColumnCount()).isEqualTo(0L);
}

@TestTemplate
public void verifyNonCanonicalBlobsLifecycle(final DatabaseContext context) throws IOException {
initialize(context);

final BlobSidecar blobSidecar1 =
dataStructureUtil.randomBlobSidecar(
UInt64.valueOf(1), dataStructureUtil.randomBytes32(), UInt64.valueOf(0));
final BlobSidecar blobSidecar2 =
dataStructureUtil.randomBlobSidecar(UInt64.valueOf(2), Bytes32.ZERO, UInt64.valueOf(0));
final BlobSidecar blobSidecar2bis =
dataStructureUtil.randomBlobSidecar(
UInt64.valueOf(2), Bytes32.fromHexString("0x01"), UInt64.valueOf(1));
final BlobSidecar blobSidecar3 =
dataStructureUtil.randomBlobSidecar(
UInt64.valueOf(3), dataStructureUtil.randomBytes32(), UInt64.valueOf(0));
final BlobSidecar blobSidecar5 =
dataStructureUtil.randomBlobSidecar(
UInt64.valueOf(5), dataStructureUtil.randomBytes32(), UInt64.valueOf(0));
final BlobSidecar blobSidecarNotAdded = dataStructureUtil.randomBlobSidecar();

// add non-canonical blobs out of order
database.storeNonCanonicalBlobSidecar(blobSidecar2);
database.storeNonCanonicalBlobSidecar(blobSidecar1);
database.storeNonCanonicalBlobSidecar(blobSidecar2bis);
database.storeNonCanonicalBlobSidecar(blobSidecar5);
database.storeNonCanonicalBlobSidecar(blobSidecar3);

assertThat(database.getNonCanonicalBlobSidecarColumnCount()).isEqualTo(5L);

database.update(
new StorageUpdate(
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Map.of(),
Map.of(),
Map.of(),
Optional.empty(),
Map.of(blobSidecar1.getBlockRoot(), blobSidecar1.getSlot()),
Map.of(),
false,
Optional.empty(),
true));
database.update(
new StorageUpdate(
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Map.of(),
Map.of(),
Map.of(),
Optional.empty(),
Map.of(blobSidecar3.getBlockRoot(), blobSidecar3.getSlot()),
Map.of(),
false,
Optional.empty(),
true));

// check all non-canonical blobs are present
List.of(blobSidecar1, blobSidecar2, blobSidecar2bis, blobSidecar3, blobSidecar5)
.forEach(
blobSidecar ->
assertThat(database.getNonCanonicalBlobSidecar(blobSidecarToKey(blobSidecar)))
.contains(blobSidecar));

// check blobSidecarNotAdded is not present
assertThat(database.getNonCanonicalBlobSidecar(blobSidecarToKey(blobSidecarNotAdded)))
.isEmpty();

// all non-canonical blobs must be streamed ordered by slot
assertNonCanonicalBlobSidecarKeys(
blobSidecar1.getSlot(),
blobSidecar5.getSlot(),
blobSidecarToKey(blobSidecar1),
blobSidecarToKey(blobSidecar2),
blobSidecarToKey(blobSidecar2bis),
blobSidecarToKey(blobSidecar3),
blobSidecarToKey(blobSidecar5));
assertNonCanonicalBlobSidecars(
Map.of(
blobSidecar1.getSlot(),
List.of(blobSidecar1),
blobSidecar2.getSlot(),
List.of(blobSidecar2, blobSidecar2bis),
blobSidecar3.getSlot(),
List.of(blobSidecar3),
blobSidecar5.getSlot(),
List.of(blobSidecar5)));

// Pruning with a prune limit set to 1: Only blobSidecar1 will be pruned
assertThat(database.pruneOldestNonCanonicalBlobSidecars(UInt64.MAX_VALUE, 1)).isTrue();
assertNonCanonicalBlobSidecarKeys(
blobSidecar2.getSlot(),
blobSidecar5.getSlot(),
blobSidecarToKey(blobSidecar2),
blobSidecarToKey(blobSidecar2bis),
blobSidecarToKey(blobSidecar3),
blobSidecarToKey(blobSidecar5));
assertNonCanonicalBlobSidecars(
Map.of(
blobSidecar2.getSlot(), List.of(blobSidecar2, blobSidecar2bis),
blobSidecar3.getSlot(), List.of(blobSidecar3),
blobSidecar5.getSlot(), List.of(blobSidecar5)));
assertThat(database.getNonCanonicalBlobSidecarColumnCount()).isEqualTo(4L);

// Pruning up to slot 1: No blobs pruned
assertThat(database.pruneOldestNonCanonicalBlobSidecars(ONE, 10)).isFalse();
assertNonCanonicalBlobSidecarKeys(
blobSidecar2.getSlot(),
blobSidecar5.getSlot(),
blobSidecarToKey(blobSidecar2),
blobSidecarToKey(blobSidecar2bis),
blobSidecarToKey(blobSidecar3),
blobSidecarToKey(blobSidecar5));
assertNonCanonicalBlobSidecars(
Map.of(
blobSidecar2.getSlot(), List.of(blobSidecar2, blobSidecar2bis),
blobSidecar3.getSlot(), List.of(blobSidecar3),
blobSidecar5.getSlot(), List.of(blobSidecar5)));
assertThat(database.getNonCanonicalBlobSidecarColumnCount()).isEqualTo(4L);

// Prune blobs up to slot 3
assertThat(database.pruneOldestNonCanonicalBlobSidecars(UInt64.valueOf(3), 10)).isFalse();
assertNonCanonicalBlobSidecarKeys(
blobSidecar1.getSlot(), blobSidecar5.getSlot(), blobSidecarToKey(blobSidecar5));
assertNonCanonicalBlobSidecars(Map.of(blobSidecar5.getSlot(), List.of(blobSidecar5)));
assertThat(database.getNonCanonicalBlobSidecarColumnCount()).isEqualTo(1L);

// Pruning all blobs
assertThat(database.pruneOldestNonCanonicalBlobSidecars(UInt64.valueOf(5), 1)).isTrue();
// No blobs should be left
assertNonCanonicalBlobSidecarKeys(ZERO, UInt64.valueOf(10));
assertThat(database.getNonCanonicalBlobSidecarColumnCount()).isEqualTo(0L);
}

@TestTemplate
public void updateWeakSubjectivityState_clearValue(final DatabaseContext context)
throws IOException {
Expand Down Expand Up @@ -1169,6 +1306,117 @@ public void orphanedBlockStorageTest_multiple(final DatabaseContext context) thr
assertThat(database.getNonCanonicalBlocksAtSlot(UInt64.valueOf(6)).size()).isEqualTo(2);
}

@TestTemplate
public void storeNonCanonicalBlobsTest_multiple(final DatabaseContext context)
throws IOException {
createStorageSystem(context, StateStorageMode.ARCHIVE, StoreConfig.createDefault(), true);
final BlockOptions randomBlobsOptions =
BlockOptions.create()
.setGenerateRandomBlobs(true)
.setGenerateRandomBlobsCount(Optional.of(2));
final ChainBuilder primaryChain = ChainBuilder.create(spec, VALIDATOR_KEYS);
primaryChain.generateGenesis(genesisTime, true);
primaryChain.generateBlocksUpToSlot(3, randomBlobsOptions);
final ChainBuilder forkChain = primaryChain.fork();
// Primary chain's next block is at 5
primaryChain.generateBlockAtSlot(5, randomBlobsOptions);
final ChainBuilder secondFork = primaryChain.fork();

final BlobSidecar blobSidecarPrimary2Slot2Index0 =
dataStructureUtil.randomBlobSidecar(
UInt64.valueOf(2), primaryChain.getBlockAtSlot(2).getRoot(), UInt64.valueOf(0));
database.storeBlobSidecar(blobSidecarPrimary2Slot2Index0);
final BlobSidecar blobSidecarPrimary2Slot2Index1 =
dataStructureUtil.randomBlobSidecar(
UInt64.valueOf(2), primaryChain.getBlockAtSlot(2).getRoot(), UInt64.valueOf(1));
database.storeBlobSidecar(blobSidecarPrimary2Slot2Index1);

// Primary chain's next block is at 7
final SignedBlockAndState finalizedBlock =
primaryChain.generateBlockAtSlot(7, randomBlobsOptions);
final Checkpoint finalizedCheckpoint = getCheckpointForBlock(primaryChain.getBlockAtSlot(7));
final UInt64 firstHotBlockSlot = finalizedCheckpoint.getEpochStartSlot(spec).plus(UInt64.ONE);
primaryChain.generateBlockAtSlot(firstHotBlockSlot, randomBlobsOptions);
// Fork chain's next block is at 6
forkChain.generateBlockAtSlot(6, randomBlobsOptions);
forkChain.generateBlockAtSlot(firstHotBlockSlot, randomBlobsOptions);
secondFork.generateBlockAtSlot(6, randomBlobsOptions);
secondFork.generateBlockAtSlot(firstHotBlockSlot, randomBlobsOptions);

initGenesis();

final Set<SignedBlockAndState> allBlocksAndStates =
Streams.concat(
primaryChain.streamBlocksAndStates(),
forkChain.streamBlocksAndStates(),
secondFork.streamBlocksAndStates())
.collect(Collectors.toSet());

final List<BlobSidecar> allBlocksSidecars =
Streams.concat(
primaryChain.streamBlobSidecars(),
forkChain.streamBlobSidecars(),
secondFork.streamBlobSidecars())
.collect(Collectors.toList());

// Finalize at block 7, making the fork blocks unavailable
add(allBlocksAndStates, allBlocksSidecars);

justifyAndFinalizeEpoch(finalizedCheckpoint.getEpoch(), finalizedBlock);

final List<SignedBeaconBlock> nonCanonicalBlocksAt6 =
database.getNonCanonicalBlocksAtSlot(UInt64.valueOf(6));
final List<SignedBeaconBlock> nonCanonicalBlocksAt9 =
database.getNonCanonicalBlocksAtSlot(UInt64.valueOf(9));

// check that streamNonCanonicalBlobSidecarKeys for slot 6 and 9 returns 4 blobs (2 * 2) each
assertThat(nonCanonicalBlocksAt6.size()).isEqualTo(2);
nonCanonicalBlocksAt6.forEach(
block -> {
try (Stream<SlotAndBlockRootAndBlobIndex> slotAndBlockRootAndBlobIndexStream =
database.streamNonCanonicalBlobSidecarKeys(UInt64.valueOf(6), UInt64.valueOf(6))) {
assertThat(slotAndBlockRootAndBlobIndexStream.count()).isEqualTo(2 * 2);
}
});
assertThat(nonCanonicalBlocksAt9.size()).isEqualTo(2);
nonCanonicalBlocksAt9.forEach(
block -> {
try (Stream<SlotAndBlockRootAndBlobIndex> slotAndBlockRootAndBlobIndexStream =
database.streamNonCanonicalBlobSidecarKeys(UInt64.valueOf(9), UInt64.valueOf(9))) {
assertThat(slotAndBlockRootAndBlobIndexStream.count()).isEqualTo(2 * 2);
}
});

// check that all blobs for slot 6 and 9 are stored as non canonical
Streams.concat(nonCanonicalBlocksAt6.stream(), nonCanonicalBlocksAt9.stream())
.flatMap(
block ->
Stream.of(
new SlotAndBlockRootAndBlobIndex(block.getSlot(), block.getRoot(), ZERO),
new SlotAndBlockRootAndBlobIndex(block.getSlot(), block.getRoot(), ONE)))
.forEach(
key -> {
// retrieve expected blobSidecar from allBlocksSidecars
final BlobSidecar blobSidecar =
allBlocksSidecars.stream()
.filter(sidecar -> blobSidecarToKey(sidecar).equals(key))
.findFirst()
.orElseThrow();

assertThat(database.getNonCanonicalBlobSidecar(key)).contains(blobSidecar);
});

SlotAndBlockRootAndBlobIndex key =
new SlotAndBlockRootAndBlobIndex(
UInt64.valueOf(2), primaryChain.getBlockAtSlot(2).getRoot(), ZERO);
assertThat(database.getNonCanonicalBlobSidecar(key)).isEmpty();

key =
new SlotAndBlockRootAndBlobIndex(
UInt64.valueOf(2), primaryChain.getBlockAtSlot(2).getRoot(), ONE);
assertThat(database.getNonCanonicalBlobSidecar(key)).isEmpty();
}

@TestTemplate
public void orphanedBlockStorageTest_noCanonicalBlocks(final DatabaseContext context)
throws IOException {
Expand Down Expand Up @@ -2363,6 +2611,15 @@ private void assertBlobSidecarKeys(
}
}

private void assertNonCanonicalBlobSidecarKeys(
final UInt64 from, final UInt64 to, SlotAndBlockRootAndBlobIndex... keys) {
try (final Stream<SlotAndBlockRootAndBlobIndex> blobSidecarsStream =
database.streamNonCanonicalBlobSidecarKeys(from, to)) {
final List<SlotAndBlockRootAndBlobIndex> keysFromDb = blobSidecarsStream.collect(toList());
assertThat(keysFromDb).isEqualTo(Arrays.asList(keys));
}
}

private void assertBlobSidecars(final Map<UInt64, List<BlobSidecar>> blobSidecars) {
final List<UInt64> slots = blobSidecars.keySet().stream().sorted().collect(toList());
if (slots.isEmpty()) {
Expand All @@ -2385,6 +2642,28 @@ private void assertBlobSidecars(final Map<UInt64, List<BlobSidecar>> blobSidecar
assertThat(blobSidecarsDb).isEqualTo(blobSidecars);
}

private void assertNonCanonicalBlobSidecars(final Map<UInt64, List<BlobSidecar>> blobSidecars) {
final List<UInt64> slots = blobSidecars.keySet().stream().sorted().collect(toList());
if (slots.isEmpty()) {
return;
}

final Map<UInt64, List<BlobSidecar>> blobSidecarsDb = new HashMap<>();
try (final Stream<SlotAndBlockRootAndBlobIndex> blobSidecarsStream =
database.streamNonCanonicalBlobSidecarKeys(slots.get(0), slots.get(slots.size() - 1))) {

for (final Iterator<SlotAndBlockRootAndBlobIndex> iterator = blobSidecarsStream.iterator();
iterator.hasNext(); ) {
final SlotAndBlockRootAndBlobIndex key = iterator.next();
final List<BlobSidecar> currentSlotBlobs =
blobSidecarsDb.computeIfAbsent(key.getSlot(), __ -> new ArrayList<>());
currentSlotBlobs.add(database.getNonCanonicalBlobSidecar(key).orElseThrow());
}
}

assertThat(blobSidecarsDb).isEqualTo(blobSidecars);
}

public static class CreateForkChainResult {
private final ChainBuilder forkChain;
private final UInt64 firstHotBlockSlot;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,12 @@ public SafeFuture<Optional<BlobSidecar>> getBlobSidecar(final SlotAndBlockRootAn
return SafeFuture.of(() -> database.getBlobSidecar(key));
}

@Override
public SafeFuture<Optional<BlobSidecar>> getNonCanonicalBlobSidecar(
final SlotAndBlockRootAndBlobIndex key) {
return SafeFuture.of(() -> database.getNonCanonicalBlobSidecar(key));
}

@Override
public SafeFuture<List<SlotAndBlockRootAndBlobIndex>> getBlobSidecarKeys(final UInt64 slot) {
return SafeFuture.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ public SafeFuture<Optional<BlobSidecar>> getBlobSidecar(final SlotAndBlockRootAn
return asyncRunner.runAsync(() -> queryDelegate.getBlobSidecar(key));
}

@Override
public SafeFuture<Optional<BlobSidecar>> getNonCanonicalBlobSidecar(
final SlotAndBlockRootAndBlobIndex key) {
return asyncRunner.runAsync(() -> queryDelegate.getNonCanonicalBlobSidecar(key));
}

@Override
public SafeFuture<List<SlotAndBlockRootAndBlobIndex>> getBlobSidecarKeys(final UInt64 slot) {
return asyncRunner.runAsync(() -> queryDelegate.getBlobSidecarKeys(slot));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,12 @@ void storeFinalizedBlocks(

void storeBlobSidecar(BlobSidecar blobSidecar);

void storeNonCanonicalBlobSidecar(BlobSidecar blobSidecar);

Optional<BlobSidecar> getBlobSidecar(SlotAndBlockRootAndBlobIndex key);

Optional<BlobSidecar> getNonCanonicalBlobSidecar(SlotAndBlockRootAndBlobIndex key);

void removeBlobSidecars(SlotAndBlockRoot slotAndBlockRoot);

/**
Expand All @@ -80,9 +84,15 @@ void storeFinalizedBlocks(
*/
boolean pruneOldestBlobSidecars(UInt64 lastSlotToPrune, int pruneLimit);

boolean pruneOldestNonCanonicalBlobSidecars(UInt64 lastSlotToPrune, int pruneLimit);

@MustBeClosed
Stream<SlotAndBlockRootAndBlobIndex> streamBlobSidecarKeys(UInt64 startSlot, UInt64 endSlot);

@MustBeClosed
Stream<SlotAndBlockRootAndBlobIndex> streamNonCanonicalBlobSidecarKeys(
UInt64 startSlot, UInt64 endSlot);

@MustBeClosed
default Stream<SlotAndBlockRootAndBlobIndex> streamBlobSidecarKeys(UInt64 slot) {
return streamBlobSidecarKeys(slot, slot);
Expand Down Expand Up @@ -200,6 +210,8 @@ default Stream<SlotAndBlockRootAndBlobIndex> streamBlobSidecarKeys(UInt64 slot)

long getBlobSidecarColumnCount();

long getNonCanonicalBlobSidecarColumnCount();

void migrate();

Optional<Checkpoint> getAnchor();
Expand Down
Loading

0 comments on commit bb9c51d

Please sign in to comment.