Move non-canonical blob sidecars
mehdi-aouadi committed Jun 16, 2023
1 parent aea27cd commit f3bf37e
Showing 8 changed files with 150 additions and 55 deletions.
@@ -1176,6 +1176,113 @@ public void orphanedBlockStorageTest_multiple(final DatabaseContext context) thr
assertThat(database.getNonCanonicalBlocksAtSlot(UInt64.valueOf(6)).size()).isEqualTo(2);
}

@TestTemplate
public void storeNonCanonicalBlobsTest_multiple(final DatabaseContext context)
throws IOException {
createStorageSystem(context, StateStorageMode.ARCHIVE, StoreConfig.createDefault(), true);
final ChainBuilder primaryChain = ChainBuilder.create(spec, VALIDATOR_KEYS);
primaryChain.generateGenesis(genesisTime, true);
primaryChain.generateBlocksUpToSlot(3);
final ChainBuilder forkChain = primaryChain.fork();
// Primary chain's next block is at 5
primaryChain.generateBlockAtSlot(5);
final ChainBuilder secondFork = primaryChain.fork();

final BlobSidecar blobSidecarPrimary2Slot2Index0 =
dataStructureUtil.randomBlobSidecar(
UInt64.valueOf(2), primaryChain.getBlockAtSlot(2).getRoot(), UInt64.valueOf(0));
database.storeBlobSidecar(blobSidecarPrimary2Slot2Index0);
final BlobSidecar blobSidecarPrimary2Slot2Index1 =
dataStructureUtil.randomBlobSidecar(
UInt64.valueOf(2), primaryChain.getBlockAtSlot(2).getRoot(), UInt64.valueOf(1));
database.storeBlobSidecar(blobSidecarPrimary2Slot2Index1);

// Primary chain's next block is at 7
final SignedBlockAndState finalizedBlock = primaryChain.generateBlockAtSlot(7);
final Checkpoint finalizedCheckpoint = getCheckpointForBlock(primaryChain.getBlockAtSlot(7));
final UInt64 firstHotBlockSlot = finalizedCheckpoint.getEpochStartSlot(spec).plus(UInt64.ONE);
primaryChain.generateBlockAtSlot(firstHotBlockSlot);
// Fork chain's next block is at 6
forkChain.generateBlockAtSlot(6);
forkChain.generateBlockAtSlot(firstHotBlockSlot);
secondFork.generateBlockAtSlot(6);
secondFork.generateBlockAtSlot(firstHotBlockSlot);

final BlobSidecar blobSidecarFork2Slot6Index0 =
dataStructureUtil.randomBlobSidecar(
UInt64.valueOf(6), forkChain.getBlockAtSlot(6).getRoot(), UInt64.valueOf(0));
database.storeBlobSidecar(blobSidecarFork2Slot6Index0);
final BlobSidecar blobSidecarFork2Slot6Index1 =
dataStructureUtil.randomBlobSidecar(
UInt64.valueOf(6), forkChain.getBlockAtSlot(6).getRoot(), UInt64.valueOf(1));
database.storeBlobSidecar(blobSidecarFork2Slot6Index1);
final BlobSidecar blobSidecarFork2Slot9Index0 =
dataStructureUtil.randomBlobSidecar(
UInt64.valueOf(9), forkChain.getBlockAtSlot(9).getRoot(), UInt64.valueOf(0));
database.storeBlobSidecar(blobSidecarFork2Slot9Index0);
final BlobSidecar blobSidecarFork2Slot9Index1 =
dataStructureUtil.randomBlobSidecar(
UInt64.valueOf(9), forkChain.getBlockAtSlot(9).getRoot(), UInt64.valueOf(1));
database.storeBlobSidecar(blobSidecarFork2Slot9Index1);

initGenesis();

final Set<SignedBlockAndState> allBlocksAndStates =
Streams.concat(
primaryChain.streamBlocksAndStates(),
forkChain.streamBlocksAndStates(),
secondFork.streamBlocksAndStates())
.collect(Collectors.toSet());

// Finalize at block 7, making the fork blocks non-canonical
add(allBlocksAndStates);
justifyAndFinalizeEpoch(finalizedCheckpoint.getEpoch(), finalizedBlock);

assertThat(database.getNonCanonicalBlocksAtSlot(UInt64.valueOf(6)).size()).isEqualTo(2);

SlotAndBlockRootAndBlobIndex key =
new SlotAndBlockRootAndBlobIndex(
UInt64.valueOf(6), forkChain.getBlockAtSlot(6).getRoot(), ZERO);
assertThat(database.getNonCanonicalBlobSidecar(key)).isPresent();
assertThat(database.getNonCanonicalBlobSidecar(key))
.get()
.isEqualTo(blobSidecarFork2Slot6Index0);

key =
new SlotAndBlockRootAndBlobIndex(
UInt64.valueOf(6), forkChain.getBlockAtSlot(6).getRoot(), ONE);
assertThat(database.getNonCanonicalBlobSidecar(key)).isPresent();
assertThat(database.getNonCanonicalBlobSidecar(key))
.get()
.isEqualTo(blobSidecarFork2Slot6Index1);

key =
new SlotAndBlockRootAndBlobIndex(
UInt64.valueOf(9), forkChain.getBlockAtSlot(9).getRoot(), ZERO);
assertThat(database.getNonCanonicalBlobSidecar(key)).isPresent();
assertThat(database.getNonCanonicalBlobSidecar(key))
.get()
.isEqualTo(blobSidecarFork2Slot9Index0);

key =
new SlotAndBlockRootAndBlobIndex(
UInt64.valueOf(9), forkChain.getBlockAtSlot(9).getRoot(), ONE);
assertThat(database.getNonCanonicalBlobSidecar(key)).isPresent();
assertThat(database.getNonCanonicalBlobSidecar(key))
.get()
.isEqualTo(blobSidecarFork2Slot9Index1);

key =
new SlotAndBlockRootAndBlobIndex(
UInt64.valueOf(2), primaryChain.getBlockAtSlot(2).getRoot(), ZERO);
assertThat(database.getNonCanonicalBlobSidecar(key)).isEmpty();

key =
new SlotAndBlockRootAndBlobIndex(
UInt64.valueOf(2), primaryChain.getBlockAtSlot(2).getRoot(), ONE);
assertThat(database.getNonCanonicalBlobSidecar(key)).isEmpty();
}

@TestTemplate
public void orphanedBlockStorageTest_noCanonicalBlocks(final DatabaseContext context)
throws IOException {
@@ -27,7 +27,6 @@ public class StorageConfiguration {
public static final Duration DEFAULT_BLOCK_PRUNING_INTERVAL = Duration.ofMinutes(15);
public static final Duration DEFAULT_BLOBS_PRUNING_INTERVAL = Duration.ofMinutes(1);
public static final int DEFAULT_BLOBS_PRUNING_LIMIT = 32;
public static final boolean DEFAULT_STORE_NON_CANONICAL_BLOB_SIDECARS_ENABLED = false;
private final Eth1Address eth1DepositContract;

private final StateStorageMode dataStorageMode;
@@ -39,7 +38,6 @@ public class StorageConfiguration {
private final Duration blockPruningInterval;
private final Duration blobsPruningInterval;
private final int blobsPruningLimit;
private final boolean storeNonCanonicalBlobSidecars;

private StorageConfiguration(
final Eth1Address eth1DepositContract,
@@ -51,7 +49,6 @@ private StorageConfiguration(
final Duration blockPruningInterval,
final Duration blobsPruningInterval,
final int blobsPruningLimit,
final boolean storeNonCanonicalBlobSidecars,
final Spec spec) {
this.eth1DepositContract = eth1DepositContract;
this.dataStorageMode = dataStorageMode;
@@ -62,7 +59,6 @@ private StorageConfiguration(
this.blockPruningInterval = blockPruningInterval;
this.blobsPruningInterval = blobsPruningInterval;
this.blobsPruningLimit = blobsPruningLimit;
this.storeNonCanonicalBlobSidecars = storeNonCanonicalBlobSidecars;
this.spec = spec;
}

@@ -106,10 +102,6 @@ public int getBlobsPruningLimit() {
return blobsPruningLimit;
}

public boolean isStoreNonCanonicalBlobSidecarsEnabled() {
return storeNonCanonicalBlobSidecars;
}

public Spec getSpec() {
return spec;
}
@@ -127,9 +119,6 @@ public static final class Builder {
private Duration blobsPruningInterval = DEFAULT_BLOBS_PRUNING_INTERVAL;
private int blobsPruningLimit = DEFAULT_BLOBS_PRUNING_LIMIT;

private boolean storeNonCanonicalBlobSidecars =
DEFAULT_STORE_NON_CANONICAL_BLOB_SIDECARS_ENABLED;

private Builder() {}

public Builder eth1DepositContract(Eth1Address eth1DepositContract) {
@@ -207,11 +196,6 @@ public Builder blobsPruningLimit(final int blobsPruningLimit) {
return this;
}

public Builder storeNonCanonicalBlobSidecars(final boolean storeNonCanonicalBlobSidecars) {
this.storeNonCanonicalBlobSidecars = storeNonCanonicalBlobSidecars;
return this;
}

public StorageConfiguration build() {
return new StorageConfiguration(
eth1DepositContract,
@@ -223,7 +207,6 @@ public StorageConfiguration build() {
blockPruningInterval,
blobsPruningInterval,
blobsPruningLimit,
storeNonCanonicalBlobSidecars,
spec);
}
}
@@ -94,6 +94,7 @@
public class KvStoreDatabase implements Database {

protected static final int TX_BATCH_SIZE = 500;
protected static final int BLOBS_TX_BATCH_SIZE = 100;
private static final Logger LOG = LogManager.getLogger();
protected final Spec spec;
protected final boolean storeNonCanonicalBlocks;
@@ -1017,21 +1018,50 @@ private void removeNonCanonicalBlobSidecars(
final Map<Bytes32, UInt64> deletedHotBlocks,
final Map<Bytes32, Bytes32> finalizedChildToParentMap) {
try (final FinalizedUpdater updater = finalizedUpdater()) {
LOG.trace("Removing blob sidecars for non-canonical blocks");
final Set<SlotAndBlockRoot> nonCanonicalBlocks =
deletedHotBlocks.entrySet().stream()
.filter(entry -> !finalizedChildToParentMap.containsKey(entry.getKey()))
.map(entry -> new SlotAndBlockRoot(entry.getValue(), entry.getKey()))
.collect(Collectors.toSet());
for (final SlotAndBlockRoot slotAndBlockRoot : nonCanonicalBlocks) {
dao.getBlobSidecarKeys(slotAndBlockRoot)
.forEach(
key -> {
LOG.trace("Removing blobSidecar with index {} for non-canonical block", key);
updater.removeBlobSidecar(key);
});
if (storeNonCanonicalBlocks) {
int index = 0;
final Iterator<SlotAndBlockRoot> nonCanonicalBlocksIterator = nonCanonicalBlocks.iterator();
while (nonCanonicalBlocksIterator.hasNext()) {
if (index < BLOBS_TX_BATCH_SIZE) {
final SlotAndBlockRoot next = nonCanonicalBlocksIterator.next();
dao.getBlobSidecarKeys(next)
.forEach(
key -> {
// Copy the orphaned blob sidecar into the non-canonical column, then drop the hot copy
dao.getBlobSidecar(key)
.ifPresent(
blobSidecarBytes -> {
final BlobSidecar blobSidecar =
spec.deserializeBlobSidecar(blobSidecarBytes, key.getSlot());
updater.addNonCanonicalBlobSidecar(blobSidecar);
updater.removeBlobSidecar(key);
});
});
}
// Commit periodically so a single transaction stays bounded by BLOBS_TX_BATCH_SIZE
if (index >= BLOBS_TX_BATCH_SIZE || index == nonCanonicalBlocks.size() - 1) {
index = 0;
updater.commit();
} else {
index++;
}
}
} else {
LOG.trace("Removing blob sidecars for non-canonical blocks");
for (final SlotAndBlockRoot slotAndBlockRoot : nonCanonicalBlocks) {
dao.getBlobSidecarKeys(slotAndBlockRoot)
.forEach(
key -> {
LOG.trace("Removing blobSidecar with index {} for non-canonical block", key);
updater.removeBlobSidecar(key);
});
}
updater.commit();
}
updater.commit();
}
}
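For readers skimming the diff, the heart of the change is the new `storeNonCanonicalBlocks` branch above: instead of simply deleting the blob sidecars of orphaned blocks, they are copied into the non-canonical blob sidecar column before the hot copies are removed, committing periodically so no single transaction grows unbounded. Below is a minimal sketch of that move-in-batches pattern, using hypothetical callback parameters (`copyToNonCanonical`, `removeHotCopy`, `commit`) and plain string keys in place of the real `dao`/`FinalizedUpdater` interfaces and `SlotAndBlockRootAndBlobIndex` keys:

```java
import java.util.List;
import java.util.function.Consumer;

/** Illustrative sketch only; not the real KvStoreDatabase implementation. */
final class NonCanonicalBlobMoveSketch {

  static final int BLOBS_TX_BATCH_SIZE = 100;

  /**
   * Copies each key's blob sidecar into non-canonical storage, deletes the hot copy,
   * and commits after every BLOBS_TX_BATCH_SIZE keys plus once at the end.
   */
  static void moveInBatches(
      final List<String> blobSidecarKeys,
      final Consumer<String> copyToNonCanonical,
      final Consumer<String> removeHotCopy,
      final Runnable commit) {
    int inCurrentBatch = 0;
    for (final String key : blobSidecarKeys) {
      copyToNonCanonical.accept(key); // mirrors updater.addNonCanonicalBlobSidecar
      removeHotCopy.accept(key);      // mirrors updater.removeBlobSidecar
      inCurrentBatch++;
      if (inCurrentBatch >= BLOBS_TX_BATCH_SIZE) {
        commit.run();                 // bound the transaction size
        inCurrentBatch = 0;
      }
    }
    if (inCurrentBatch > 0) {
      commit.run();                   // flush the final partial batch
    }
  }
}
```

The real method batches per orphaned block rather than per sidecar key, and deserializes each sidecar with `spec.deserializeBlobSidecar` before writing it to the non-canonical column.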

@@ -78,7 +78,7 @@ private V6SchemaCombinedSnapshot(final Spec spec, final int finalizedOffset) {

nonCanonicalBlobSidecarBySlotRootBlobIndex =
KvStoreColumn.create(
finalizedOffset + 12,
finalizedOffset + 13,
SLOT_AND_BLOCK_ROOT_AND_BLOB_INDEX_KEY_SERIALIZER,
BYTES_SERIALIZER);

@@ -85,7 +85,7 @@ public V6SchemaCombinedTreeState(final Spec spec) {
BYTES_SERIALIZER);
nonCanonicalBlobSidecarBySlotRootBlobIndex =
KvStoreColumn.create(
finalizedOffset + 14,
finalizedOffset + 15,
SLOT_AND_BLOCK_ROOT_AND_BLOB_INDEX_KEY_SERIALIZER,
BYTES_SERIALIZER);
deletedColumnIds =
@@ -64,7 +64,7 @@ public class BeaconNodeDataOptions extends ValidatorClientDataOptions {
names = {"--data-storage-non-canonical-blocks-enabled"},
paramLabel = "<BOOLEAN>",
showDefaultValue = Visibility.ALWAYS,
description = "Store non-canonical blocks",
description = "Store non-canonical blocks and associated blobs if they exist",
fallbackValue = "true",
arity = "0..1")
private boolean storeNonCanonicalBlocksEnabled =
@@ -127,16 +127,6 @@ public class BeaconNodeDataOptions extends ValidatorClientDataOptions {
arity = "0..1")
private int blobsPruningLimit = StorageConfiguration.DEFAULT_BLOBS_PRUNING_LIMIT;

@CommandLine.Option(
names = {"--data-storage-non-canonical-blob-sidecars-enabled"},
paramLabel = "<BOOLEAN>",
showDefaultValue = Visibility.ALWAYS,
description = "Store non-canonical blob sidecars",
fallbackValue = "true",
arity = "0..1")
private boolean storeNonCanonicalBlobSidecarsEnabled =
StorageConfiguration.DEFAULT_STORE_NON_CANONICAL_BLOB_SIDECARS_ENABLED;

@Override
protected DataConfig.Builder configureDataConfig(final DataConfig.Builder config) {
return super.configureDataConfig(config).beaconDataPath(dataBeaconPath);
@@ -154,8 +144,7 @@ public void configure(final TekuConfiguration.Builder builder) {
.maxKnownNodeCacheSize(maxKnownNodeCacheSize)
.blockPruningInterval(Duration.ofSeconds(blockPruningIntervalSeconds))
.blobsPruningInterval(Duration.ofSeconds(blobsPruningIntervalSeconds))
.blobsPruningLimit(blobsPruningLimit)
.storeNonCanonicalBlobSidecars(storeNonCanonicalBlobSidecarsEnabled));
.blobsPruningLimit(blobsPruningLimit));
builder.sync(
b ->
b.fetchAllHistoricBlocks(dataStorageMode.storesAllBlocks())
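With the dedicated `--data-storage-non-canonical-blob-sidecars-enabled` flag removed, `--data-storage-non-canonical-blocks-enabled` now governs blob sidecars as well as blocks. A small, self-contained picocli sketch follows, assuming a hypothetical `StorageFlagsDemo` command rather than Teku's real `BeaconNodeDataOptions`, to show how the `arity = "0..1"` / `fallbackValue` combination used above behaves on the command line:

```java
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Help.Visibility;
import picocli.CommandLine.Option;

// Hypothetical demo command: the bare flag enables the feature via fallbackValue,
// while an explicit =false disables it.
@Command(name = "storage-demo", mixinStandardHelpOptions = true)
class StorageFlagsDemo implements Runnable {

  @Option(
      names = {"--data-storage-non-canonical-blocks-enabled"},
      paramLabel = "<BOOLEAN>",
      showDefaultValue = Visibility.ALWAYS,
      description = "Store non-canonical blocks and associated blobs if they exist",
      fallbackValue = "true",
      arity = "0..1")
  private boolean storeNonCanonicalBlocksEnabled = false; // stand-in default for the demo

  @Override
  public void run() {
    System.out.println("storeNonCanonicalBlocksEnabled=" + storeNonCanonicalBlocksEnabled);
  }

  public static void main(final String[] args) {
    // --data-storage-non-canonical-blocks-enabled        -> true  (fallbackValue)
    // --data-storage-non-canonical-blocks-enabled=false  -> false
    // (flag omitted)                                     -> false (field default)
    new CommandLine(new StorageFlagsDemo()).execute(args);
  }
}
```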
@@ -189,7 +189,6 @@ KvStoreDatabase createDatabase(final Path databasePath, DatabaseVersion database
.storeNonCanonicalBlocks(true)
.eth1DepositContract(config.getEth1DepositContractAddress())
.dataStorageCreateDbVersion(databaseVersion)
.storeNonCanonicalBlobSidecars(true)
.build());
final Database database = databaseFactory.createDatabase();
if (!(database instanceof KvStoreDatabase)) {
@@ -238,19 +238,6 @@ void shouldSetBlobsPruningOptions() {
.isEqualTo(config);
}

@Test
void shouldSetNonCanonicalBlobSidecarsStoringOption() {
final TekuConfiguration config =
getTekuConfigurationFromArguments("--data-storage-non-canonical-blob-sidecars-enabled");
assertThat(config.storageConfiguration().isStoreNonCanonicalBlobSidecarsEnabled()).isTrue();
assertThat(
createConfigBuilder()
.storageConfiguration(b -> b.storeNonCanonicalBlobSidecars(true))
.build())
.usingRecursiveComparison()
.isEqualTo(config);
}

@Test
void shouldNotAllowPruningBlocksAndReconstructingStates() {
assertThatThrownBy(
