Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added more utilities for late block reorg #7741

Merged
merged 7 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ public SafeFuture<InternalValidationResult> validateAndImportBlock(

final Optional<BlockImportPerformance> blockImportPerformance;

arrivalTimestamp.ifPresent(
arrivalTime -> recentChainData.setBlockTimelinessFromArrivalTime(block, arrivalTime));

if (blockImportMetrics.isPresent()) {
final BlockImportPerformance performance =
new BlockImportPerformance(timeProvider, blockImportMetrics.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ SafeFuture<Optional<ExecutionPayloadContext>> getPayloadId(

void onTerminalBlockReached(Bytes32 executionBlockHash);

boolean validatorIsConnected(UInt64 validatorIndex, UInt64 currentSlot);

long subscribeToForkChoiceUpdatedResult(ForkChoiceUpdatedResultSubscriber subscriber);

boolean unsubscribeFromForkChoiceUpdatedResult(long subscriberId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ public void onTerminalBlockReached(Bytes32 executionBlockHash) {
eventThread.execute(() -> internalTerminalBlockReached(executionBlockHash));
}

@Override
public boolean validatorIsConnected(UInt64 validatorIndex, UInt64 currentSlot) {
return proposersDataManager.validatorIsConnected(validatorIndex, currentSlot);
}

@Override
public void onPreparedProposersUpdated() {
eventThread.execute(this::internalUpdatePreparableProposers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ public SafeFuture<Void> updateValidatorRegistrations(
headState, signedValidatorRegistrations, currentSlot));
}

// used in ForkChoice validator_is_connected
public boolean validatorIsConnected(final UInt64 validatorIndex, final UInt64 currentSlot) {
final PreparedProposerInfo info = preparedProposerInfoByValidatorIndex.get(validatorIndex);
return info != null && !info.hasExpired(currentSlot);
}

private void updatePreparedProposerCache(
final Collection<BeaconPreparableProposer> preparedProposers, final UInt64 currentSlot) {
final UInt64 expirySlot =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Consensys Software Inc., 2023
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.statetransition.forkchoice;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

import java.util.List;
import java.util.Optional;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.ethereum.execution.types.Eth1Address;
import tech.pegasys.teku.infrastructure.async.eventthread.EventThread;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.datastructures.operations.versions.bellatrix.BeaconPreparableProposer;
import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.storage.client.RecentChainData;

class ProposersDataManagerTest {
Dismissed Show dismissed Hide dismissed

private final Spec spec = TestSpecFactory.createMinimalCapella();

private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec);

private final RecentChainData recentChainData = mock(RecentChainData.class);

private final ExecutionLayerChannel channel = ExecutionLayerChannel.NOOP;
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();

private final Eth1Address defaultAddress = dataStructureUtil.randomEth1Address();
private final ProposersDataManager manager =
new ProposersDataManager(
mock(EventThread.class),
spec,
metricsSystem,
channel,
recentChainData,
Optional.of(defaultAddress));

final List<BeaconPreparableProposer> proposers =
List.of(
new BeaconPreparableProposer(UInt64.ONE, dataStructureUtil.randomEth1Address()),
new BeaconPreparableProposer(UInt64.ZERO, defaultAddress));

@Test
void validatorIsConnected_notFound_withEmptyPreparedList() {
assertThat(manager.validatorIsConnected(UInt64.ZERO, UInt64.ZERO)).isFalse();
}

@Test
void validatorIsConnected_found_withPreparedProposer() {
manager.updatePreparedProposers(proposers, UInt64.ONE);
assertThat(manager.validatorIsConnected(UInt64.ONE, UInt64.valueOf(1))).isTrue();
}

@Test
void validatorIsConnected_notFound_withDifferentPreparedProposer() {
manager.updatePreparedProposers(proposers, UInt64.ONE);
assertThat(manager.validatorIsConnected(UInt64.valueOf(2), UInt64.valueOf(2))).isFalse();
}

@Test
void validatorIsConnected_notFound_withExpiredPreparedProposer() {
manager.updatePreparedProposers(proposers, UInt64.ONE);
assertThat(manager.validatorIsConnected(UInt64.ONE, UInt64.valueOf(26))).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,9 @@ public SafeFuture<Optional<ExecutionPayloadContext>> getPayloadId(

@Override
public void onTerminalBlockReached(final Bytes32 executionBlockHash) {}

@Override
public boolean validatorIsConnected(UInt64 validatorIndex, UInt64 currentSlot) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ protected SafeFuture<?> initialize() {
metricsSystem,
storeConfig,
beaconAsyncRunner,
timeProvider,
storageQueryChannel,
storageUpdateChannel,
voteUpdateChannel,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Consensys Software Inc., 2023
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.storage.client;

import static tech.pegasys.teku.spec.constants.NetworkConstants.INTERVALS_PER_SLOT;

import java.util.Map;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.infrastructure.collections.LimitedMap;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;

public class BlockTimelinessTracker {
private static final Logger LOG = LogManager.getLogger();
private final Map<Bytes32, Boolean> blockTimeliness;
private final TimeProvider timeProvider;
private final Spec spec;
private final RecentChainData recentChainData;

// implements is_timely from Consensus Spec
public BlockTimelinessTracker(
final Spec spec, final RecentChainData recentChainData, final TimeProvider timeProvider) {
this.spec = spec;
final int epochsForTimeliness =
Math.max(spec.getGenesisSpecConfig().getReorgMaxEpochsSinceFinalization(), 3);
this.blockTimeliness =
LimitedMap.createSynchronizedNatural(
spec.getGenesisSpec().getSlotsPerEpoch() * epochsForTimeliness);
this.timeProvider = timeProvider;
this.recentChainData = recentChainData;
}

public void setBlockTimelinessFromArrivalTime(
final SignedBeaconBlock block, final UInt64 arrivalTimeMillis) {
final UInt64 genesisTime = recentChainData.getGenesisTime();
final UInt64 computedSlot = spec.getCurrentSlot(timeProvider.getTimeInSeconds(), genesisTime);
if (computedSlot.isGreaterThan(block.getMessage().getSlot())) {
LOG.debug(
"Block {}:{} is before computed slot {}, and won't be checked for timeliness",
block.getRoot(),
block.getSlot(),
computedSlot);
return;
}
recentChainData
.getCurrentSlot()
.ifPresent(
slot -> {
final UInt64 slotStartTimeMillis =
spec.getSlotStartTimeMillis(slot, genesisTime.times(1000));
final int millisIntoSlot =
arrivalTimeMillis.minusMinZero(slotStartTimeMillis).intValue();

final UInt64 timelinessLimit =
spec.getMillisPerSlot(slot).dividedBy(INTERVALS_PER_SLOT);

final boolean isTimely =
block.getMessage().getSlot().equals(slot)
&& timelinessLimit.isGreaterThan(millisIntoSlot);
LOG.debug(
"Block {}:{} arrived at {} ms into slot {}, timeliness limit is {} ms. result: {}",
block.getRoot(),
block.getSlot(),
millisIntoSlot,
computedSlot,
timelinessLimit,
isTimely);
blockTimeliness.put(block.getRoot(), isTimely);
});
}

public Optional<Boolean> isBlockTimely(final Bytes32 root) {
return Optional.ofNullable(blockTimeliness.get(root));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.bytes.Bytes4;
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecMilestone;
Expand Down Expand Up @@ -98,10 +99,13 @@ public abstract class RecentChainData implements StoreUpdateHandler {
private volatile Optional<ChainHead> chainHead = Optional.empty();
private volatile UInt64 genesisTime;

private final BlockTimelinessTracker blockTimelinessTracker;

RecentChainData(
final AsyncRunner asyncRunner,
final MetricsSystem metricsSystem,
final StoreConfig storeConfig,
final TimeProvider timeProvider,
final BlockProvider blockProvider,
final StateAndBlockSummaryProvider stateProvider,
final EarliestBlobSidecarSlotProvider earliestBlobSidecarSlotProvider,
Expand All @@ -120,6 +124,7 @@ public abstract class RecentChainData implements StoreUpdateHandler {
this.chainHeadChannel = chainHeadChannel;
this.storageUpdateChannel = storageUpdateChannel;
this.finalizedCheckpointChannel = finalizedCheckpointChannel;
this.blockTimelinessTracker = new BlockTimelinessTracker(spec, this, timeProvider);
reorgCounter =
metricsSystem.createCounter(
TekuMetricCategory.BEACON,
Expand Down Expand Up @@ -613,4 +618,13 @@ public List<Bytes32> getAllBlockRootsAtSlot(final UInt64 slot) {
.map(forkChoiceStrategy -> forkChoiceStrategy.getBlockRootsAtSlot(slot))
.orElse(Collections.emptyList());
}

public void setBlockTimelinessFromArrivalTime(
final SignedBeaconBlock block, final UInt64 arrivalTime) {
blockTimelinessTracker.setBlockTimelinessFromArrivalTime(block, arrivalTime);
}

public Optional<Boolean> getBlockTimeliness(final Bytes32 blockRoot) {
return blockTimelinessTracker.isBlockTimely(blockRoot);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import tech.pegasys.teku.dataproviders.lookup.StateAndBlockSummaryProvider;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.config.Constants;
import tech.pegasys.teku.storage.api.ChainHeadChannel;
Expand All @@ -49,6 +50,7 @@ public StorageBackedRecentChainData(
final AsyncRunner asyncRunner,
final MetricsSystem metricsSystem,
final StoreConfig storeConfig,
final TimeProvider timeProvider,
final StorageQueryChannel storageQueryChannel,
final StorageUpdateChannel storageUpdateChannel,
final VoteUpdateChannel voteUpdateChannel,
Expand All @@ -59,6 +61,7 @@ public StorageBackedRecentChainData(
asyncRunner,
metricsSystem,
storeConfig,
timeProvider,
storageQueryChannel::getHotBlocksByRoot,
storageQueryChannel::getHotStateAndBlockSummaryByBlockRoot,
storageQueryChannel::getEarliestAvailableBlobSidecarSlot,
Expand All @@ -77,6 +80,7 @@ public static SafeFuture<RecentChainData> create(
final MetricsSystem metricsSystem,
final StoreConfig storeConfig,
final AsyncRunner asyncRunner,
final TimeProvider timeProvider,
final StorageQueryChannel storageQueryChannel,
final StorageUpdateChannel storageUpdateChannel,
final VoteUpdateChannel voteUpdateChannel,
Expand All @@ -88,6 +92,7 @@ public static SafeFuture<RecentChainData> create(
asyncRunner,
metricsSystem,
storeConfig,
timeProvider,
storageQueryChannel,
storageUpdateChannel,
voteUpdateChannel,
Expand All @@ -103,6 +108,7 @@ public static RecentChainData createImmediately(
final AsyncRunner asyncRunner,
final MetricsSystem metricsSystem,
final StoreConfig storeConfig,
final TimeProvider timeProvider,
final StorageQueryChannel storageQueryChannel,
final StorageUpdateChannel storageUpdateChannel,
final VoteUpdateChannel voteUpdateChannel,
Expand All @@ -114,6 +120,7 @@ public static RecentChainData createImmediately(
asyncRunner,
metricsSystem,
storeConfig,
timeProvider,
storageQueryChannel,
storageUpdateChannel,
voteUpdateChannel,
Expand Down
Loading
Loading