Skip to content

Commit

Permalink
Single-pass epoch processing (sigp#4483, sigp#4573)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Sproul <[email protected]>
  • Loading branch information
dapplion and michaelsproul committed Feb 7, 2024
1 parent 8530427 commit 128bc22
Show file tree
Hide file tree
Showing 78 changed files with 2,623 additions and 1,104 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 34 additions & 21 deletions beacon_node/beacon_chain/src/attestation_rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use safe_arith::SafeArith;
use serde_utils::quoted_u64::Quoted;
use slog::debug;
use state_processing::per_epoch_processing::altair::{
process_inactivity_updates, process_justification_and_finalization,
process_inactivity_updates_slow, process_justification_and_finalization,
};
use state_processing::{
common::altair::BaseRewardPerIncrement,
Expand Down Expand Up @@ -134,10 +134,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let spec = &self.spec;

// Calculate ideal_rewards
let participation_cache = ParticipationCache::new(&state, spec)?;
process_justification_and_finalization(&state, &participation_cache)?
.apply_changes_to_state(&mut state);
process_inactivity_updates(&mut state, &participation_cache, spec)?;
let participation_cache = ParticipationCache::new(&state, spec)
.map_err(|_| BeaconChainError::AttestationRewardsError)?;
process_justification_and_finalization(&state)?.apply_changes_to_state(&mut state);
process_inactivity_updates_slow(&mut state, spec)?;

let previous_epoch = state.previous_epoch();

Expand All @@ -147,13 +147,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let weight = get_flag_weight(flag_index)
.map_err(|_| BeaconChainError::AttestationRewardsError)?;

let unslashed_participating_indices = participation_cache
.get_unslashed_participating_indices(flag_index, previous_epoch)?;

let unslashed_participating_balance =
unslashed_participating_indices
.total_balance()
.map_err(|_| BeaconChainError::AttestationRewardsError)?;
let unslashed_participating_balance = participation_cache
.previous_epoch_flag_attesting_balance(flag_index)
.map_err(|_| BeaconChainError::AttestationRewardsError)?;

let unslashed_participating_increments =
unslashed_participating_balance.safe_div(spec.effective_balance_increment)?;
Expand Down Expand Up @@ -199,24 +195,41 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Self::validators_ids_to_indices(&mut state, validators)?
};

for validator_index in &validators {
let eligible = state.is_eligible_validator(previous_epoch, *validator_index)?;
for &validator_index in &validators {
// Return 0s for unknown/inactive validator indices. This is a bit different from stable
// where we error for unknown pubkeys.
let Ok(validator) = participation_cache.get_validator(validator_index) else {
debug!(
self.log,
"No rewards for inactive/unknown validator";
"index" => validator_index,
"epoch" => previous_epoch
);
total_rewards.push(TotalAttestationRewards {
validator_index: validator_index as u64,
head: 0,
target: 0,
source: 0,
inclusion_delay: None,
inactivity: 0,
});
continue;
};
let eligible = validator.is_eligible;
let mut head_reward = 0i64;
let mut target_reward = 0i64;
let mut source_reward = 0i64;
let mut inactivity_penalty = 0i64;

if eligible {
let effective_balance = state.get_effective_balance(*validator_index)?;
let effective_balance = validator.effective_balance;

for flag_index in 0..PARTICIPATION_FLAG_WEIGHTS.len() {
let (ideal_reward, penalty) = ideal_rewards_hashmap
.get(&(flag_index, effective_balance))
.ok_or(BeaconChainError::AttestationRewardsError)?;
let voted_correctly = participation_cache
.get_unslashed_participating_indices(flag_index, previous_epoch)
.map_err(|_| BeaconChainError::AttestationRewardsError)?
.contains(*validator_index)
let voted_correctly = validator
.is_unslashed_participating_index(flag_index)
.map_err(|_| BeaconChainError::AttestationRewardsError)?;
if voted_correctly {
if flag_index == TIMELY_HEAD_FLAG_INDEX {
Expand All @@ -232,7 +245,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
target_reward = *penalty;

let penalty_numerator = effective_balance
.safe_mul(state.get_inactivity_score(*validator_index)?)?;
.safe_mul(state.get_inactivity_score(validator_index)?)?;
let penalty_denominator = spec
.inactivity_score_bias
.safe_mul(spec.inactivity_penalty_quotient_for_state(&state))?;
Expand All @@ -244,7 +257,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
total_rewards.push(TotalAttestationRewards {
validator_index: *validator_index as u64,
validator_index: validator_index as u64,
head: head_reward,
target: target_reward,
source: source_reward,
Expand Down
21 changes: 5 additions & 16 deletions beacon_node/beacon_chain/src/beacon_block_reward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ use operation_pool::RewardCache;
use safe_arith::SafeArith;
use slog::error;
use state_processing::{
common::{
altair, get_attestation_participation_flag_indices, get_attesting_indices_from_state,
},
common::{get_attestation_participation_flag_indices, get_attesting_indices_from_state},
epoch_cache::initialize_epoch_cache,
per_block_processing::{
altair::sync_committee::compute_sync_aggregate_rewards, get_slashable_indices,
},
Expand All @@ -32,6 +31,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

state.build_committee_cache(RelativeEpoch::Previous, &self.spec)?;
state.build_committee_cache(RelativeEpoch::Current, &self.spec)?;
initialize_epoch_cache(state, &self.spec)?;

self.compute_beacon_block_reward_with_cache(block, block_root, state)
}
Expand Down Expand Up @@ -191,10 +191,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block: BeaconBlockRef<'_, T::EthSpec, Payload>,
state: &BeaconState<T::EthSpec>,
) -> Result<BeaconBlockSubRewardValue, BeaconChainError> {
let total_active_balance = state.get_total_active_balance()?;
let base_reward_per_increment =
altair::BaseRewardPerIncrement::new(total_active_balance, &self.spec)?;

let mut total_proposer_reward = 0;

let proposer_reward_denominator = WEIGHT_DENOMINATOR
Expand Down Expand Up @@ -235,15 +231,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&& !validator_participation.has_flag(flag_index)?
{
validator_participation.add_flag(flag_index)?;
proposer_reward_numerator.safe_add_assign(
altair::get_base_reward(
state,
index,
base_reward_per_increment,
&self.spec,
)?
.safe_mul(weight)?,
)?;
proposer_reward_numerator
.safe_add_assign(state.get_base_reward(index)?.safe_mul(weight)?)?;
}
}
}
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4967,6 +4967,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let attestation_packing_timer =
metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES);

state.build_total_active_balance_cache_at(state.current_epoch(), &self.spec)?;
let mut prev_filter_cache = HashMap::new();
let prev_attestation_filter = |att: &AttestationRef<T::EthSpec>| {
self.filter_op_pool_attestation(&mut prev_filter_cache, att, &state)
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub enum BeaconChainError {
SlotClockDidNotStart,
NoStateForSlot(Slot),
BeaconStateError(BeaconStateError),
EpochCacheError(EpochCacheError),
DBInconsistent(String),
DBError(store::Error),
ForkChoiceError(ForkChoiceError),
Expand Down Expand Up @@ -250,6 +251,7 @@ easy_from_to!(StateAdvanceError, BeaconChainError);
easy_from_to!(BlockReplayError, BeaconChainError);
easy_from_to!(InconsistentFork, BeaconChainError);
easy_from_to!(AvailabilityCheckError, BeaconChainError);
easy_from_to!(EpochCacheError, BeaconChainError);

#[derive(Debug)]
pub enum BlockProductionError {
Expand Down
8 changes: 2 additions & 6 deletions beacon_node/beacon_chain/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use beacon_chain::{
};
use lazy_static::lazy_static;
use operation_pool::PersistedOperationPool;
use state_processing::{
per_slot_processing, per_slot_processing::Error as SlotProcessingError, EpochProcessingError,
};
use state_processing::{per_slot_processing, per_slot_processing::Error as SlotProcessingError};
use types::{
BeaconState, BeaconStateError, EthSpec, Hash256, Keypair, MinimalEthSpec, RelativeEpoch, Slot,
};
Expand Down Expand Up @@ -59,9 +57,7 @@ fn massive_skips() {
assert!(state.slot() > 1, "the state should skip at least one slot");
assert_eq!(
error,
SlotProcessingError::EpochProcessingError(EpochProcessingError::BeaconStateError(
BeaconStateError::InsufficientValidators
)),
SlotProcessingError::BeaconStateError(BeaconStateError::InsufficientValidators),
"should return error indicating that validators have been slashed out"
)
}
Expand Down
10 changes: 6 additions & 4 deletions beacon_node/genesis/src/interop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,14 @@ mod test {
}

for v in state.validators() {
let creds = v.withdrawal_credentials.as_bytes();
let creds = v.withdrawal_credentials;
assert_eq!(
creds[0], spec.bls_withdrawal_prefix_byte,
creds.as_bytes()[0],
spec.bls_withdrawal_prefix_byte,
"first byte of withdrawal creds should be bls prefix"
);
assert_eq!(
&creds[1..],
&creds.as_bytes()[1..],
&hash(&v.pubkey.as_ssz_bytes())[1..],
"rest of withdrawal creds should be pubkey hash"
)
Expand Down Expand Up @@ -240,7 +241,8 @@ mod test {
}

for (index, v) in state.validators().iter().enumerate() {
let creds = v.withdrawal_credentials.as_bytes();
let withdrawal_credientials = v.withdrawal_credentials;
let creds = withdrawal_credientials.as_bytes();
if index % 2 == 0 {
assert_eq!(
creds[0], spec.bls_withdrawal_prefix_byte,
Expand Down
9 changes: 3 additions & 6 deletions beacon_node/http_api/src/validator_inclusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@ use eth2::{
lighthouse::{GlobalValidatorInclusionData, ValidatorInclusionData},
types::ValidatorId,
};
use state_processing::per_epoch_processing::{
altair::participation_cache::Error as ParticipationCacheError, process_epoch,
EpochProcessingSummary,
};
use types::{BeaconState, ChainSpec, Epoch, EthSpec};
use state_processing::per_epoch_processing::{process_epoch, EpochProcessingSummary};
use types::{BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec};

/// Returns the state in the last slot of `epoch`.
fn end_of_epoch_state<T: BeaconChainTypes>(
Expand All @@ -35,7 +32,7 @@ fn get_epoch_processing_summary<T: EthSpec>(
.map_err(|e| warp_utils::reject::custom_server_error(format!("{:?}", e)))
}

fn convert_cache_error(error: ParticipationCacheError) -> warp::reject::Rejection {
fn convert_cache_error(error: BeaconStateError) -> warp::reject::Rejection {
warp_utils::reject::custom_server_error(format!("{:?}", error))
}

Expand Down
4 changes: 2 additions & 2 deletions beacon_node/http_api/src/validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub fn get_beacon_state_validators<T: BeaconChainTypes>(
.filter(|(index, (validator, _))| {
query_ids.as_ref().map_or(true, |ids| {
ids.iter().any(|id| match id {
ValidatorId::PublicKey(pubkey) => &validator.pubkey == pubkey,
ValidatorId::PublicKey(pubkey) => validator.pubkey == *pubkey,
ValidatorId::Index(param_index) => {
*param_index == *index as u64
}
Expand Down Expand Up @@ -93,7 +93,7 @@ pub fn get_beacon_state_validator_balances<T: BeaconChainTypes>(
.filter(|(index, (validator, _))| {
optional_ids.map_or(true, |ids| {
ids.iter().any(|id| match id {
ValidatorId::PublicKey(pubkey) => &validator.pubkey == pubkey,
ValidatorId::PublicKey(pubkey) => validator.pubkey == *pubkey,
ValidatorId::Index(param_index) => {
*param_index == *index as u64
}
Expand Down
10 changes: 5 additions & 5 deletions beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ impl ApiTester {
ValidatorId::PublicKey(
validators
.get(i as usize)
.map_or(PublicKeyBytes::empty(), |val| val.pubkey.clone()),
.map_or(PublicKeyBytes::empty(), |val| *val.pubkey),
)
})
.collect::<Vec<ValidatorId>>();
Expand Down Expand Up @@ -863,7 +863,7 @@ impl ApiTester {
if i < state.balances().len() as u64 {
validators.push(ValidatorBalanceData {
index: i as u64,
balance: state.balances()[i as usize],
balance: *state.balances().get(i as usize).unwrap(),
});
}
}
Expand Down Expand Up @@ -905,7 +905,7 @@ impl ApiTester {
ValidatorId::PublicKey(
validators
.get(i as usize)
.map_or(PublicKeyBytes::empty(), |val| val.pubkey.clone()),
.map_or(PublicKeyBytes::empty(), |val| *val.pubkey),
)
})
.collect::<Vec<ValidatorId>>();
Expand Down Expand Up @@ -999,7 +999,7 @@ impl ApiTester {

for (i, validator) in validators.into_iter().enumerate() {
let validator_ids = &[
ValidatorId::PublicKey(validator.pubkey.clone()),
ValidatorId::PublicKey(*validator.pubkey),
ValidatorId::Index(i as u64),
];

Expand Down Expand Up @@ -2463,7 +2463,7 @@ impl ApiTester {
let index = state
.get_beacon_proposer_index(slot, &self.chain.spec)
.unwrap();
let pubkey = state.validators()[index].pubkey.clone().into();
let pubkey = *state.validators().get(index).unwrap().pubkey;

ProposerData {
pubkey,
Expand Down
20 changes: 11 additions & 9 deletions beacon_node/operation_pool/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,17 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> {
.get_beacon_committee(att.data.slot, att.data.index)
.ok()?;
let indices = get_attesting_indices::<T>(committee.committee, &fresh_validators).ok()?;
let sqrt_total_active_balance = base::SqrtTotalActiveBalance::new(total_active_balance);
let fresh_validators_rewards: HashMap<u64, u64> = indices
.iter()
.copied()
.flat_map(|validator_index| {
let reward = base::get_base_reward(
state,
validator_index as usize,
total_active_balance,
spec,
)
.ok()?
.checked_div(spec.proposer_reward_quotient)?;
let effective_balance =
state.get_effective_balance(validator_index as usize).ok()?;
let reward =
base::get_base_reward(effective_balance, sqrt_total_active_balance, spec)
.ok()?
.checked_div(spec.proposer_reward_quotient)?;
Some((validator_index, reward))
})
.collect();
Expand Down Expand Up @@ -99,8 +98,11 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> {

let mut proposer_reward_numerator = 0;

// FIXME(sproul): store base_reward in reward cache
// let effective_balance = reward_cache.get_effective_balance(index)?;
let effective_balance = state.get_effective_balance(index as usize).ok()?;
let base_reward =
altair::get_base_reward(state, index as usize, base_reward_per_increment, spec)
altair::get_base_reward(effective_balance, base_reward_per_increment, spec)
.ok()?;

for (flag_index, weight) in PARTICIPATION_FLAG_WEIGHTS.iter().enumerate() {
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/store/src/partial_beacon_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ macro_rules! impl_try_into_beacon_state {
committee_caches: <_>::default(),
pubkey_cache: <_>::default(),
exit_cache: <_>::default(),
slashings_cache: <_>::default(),
epoch_cache: <_>::default(),
tree_hash_cache: <_>::default(),

// Variant-specific fields
Expand Down
Loading

0 comments on commit 128bc22

Please sign in to comment.