diff --git a/cmd/lotus-chainwatch/processor/power.go b/cmd/lotus-chainwatch/processor/power.go index daf17a88482..6fa03e9430e 100644 --- a/cmd/lotus-chainwatch/processor/power.go +++ b/cmd/lotus-chainwatch/processor/power.go @@ -7,6 +7,7 @@ import ( "golang.org/x/xerrors" + "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/power" "github.com/filecoin-project/specs-actors/actors/util/smoothing" @@ -15,7 +16,19 @@ import ( type powerActorInfo struct { common actorInfo - epochSmoothingEstimate *smoothing.FilterEstimate + totalRawBytes big.Int + totalRawBytesCommitted big.Int + totalQualityAdjustedBytes big.Int + totalQualityAdjustedBytesCommitted big.Int + totalPledgeCollateral big.Int + + newRawBytes big.Int + newQualityAdjustedBytes big.Int + newPledgeCollateral big.Int + newQAPowerSmoothed *smoothing.FilterEstimate + + minerCount int64 + minerCountAboveMinimumPower int64 } func (p *Processor) setupPower() error { @@ -25,13 +38,27 @@ func (p *Processor) setupPower() error { } if _, err := tx.Exec(` -create table if not exists power_smoothing_estimates +create table if not exists chain_power ( - state_root text not null - constraint power_smoothing_estimates_pk - primary key, - position_estimate text not null, - velocity_estimate text not null + state_root text not null + constraint power_smoothing_estimates_pk + primary key, + + new_raw_bytes_power text not null, + new_qa_bytes_power text not null, + new_pledge_collateral text not null, + + total_raw_bytes_power text not null, + total_raw_bytes_committed text not null, + total_qa_bytes_power text not null, + total_qa_bytes_committed text not null, + total_pledge_collateral text not null, + + qa_smoothed_position_estimate text not null, + qa_smoothed_velocity_estimate text not null, + + miner_count int not null, + minimum_consensus_miner_count int not null ); `); err != nil { return err @@ -60,8 +87,8 @@ func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips) }() var out []powerActorInfo - for tipset, powers := range powerTips { - for _, act := range powers { + for tipset, powerStates := range powerTips { + for _, act := range powerStates { var pw powerActorInfo pw.common = act @@ -80,7 +107,19 @@ func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips) return nil, xerrors.Errorf("unmarshal state (@ %s): %w", pw.common.stateroot.String(), err) } - pw.epochSmoothingEstimate = powerActorState.ThisEpochQAPowerSmoothed + pw.totalRawBytes = powerActorState.TotalRawBytePower + pw.totalRawBytesCommitted = powerActorState.TotalBytesCommitted + pw.totalQualityAdjustedBytes = powerActorState.TotalQualityAdjPower + pw.totalQualityAdjustedBytesCommitted = powerActorState.TotalQABytesCommitted + pw.totalPledgeCollateral = powerActorState.TotalPledgeCollateral + + pw.newRawBytes = powerActorState.ThisEpochRawBytePower + pw.newQualityAdjustedBytes = powerActorState.ThisEpochQualityAdjPower + pw.newPledgeCollateral = powerActorState.ThisEpochPledgeCollateral + pw.newQAPowerSmoothed = powerActorState.ThisEpochQAPowerSmoothed + + pw.minerCount = powerActorState.MinerCount + pw.minerCountAboveMinimumPower = powerActorState.MinerAboveMinPowerCount out = append(out, pw) } } @@ -88,46 +127,59 @@ func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips) return out, nil } -func (p *Processor) persistPowerActors(ctx context.Context, powers []powerActorInfo) error { +func (p *Processor) persistPowerActors(ctx context.Context, powerStates []powerActorInfo) error { // NB: use errgroup when there is more than a single store operation - return p.storePowerSmoothingEstimates(powers) + return p.storePowerSmoothingEstimates(powerStates) } -func (p *Processor) storePowerSmoothingEstimates(powers []powerActorInfo) error { +func (p *Processor) storePowerSmoothingEstimates(powerStates []powerActorInfo) error { tx, err := p.db.Begin() if err != nil { - return xerrors.Errorf("begin power_smoothing_estimates tx: %w", err) + return xerrors.Errorf("begin chain_power tx: %w", err) } - if _, err := tx.Exec(`create temp table rse (like power_smoothing_estimates) on commit drop`); err != nil { - return xerrors.Errorf("prep power_smoothing_estimates: %w", err) + if _, err := tx.Exec(`create temp table cp (like chain_power) on commit drop`); err != nil { + return xerrors.Errorf("prep chain_power: %w", err) } - stmt, err := tx.Prepare(`copy rse (state_root, position_estimate, velocity_estimate) from stdin;`) + stmt, err := tx.Prepare(`copy cp (state_root, new_raw_bytes_power, new_qa_bytes_power, new_pledge_collateral, total_raw_bytes_power, total_raw_bytes_committed, total_qa_bytes_power, total_qa_bytes_committed, total_pledge_collateral, qa_smoothed_position_estimate, qa_smoothed_velocity_estimate, miner_count, minimum_consensus_miner_count) from stdin;`) if err != nil { - return xerrors.Errorf("prepare tmp power_smoothing_estimates: %w", err) + return xerrors.Errorf("prepare tmp chain_power: %w", err) } - for _, powerState := range powers { + for _, ps := range powerStates { if _, err := stmt.Exec( - powerState.common.stateroot.String(), - powerState.epochSmoothingEstimate.PositionEstimate.String(), - powerState.epochSmoothingEstimate.VelocityEstimate.String(), + ps.common.stateroot.String(), + ps.newRawBytes.String(), + ps.newQualityAdjustedBytes.String(), + ps.newPledgeCollateral.String(), + + ps.totalRawBytes.String(), + ps.totalRawBytesCommitted.String(), + ps.totalQualityAdjustedBytes.String(), + ps.totalQualityAdjustedBytesCommitted.String(), + ps.totalPledgeCollateral.String(), + + ps.newQAPowerSmoothed.PositionEstimate.String(), + ps.newQAPowerSmoothed.VelocityEstimate.String(), + + ps.minerCount, + ps.minerCountAboveMinimumPower, ); err != nil { return xerrors.Errorf("failed to store smoothing estimate: %w", err) } } if err := stmt.Close(); err != nil { - return xerrors.Errorf("close prepared power_smoothing_estimates: %w", err) + return xerrors.Errorf("close prepared chain_power: %w", err) } - if _, err := tx.Exec(`insert into power_smoothing_estimates select * from rse on conflict do nothing`); err != nil { - return xerrors.Errorf("insert power_smoothing_estimates from tmp: %w", err) + if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil { + return xerrors.Errorf("insert chain_power from tmp: %w", err) } if err := tx.Commit(); err != nil { - return xerrors.Errorf("commit power_smoothing_estimates tx: %w", err) + return xerrors.Errorf("commit chain_power tx: %w", err) } return nil diff --git a/cmd/lotus-chainwatch/processor/reward.go b/cmd/lotus-chainwatch/processor/reward.go index 230b3c6c136..7068c1a93c6 100644 --- a/cmd/lotus-chainwatch/processor/reward.go +++ b/cmd/lotus-chainwatch/processor/reward.go @@ -5,7 +5,6 @@ import ( "context" "time" - "golang.org/x/sync/errgroup" "golang.org/x/xerrors" "github.com/filecoin-project/specs-actors/actors/abi/big" @@ -13,20 +12,23 @@ import ( "github.com/filecoin-project/specs-actors/actors/builtin/reward" "github.com/filecoin-project/specs-actors/actors/util/smoothing" - "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" ) type rewardActorInfo struct { common actorInfo - // expected power in bytes during this epoch - baselinePower big.Int + cumSumBaselinePower big.Int + cumSumRealizedPower big.Int - // base reward in attofil for each block found during this epoch - baseBlockReward big.Int + effectiveNetworkTime int64 + effectiveBaselinePower big.Int - epochSmoothingEstimate *smoothing.FilterEstimate + newBaselinePower big.Int + newBaseReward big.Int + newSmoothingEstimate *smoothing.FilterEstimate + + totalMinedReward big.Int } func (p *Processor) setupRewards() error { @@ -36,34 +38,23 @@ func (p *Processor) setupRewards() error { } if _, err := tx.Exec(` -/* -* captures base block reward per miner per state root and does not -* include penalties or gas reward -*/ -create table if not exists base_block_rewards -( - state_root text not null - constraint block_rewards_pk - primary key, - base_block_reward numeric not null -); - /* captures chain-specific power state for any given stateroot */ -create table if not exists chain_power +create table if not exists chain_reward ( state_root text not null - constraint chain_power_pk + constraint chain_reward_pk primary key, - baseline_power text not null -); + cum_sum_baseline text not null, + cum_sum_realized text not null, + effective_network_time int not null, + effective_baseline_power text not null, -create table if not exists reward_smoothing_estimates -( - state_root text not null - constraint reward_smoothing_estimates_pk - primary key, - position_estimate text not null, - velocity_estimate text not null + new_baseline_power text not null, + new_reward numeric not null, + new_reward_smoothed_position_estimate text not null, + new_reward_smoothed_velocity_estimate text not null, + + total_mined_reward text not null ); `); err != nil { return err @@ -113,9 +104,14 @@ func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTip return nil, xerrors.Errorf("unmarshal state (@ %s): %w", rw.common.stateroot.String(), err) } - rw.baseBlockReward = rewardActorState.ThisEpochReward - rw.baselinePower = rewardActorState.ThisEpochBaselinePower - rw.epochSmoothingEstimate = rewardActorState.ThisEpochRewardSmoothed + rw.cumSumBaselinePower = rewardActorState.CumsumBaseline + rw.cumSumRealizedPower = rewardActorState.CumsumRealized + rw.effectiveNetworkTime = int64(rewardActorState.EffectiveNetworkTime) + rw.effectiveBaselinePower = rewardActorState.EffectiveBaselinePower + rw.newBaselinePower = rewardActorState.ThisEpochBaselinePower + rw.newBaseReward = rewardActorState.ThisEpochReward + rw.newSmoothingEstimate = rewardActorState.ThisEpochRewardSmoothed + rw.totalMinedReward = rewardActorState.TotalMined out = append(out, rw) } } @@ -145,8 +141,14 @@ func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTip return nil, err } - rw.baseBlockReward = rewardActorState.ThisEpochReward - rw.baselinePower = rewardActorState.ThisEpochBaselinePower + rw.cumSumBaselinePower = rewardActorState.CumsumBaseline + rw.cumSumRealizedPower = rewardActorState.CumsumRealized + rw.effectiveNetworkTime = int64(rewardActorState.EffectiveNetworkTime) + rw.effectiveBaselinePower = rewardActorState.EffectiveBaselinePower + rw.newBaselinePower = rewardActorState.ThisEpochBaselinePower + rw.newBaseReward = rewardActorState.ThisEpochReward + rw.newSmoothingEstimate = rewardActorState.ThisEpochRewardSmoothed + rw.totalMinedReward = rewardActorState.TotalMined out = append(out, rw) } @@ -159,149 +161,47 @@ func (p *Processor) persistRewardActors(ctx context.Context, rewards []rewardAct log.Debugw("Persisted Reward Actors", "duration", time.Since(start).String()) }() - grp, ctx := errgroup.WithContext(ctx) //nolint - - grp.Go(func() error { - if err := p.storeChainPower(rewards); err != nil { - return err - } - return nil - }) - - grp.Go(func() error { - if err := p.storeBaseBlockReward(rewards); err != nil { - return err - } - return nil - }) - - grp.Go(func() error { - if err := p.storeRewardSmoothingEstimates(rewards); err != nil { - return err - } - return nil - }) - - return grp.Wait() -} - -func (p *Processor) storeChainPower(rewards []rewardActorInfo) error { tx, err := p.db.Begin() if err != nil { - return xerrors.Errorf("begin chain_power tx: %w", err) + return xerrors.Errorf("begin chain_reward tx: %w", err) } - if _, err := tx.Exec(`create temp table cp (like chain_power excluding constraints) on commit drop`); err != nil { - return xerrors.Errorf("prep chain_power temp: %w", err) + if _, err := tx.Exec(`create temp table cr (like chain_reward excluding constraints) on commit drop`); err != nil { + return xerrors.Errorf("prep chain_reward temp: %w", err) } - stmt, err := tx.Prepare(`copy cp (state_root, baseline_power) from STDIN`) + stmt, err := tx.Prepare(`copy cr ( state_root, cum_sum_baseline, cum_sum_realized, effective_network_time, effective_baseline_power, new_baseline_power, new_reward, new_reward_smoothed_position_estimate, new_reward_smoothed_velocity_estimate, total_mined_reward) from STDIN`) if err != nil { - return xerrors.Errorf("prepare tmp chain_power: %w", err) + return xerrors.Errorf("prepare tmp chain_reward: %w", err) } for _, rewardState := range rewards { if _, err := stmt.Exec( rewardState.common.stateroot.String(), - rewardState.baselinePower.String(), + rewardState.cumSumBaselinePower.String(), + rewardState.cumSumRealizedPower.String(), + rewardState.effectiveNetworkTime, + rewardState.effectiveBaselinePower.String(), + rewardState.newBaselinePower.String(), + rewardState.newBaseReward.String(), + rewardState.newSmoothingEstimate.PositionEstimate.String(), + rewardState.newSmoothingEstimate.VelocityEstimate.String(), + rewardState.totalMinedReward.String(), ); err != nil { log.Errorw("failed to store chain power", "state_root", rewardState.common.stateroot, "error", err) } } if err := stmt.Close(); err != nil { - return xerrors.Errorf("close prepared chain_power: %w", err) - } - - if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil { - return xerrors.Errorf("insert chain_power from tmp: %w", err) - } - - if err := tx.Commit(); err != nil { - return xerrors.Errorf("commit chain_power tx: %w", err) - } - - return nil -} - -func (p *Processor) storeBaseBlockReward(rewards []rewardActorInfo) error { - tx, err := p.db.Begin() - if err != nil { - return xerrors.Errorf("begin base_block_reward tx: %w", err) - } - - if _, err := tx.Exec(`create temp table bbr (like base_block_rewards excluding constraints) on commit drop`); err != nil { - return xerrors.Errorf("prep base_block_reward temp: %w", err) - } - - stmt, err := tx.Prepare(`copy bbr (state_root, base_block_reward) from STDIN`) - if err != nil { - return xerrors.Errorf("prepare tmp base_block_reward: %w", err) - } - - for _, rewardState := range rewards { - baseBlockReward := big.Div(rewardState.baseBlockReward, big.NewIntUnsigned(build.BlocksPerEpoch)) - if _, err := stmt.Exec( - rewardState.common.stateroot.String(), - baseBlockReward.String(), - ); err != nil { - log.Errorw("failed to store base block reward", "state_root", rewardState.common.stateroot, "error", err) - } - } - - if err := stmt.Close(); err != nil { - return xerrors.Errorf("close prepared base_block_reward: %w", err) - } - - if _, err := tx.Exec(`insert into base_block_rewards select * from bbr on conflict do nothing`); err != nil { - return xerrors.Errorf("insert base_block_reward from tmp: %w", err) - } - - if err := tx.Commit(); err != nil { - return xerrors.Errorf("commit base_block_reward tx: %w", err) - } - - return nil -} - -func (p *Processor) storeRewardSmoothingEstimates(rewards []rewardActorInfo) error { - tx, err := p.db.Begin() - if err != nil { - return xerrors.Errorf("begin reward_smoothing_estimates tx: %w", err) - } - - if _, err := tx.Exec(`create temp table rse (like reward_smoothing_estimates) on commit drop`); err != nil { - return xerrors.Errorf("prep reward_smoothing_estimates: %w", err) - } - - stmt, err := tx.Prepare(`copy rse (state_root, position_estimate, velocity_estimate) from stdin;`) - if err != nil { - return xerrors.Errorf("prepare tmp reward_smoothing_estimates: %w", err) - } - - for _, rewardState := range rewards { - if rewardState.epochSmoothingEstimate == nil { - continue - } - if _, err := stmt.Exec( - rewardState.common.stateroot.String(), - rewardState.epochSmoothingEstimate.PositionEstimate.String(), - rewardState.epochSmoothingEstimate.VelocityEstimate.String(), - ); err != nil { - return xerrors.Errorf("failed to store smoothing estimate: %w", err) - } - } - - if err := stmt.Close(); err != nil { - return xerrors.Errorf("close prepared reward_smoothing_estimates: %w", err) + return xerrors.Errorf("close prepared chain_reward: %w", err) } - if _, err := tx.Exec(`insert into reward_smoothing_estimates select * from rse on conflict do nothing`); err != nil { - return xerrors.Errorf("insert reward_smoothing_estimates from tmp: %w", err) + if _, err := tx.Exec(`insert into chain_reward select * from cr on conflict do nothing`); err != nil { + return xerrors.Errorf("insert chain_reward from tmp: %w", err) } if err := tx.Commit(); err != nil { - return xerrors.Errorf("commit reward_smoothing_estimates tx: %w", err) + return xerrors.Errorf("commit chain_reward tx: %w", err) } return nil diff --git a/cmd/lotus-chainwatch/scheduler/refresh_top_miners_by_base_reward.go b/cmd/lotus-chainwatch/scheduler/refresh_top_miners_by_base_reward.go index d15d48899b2..145e84229ec 100644 --- a/cmd/lotus-chainwatch/scheduler/refresh_top_miners_by_base_reward.go +++ b/cmd/lotus-chainwatch/scheduler/refresh_top_miners_by_base_reward.go @@ -23,9 +23,9 @@ func setupTopMinerByBaseRewardSchema(ctx context.Context, db *sql.DB) error { with total_rewards_by_miner as ( select b.miner, - sum(bbr.base_block_reward * b.win_count) as total_reward + sum(cr.new_reward * b.win_count) as total_reward from blocks b - inner join base_block_rewards bbr on b.parentstateroot = bbr.state_root + inner join chain_reward cr on b.parentstateroot = cr.state_root group by 1 ) select rank() over (order by total_reward desc), @@ -42,17 +42,17 @@ func setupTopMinerByBaseRewardSchema(ctx context.Context, db *sql.DB) error { b."timestamp"as current_timestamp, max(b.height) as current_height from blocks b - join base_block_rewards bbr on b.parentstateroot = bbr.state_root - where bbr.base_block_reward is not null + join chain_reward cr on b.parentstateroot = cr.state_root + where cr.new_reward is not null group by 1 order by 1 desc limit 1; `); err != nil { - return xerrors.Errorf("create top_miner_by_base_reward views: %w", err) + return xerrors.Errorf("create top_miners_by_base_reward views: %w", err) } if err := tx.Commit(); err != nil { - return xerrors.Errorf("committing top_miner_by_base_reward views; %w", err) + return xerrors.Errorf("committing top_miners_by_base_reward views; %w", err) } return nil }