Skip to content

Commit

Permalink
Late Block reorg refactor (#7911)
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Harris <[email protected]>
  • Loading branch information
rolfyone authored Jan 24, 2024
1 parent a06207e commit 2271309
Show file tree
Hide file tree
Showing 12 changed files with 116 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface BlockFactory {

SafeFuture<BlockContainer> createUnsignedBlock(
BeaconState blockSlotState,
UInt64 newSlot,
UInt64 proposalSlot,
BLSSignature randaoReveal,
Optional<Bytes32> optionalGraffiti,
Optional<Boolean> requestedBlinded,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ public BlockFactoryDeneb(final Spec spec, final BlockOperationSelectorFactory op
@Override
public SafeFuture<BlockContainer> createUnsignedBlock(
final BeaconState blockSlotState,
final UInt64 newSlot,
final UInt64 proposalSlot,
final BLSSignature randaoReveal,
final Optional<Bytes32> optionalGraffiti,
final Optional<Boolean> requestedBlinded,
final BlockProductionPerformance blockProductionPerformance) {
return super.createUnsignedBlock(
blockSlotState,
newSlot,
proposalSlot,
randaoReveal,
optionalGraffiti,
requestedBlinded,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,25 @@ public BlockFactoryPhase0(
@Override
public SafeFuture<BlockContainer> createUnsignedBlock(
final BeaconState blockSlotState,
final UInt64 newSlot,
final UInt64 proposalSlot,
final BLSSignature randaoReveal,
final Optional<Bytes32> optionalGraffiti,
final Optional<Boolean> requestedBlinded,
final BlockProductionPerformance blockProductionPerformance) {
checkArgument(
blockSlotState.getSlot().equals(newSlot),
blockSlotState.getSlot().equals(proposalSlot),
"Block slot state for slot %s but should be for slot %s",
blockSlotState.getSlot(),
newSlot);
proposalSlot);

// Process empty slots up to the one before the new block slot
final UInt64 slotBeforeBlock = newSlot.minus(UInt64.ONE);
final UInt64 slotBeforeBlock = proposalSlot.minus(UInt64.ONE);

final Bytes32 parentRoot = spec.getBlockRootAtSlot(blockSlotState, slotBeforeBlock);

return spec.createNewUnsignedBlock(
newSlot,
spec.getBeaconProposerIndex(blockSlotState, newSlot),
proposalSlot,
spec.getBeaconProposerIndex(blockSlotState, proposalSlot),
blockSlotState,
parentRoot,
operationSelector.createSelector(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,17 @@ public MilestoneBasedBlockFactory(
@Override
public SafeFuture<BlockContainer> createUnsignedBlock(
final BeaconState blockSlotState,
final UInt64 newSlot,
final UInt64 proposalSlot,
final BLSSignature randaoReveal,
final Optional<Bytes32> optionalGraffiti,
final Optional<Boolean> requestedBlinded,
final BlockProductionPerformance blockProductionPerformance) {
final SpecMilestone milestone = getMilestone(newSlot);
final SpecMilestone milestone = getMilestone(proposalSlot);
return registeredFactories
.get(milestone)
.createUnsignedBlock(
blockSlotState,
newSlot,
proposalSlot,
randaoReveal,
optionalGraffiti,
requestedBlinded,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.safeJoin;
import static tech.pegasys.teku.infrastructure.time.TimeUtilities.secondsToMillis;
import static tech.pegasys.teku.networks.Eth2NetworkConfiguration.DEFAULT_FORK_CHOICE_LATE_BLOCK_REORG_ENABLED;

import com.google.common.collect.ImmutableMap;
import java.io.IOException;
Expand Down Expand Up @@ -127,6 +126,8 @@ spec, new SignedBlockAndState(anchorBlock, anchorState)),
final InlineEventThread eventThread = new InlineEventThread();
final KZG kzg = KzgRetriever.getKzgWithLoadedTrustedSetup(spec, testDefinition.getConfigName());
final StubBlobSidecarManager blobSidecarManager = new StubBlobSidecarManager(kzg);
// forkChoiceLateBlockReorgEnabled is true here always because this is the reference test
// executor
final ForkChoice forkChoice =
new ForkChoice(
spec,
Expand All @@ -137,7 +138,7 @@ spec, new SignedBlockAndState(anchorBlock, anchorState)),
new ForkChoiceStateProvider(eventThread, recentChainData),
new TickProcessor(spec, recentChainData),
transitionBlockValidator,
DEFAULT_FORK_CHOICE_LATE_BLOCK_REORG_ENABLED,
true,
storageSystem.getMetricsSystem());
final ExecutionLayerChannelStub executionLayer =
new ExecutionLayerChannelStub(spec, false, Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,12 @@ SafeFuture<Optional<BeaconState>> retrieveCheckpointState(
Checkpoint checkpoint, BeaconState latestStateAtEpoch);

// implements is_head_weak from fork-choice Consensus Spec
boolean isHeadWeak(BeaconState justifiedState, Bytes32 root);
boolean isHeadWeak(Bytes32 root);

// implements is_parent_strong from fork-choice Consensus Spec
boolean isParentStrong(BeaconState justifiedState, Bytes32 parentRoot);
boolean isParentStrong(Bytes32 parentRoot);

void computeBalanceThresholds(final BeaconState justifiedState);

// implements is_ffg_competitive from Consensus Spec
Optional<Boolean> isFfgCompetitive(final Bytes32 headRoot, final Bytes32 parentRoot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,15 +247,18 @@ public SafeFuture<Optional<BeaconState>> retrieveCheckpointState(
}

@Override
public boolean isHeadWeak(BeaconState justifiedState, Bytes32 root) {
public boolean isHeadWeak(final Bytes32 root) {
return false;
}

@Override
public boolean isParentStrong(BeaconState justifiedState, Bytes32 parentRoot) {
public boolean isParentStrong(final Bytes32 parentRoot) {
return false;
}

@Override
public void computeBalanceThresholds(BeaconState justifiedState) {}

@Override
public Optional<Boolean> isFfgCompetitive(Bytes32 headRoot, Bytes32 parentRoot) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionPerformance;
import tech.pegasys.teku.infrastructure.async.ExceptionThrowingRunnable;
import tech.pegasys.teku.infrastructure.async.ExceptionThrowingSupplier;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.eventthread.EventThread;
import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil;
import tech.pegasys.teku.infrastructure.exceptions.FatalServiceFailureException;
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
Expand Down Expand Up @@ -100,6 +103,8 @@ public class ForkChoice implements ForkChoiceUpdatedResultSubscriber {
private final boolean forkChoiceLateBlockReorgEnabled;
private Optional<Boolean> optimisticSyncing = Optional.empty();

private final LabelledMetric<Counter> getProposerHeadSelectedCounter;

public ForkChoice(
final Spec spec,
final EventThread forkChoiceExecutor,
Expand All @@ -123,7 +128,12 @@ public ForkChoice(
this.tickProcessor = tickProcessor;
this.forkChoiceLateBlockReorgEnabled = forkChoiceLateBlockReorgEnabled;
LOG.debug("forkChoiceLateBlockReorgEnabled is set to {}", forkChoiceLateBlockReorgEnabled);

getProposerHeadSelectedCounter =
metricsSystem.createLabelledCounter(
TekuMetricCategory.BEACON,
"get_proposer_head_selection_total",
"when late_block_reorg is enabled, counts based on the proposer parent being based on fork choice, head, or parent of head.",
"selected_source");
recentChainData.subscribeStoreInitialized(this::initializeProtoArrayForkChoice);
forkChoiceNotifier.subscribeToForkChoiceUpdatedResult(this);
}
Expand Down Expand Up @@ -347,6 +357,9 @@ private void updateHeadTransaction(
final BeaconState justifiedState,
final Checkpoint finalizedCheckpoint,
final Checkpoint justifiedCheckpoint) {
if (forkChoiceLateBlockReorgEnabled) {
recentChainData.getStore().computeBalanceThresholds(justifiedState);
}
final VoteUpdater transaction = recentChainData.startVoteUpdate();
final ReadOnlyForkChoiceStrategy forkChoiceStrategy = getForkChoiceStrategy();
final List<UInt64> justifiedEffectiveBalances =
Expand Down Expand Up @@ -748,6 +761,7 @@ private void notifyForkChoiceUpdatedAndOptimisticSyncingChanged(
final ForkChoiceState forkChoiceState = forkChoiceStateProvider.getForkChoiceStateSync();

forkChoiceNotifier.onForkChoiceUpdated(forkChoiceState, proposingSlot);
getProposerHeadSelectedCounter.labels("fork_choice").inc();

if (optimisticSyncing
.map(oldValue -> !oldValue.equals(forkChoiceState.isHeadOptimistic()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -646,6 +645,7 @@ public List<Bytes32> getAllBlockRootsAtSlot(final UInt64 slot) {

// implements get_proposer_head from Consensus Spec
public Bytes32 getProposerHead(final Bytes32 headRoot, final UInt64 slot) {
LOG.debug("start getProposerHead");
// if proposer boost is still active, don't attempt to override head
final boolean isProposerBoostActive =
store.getProposerBoostRoot().map(root -> !root.equals(headRoot)).orElse(false);
Expand All @@ -669,6 +669,14 @@ public Bytes32 getProposerHead(final Bytes32 headRoot, final UInt64 slot) {
|| !isProposingOnTime
|| isProposerBoostActive
|| maybeHead.isEmpty()) {
LOG.debug(
"getProposerHead - return headRoot - isHeadLate {}, isShufflingStable {}, isFinalizationOk {}, isProposingOnTime {}, isProposerBoostActive {}, head.isEmpty {}",
() -> isHeadLate,
() -> isShufflingStable,
() -> isFinalizationOk,
() -> isProposingOnTime,
() -> isProposerBoostActive,
() -> headRoot.isEmpty());
return headRoot;
}

Expand All @@ -685,31 +693,14 @@ public Bytes32 getProposerHead(final Bytes32 headRoot, final UInt64 slot) {
// from the initial list, check
// isFfgCompetitive, isSingleSlotReorg
if (!isFfgCompetitive || !isSingleSlotReorg) {
LOG.debug(
"getProposerHead - return headRoot - isFfgCompetitive {}, isSingleSlotReorg {}",
isFfgCompetitive,
isSingleSlotReorg);
return headRoot;
}

final SafeFuture<Optional<BeaconState>> future =
store.retrieveCheckpointState(store.getJustifiedCheckpoint());
try {
final Optional<BeaconState> maybeJustifiedState = future.join();
// to make further checks, we would need the justified state, return headRoot if we don't have
// it.
if (maybeJustifiedState.isEmpty()) {
return headRoot;
}
final boolean isHeadWeak = store.isHeadWeak(maybeJustifiedState.get(), headRoot);
final boolean isParentStrong =
store.isParentStrong(maybeJustifiedState.get(), head.getParentRoot());
// finally, the parent must be strong, and the current head must be weak.
if (isHeadWeak && isParentStrong) {
return head.getParentRoot();
}
} catch (Exception exception) {
if (!(exception instanceof CancellationException)) {
LOG.error("Failed to get justified checkpoint", exception);
}
}

LOG.debug("getProposerHead - return headRoot");
return headRoot;
}

Expand All @@ -723,6 +714,11 @@ public boolean shouldOverrideForkChoiceUpdate(final Bytes32 headRoot) {
final boolean isHeadLate = isBlockLate(headRoot);
if (maybeHead.isEmpty() || maybeCurrentSlot.isEmpty() || !isHeadLate) {
// ! isHeadLate, or we don't have data we need (currentSlot and the block in question)
LOG.debug(
"shouldOverrideForkChoiceUpdate head {}, currentSlot {}, isHeadLate {}",
() -> maybeHead.map(SignedBeaconBlock::getRoot),
() -> maybeCurrentSlot,
() -> isHeadLate);
return false;
}
final SignedBeaconBlock head = maybeHead.get();
Expand All @@ -739,6 +735,12 @@ public boolean shouldOverrideForkChoiceUpdate(final Bytes32 headRoot) {

if (!isShufflingStable || !isFfgCompetitive || !isFinalizationOk || maybeParentSlot.isEmpty()) {
// !shufflingStable or !ffgCompetetive or !finalizationOk, or parentSlot is not found
LOG.debug(
"shouldOverrideForkChoiceUpdate isShufflingStable {}, isFfgCompetitive {}, isFinalizationOk {}, maybeParentSlot {}",
isShufflingStable,
isFfgCompetitive,
isFinalizationOk,
maybeParentSlot);
return false;
}

Expand All @@ -751,28 +753,19 @@ public boolean shouldOverrideForkChoiceUpdate(final Bytes32 headRoot) {
final boolean isHeadWeak;
final boolean isParentStrong;
if (currentSlot.isGreaterThan(head.getSlot())) {
try {
final SafeFuture<Optional<BeaconState>> future =
store.retrieveCheckpointState(store.getJustifiedCheckpoint());
final Optional<BeaconState> maybeJustifiedState = future.join();
isHeadWeak =
maybeJustifiedState.map(state -> store.isHeadWeak(state, headRoot)).orElse(true);
isParentStrong =
maybeJustifiedState
.map(beaconState -> store.isParentStrong(beaconState, head.getParentRoot()))
.orElse(true);
} catch (Exception exception) {
if (!(exception instanceof CancellationException)) {
LOG.error("Failed to get justified checkpoint", exception);
}
return false;
}
isHeadWeak = store.isHeadWeak(headRoot);
isParentStrong = store.isParentStrong(head.getParentRoot());
} else {
isHeadWeak = true;
isParentStrong = true;
}
final boolean isSingleSlotReorg = isParentSlotOk && isCurrentTimeOk;
if (!isSingleSlotReorg || !isHeadWeak || !isParentStrong) {
LOG.debug(
"shouldOverrideForkChoiceUpdate isSingleSlotReorg {}, isHeadWeak {}, isParentStrong {}",
isSingleSlotReorg,
isHeadWeak,
isParentStrong);
return false;
}

Expand All @@ -786,6 +779,8 @@ public boolean shouldOverrideForkChoiceUpdate(final Bytes32 headRoot) {
final BeaconState proposerPreState = spec.processSlots(maybeParentState.get(), proposalSlot);
final int proposerIndex = spec.getBeaconProposerIndex(proposerPreState, proposalSlot);
if (!validatorIsConnectedProvider.isValidatorConnected(proposerIndex, proposalSlot)) {
LOG.debug(
"shouldOverrideForkChoiceUpdate isValidatorConnected({}) {}, ", proposerIndex, false);
return false;
}
} catch (SlotProcessingException | EpochProcessingException e) {
Expand Down
Loading

0 comments on commit 2271309

Please sign in to comment.