Skip to content

Commit

Permalink
test: implement miner sector event extractor tests (#999)
Browse files Browse the repository at this point in the history
* chore: convert SectorEventsExtractor to new context interface

* test: implement miner sector diff extractor test
  • Loading branch information
frrist committed Jun 29, 2022
1 parent 9432089 commit c249b67
Show file tree
Hide file tree
Showing 2 changed files with 554 additions and 50 deletions.
102 changes: 52 additions & 50 deletions tasks/actorstate/miner/sector_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/filecoin-project/lily/model"
minermodel "github.com/filecoin-project/lily/model/actors/miner"
"github.com/filecoin-project/lily/tasks/actorstate"
"github.com/filecoin-project/lily/tasks/actorstate/miner/extraction"
)

type SectorEventsExtractor struct{}
Expand All @@ -25,27 +26,27 @@ func (SectorEventsExtractor) Extract(ctx context.Context, a actorstate.ActorInfo
span.SetAttributes(a.Attributes()...)
}

ec, err := NewMinerStateExtractionContext(ctx, a, node)
extState, err := extraction.LoadMinerStates(ctx, a, node)
if err != nil {
return nil, fmt.Errorf("creating miner state extraction context: %w", err)
}

var sectorChanges *miner.SectorChanges
var preCommitChanges *miner.PreCommitChanges
if !ec.HasPreviousState() {
if extState.ParentState() == nil {
// If the miner doesn't have previous state list all of its current sectors and precommits
sectors, err := ec.CurrState.LoadSectors(nil)
sectors, err := extState.CurrentState().LoadSectors(nil)
if err != nil {
return nil, fmt.Errorf("loading miner sectors: %w", err)
}

sectorChanges = miner.MakeSectorChanges()
for i, sector := range sectors {
sectorChanges.Added[i] = *sector
for _, sector := range sectors {
sectorChanges.Added = append(sectorChanges.Added, *sector)
}

preCommitChanges = miner.MakePreCommitChanges()
if err = ec.CurrState.ForEachPrecommittedSector(func(info miner.SectorPreCommitOnChainInfo) error {
if err = extState.CurrentState().ForEachPrecommittedSector(func(info miner.SectorPreCommitOnChainInfo) error {
preCommitChanges.Added = append(preCommitChanges.Added, info)
return nil
}); err != nil {
Expand All @@ -54,37 +55,42 @@ func (SectorEventsExtractor) Extract(ctx context.Context, a actorstate.ActorInfo

} else {
// If the miner has previous state compute the list of new sectors and precommit in its current state.
preCommitChanges, err = node.DiffPreCommits(ctx, a.Address, a.Current, a.Executed, ec.PrevState, ec.CurrState)
preCommitChanges, err = node.DiffPreCommits(ctx, a.Address, a.Current, a.Executed, extState.ParentState(), extState.CurrentState())
if err != nil {
return nil, err
}

sectorChanges, err = node.DiffSectors(ctx, a.Address, a.Current, a.Executed, ec.PrevState, ec.CurrState)
sectorChanges, err = node.DiffSectors(ctx, a.Address, a.Current, a.Executed, extState.ParentState(), extState.CurrentState())
if err != nil {
return nil, err
}
}

sectorEventModel, err := extractSectorEvents(ctx, a, ec, sectorChanges, preCommitChanges)
dlDiff, err := miner.DiffDeadlines(extState.ParentState(), extState.CurrentState())
if err != nil {
return nil, err
}

sectorEventModel, err := ExtractSectorEvents(ctx, extState, sectorChanges, preCommitChanges, dlDiff)
if err != nil {
return nil, err
}

return sectorEventModel, nil
}

func extractSectorEvents(ctx context.Context, a actorstate.ActorInfo, ec *MinerStateExtractionContext, sc *miner.SectorChanges, pc *miner.PreCommitChanges) (minermodel.MinerSectorEventList, error) {
ctx, span := otel.Tracer("").Start(ctx, "extractMinerSectorEvents")
func ExtractSectorEvents(ctx context.Context, extState extraction.State, sc *miner.SectorChanges, pc *miner.PreCommitChanges, dlDiff miner.DeadlinesDiff) (minermodel.MinerSectorEventList, error) {
ctx, span := otel.Tracer("").Start(ctx, "ExtractSectorEvents")
defer span.End()

partitionEvents, err := extractMinerPartitionsDiff(ctx, a, ec)
partitionEvents, err := ExtractMinerPartitionsDiff(ctx, extState, dlDiff)
if err != nil {
return nil, fmt.Errorf("extracting miner partition diff: %w", err)
}

sectorEvents := extractMinerSectorEvents(a, sc)
sectorEvents := ExtractMinerSectorEvents(extState, sc)

preCommitEvents := extractMinerPreCommitEvents(a, pc)
preCommitEvents := ExtractMinerPreCommitEvents(extState, pc)

out := make(minermodel.MinerSectorEventList, 0, len(partitionEvents)+len(sectorEvents)+len(preCommitEvents))
out = append(out, partitionEvents...)
Expand All @@ -94,7 +100,7 @@ func extractSectorEvents(ctx context.Context, a actorstate.ActorInfo, ec *MinerS
return out, nil
}

func extractMinerSectorEvents(a actorstate.ActorInfo, sectors *miner.SectorChanges) minermodel.MinerSectorEventList {
func ExtractMinerSectorEvents(extState extraction.State, sectors *miner.SectorChanges) minermodel.MinerSectorEventList {
out := make(minermodel.MinerSectorEventList, 0, len(sectors.Added)+len(sectors.Extended)+len(sectors.Snapped))

// track sector add and commit-capacity add
Expand All @@ -104,9 +110,9 @@ func extractMinerSectorEvents(a actorstate.ActorInfo, sectors *miner.SectorChang
event = minermodel.CommitCapacityAdded
}
out = append(out, &minermodel.MinerSectorEvent{
Height: int64(a.Current.Height()),
MinerID: a.Address.String(),
StateRoot: a.Current.ParentState().String(),
Height: int64(extState.CurrentTipSet().Height()),
MinerID: extState.Address().String(),
StateRoot: extState.CurrentTipSet().ParentState().String(),
SectorID: uint64(add.SectorNumber),
Event: event,
})
Expand All @@ -115,9 +121,9 @@ func extractMinerSectorEvents(a actorstate.ActorInfo, sectors *miner.SectorChang
// sector extension events
for _, mod := range sectors.Extended {
out = append(out, &minermodel.MinerSectorEvent{
Height: int64(a.Current.Height()),
MinerID: a.Address.String(),
StateRoot: a.Current.ParentState().String(),
Height: int64(extState.CurrentTipSet().Height()),
MinerID: extState.Address().String(),
StateRoot: extState.CurrentTipSet().ParentState().String(),
SectorID: uint64(mod.To.SectorNumber),
Event: minermodel.SectorExtended,
})
Expand All @@ -126,9 +132,9 @@ func extractMinerSectorEvents(a actorstate.ActorInfo, sectors *miner.SectorChang
// sector snapped events
for _, snap := range sectors.Snapped {
out = append(out, &minermodel.MinerSectorEvent{
Height: int64(a.Current.Height()),
MinerID: a.Address.String(),
StateRoot: a.Current.ParentState().String(),
Height: int64(extState.CurrentTipSet().Height()),
MinerID: extState.Address().String(),
StateRoot: extState.CurrentTipSet().ParentState().String(),
SectorID: uint64(snap.To.SectorNumber),
Event: minermodel.SectorSnapped,
})
Expand All @@ -137,14 +143,14 @@ func extractMinerSectorEvents(a actorstate.ActorInfo, sectors *miner.SectorChang
return out
}

func extractMinerPreCommitEvents(a actorstate.ActorInfo, preCommits *miner.PreCommitChanges) minermodel.MinerSectorEventList {
func ExtractMinerPreCommitEvents(extState extraction.State, preCommits *miner.PreCommitChanges) minermodel.MinerSectorEventList {
out := make(minermodel.MinerSectorEventList, len(preCommits.Added))
// track precommit addition
for i, add := range preCommits.Added {
out[i] = &minermodel.MinerSectorEvent{
Height: int64(a.Current.Height()),
MinerID: a.Address.String(),
StateRoot: a.Current.ParentState().String(),
Height: int64(extState.CurrentTipSet().Height()),
MinerID: extState.Address().String(),
StateRoot: extState.CurrentTipSet().ParentState().String(),
SectorID: uint64(add.Info.SectorNumber),
Event: minermodel.PreCommitAdded,
}
Expand All @@ -153,20 +159,15 @@ func extractMinerPreCommitEvents(a actorstate.ActorInfo, preCommits *miner.PreCo
return out
}

func extractMinerPartitionsDiff(ctx context.Context, a actorstate.ActorInfo, ec *MinerStateExtractionContext) (minermodel.MinerSectorEventList, error) {
_, span := otel.Tracer("").Start(ctx, "extractMinerPartitionDiff") // nolint: ineffassign,staticcheck
func ExtractMinerPartitionsDiff(ctx context.Context, extState extraction.State, dlDiff miner.DeadlinesDiff) (minermodel.MinerSectorEventList, error) {
_, span := otel.Tracer("").Start(ctx, "ExtractMinerPartitionsDiff") // nolint: ineffassign,staticcheck
defer span.End()

// short circuit genesis state.
if !ec.HasPreviousState() {
if extState.ParentState() == nil {
return nil, nil
}

dlDiff, err := miner.DiffDeadlines(ec.PrevState, ec.CurrState)
if err != nil {
return nil, err
}

if dlDiff == nil {
return nil, nil
}
Expand All @@ -176,6 +177,7 @@ func extractMinerPartitionsDiff(ctx context.Context, a actorstate.ActorInfo, ec
recovered := bitfield.New()
recovering := bitfield.New()

var err error
for _, deadline := range dlDiff {
for _, partition := range deadline {
removed, err = bitfield.MergeBitFields(removed, partition.Removed)
Expand All @@ -198,7 +200,7 @@ func extractMinerPartitionsDiff(ctx context.Context, a actorstate.ActorInfo, ec
}
// build an index of removed sector expiration's for comparison below.

removedSectors, err := ec.CurrState.LoadSectors(&removed)
removedSectors, err := extState.CurrentState().LoadSectors(&removed)
if err != nil {
return nil, fmt.Errorf("fetching miners removed sectors: %w", err)
}
Expand All @@ -213,13 +215,13 @@ func extractMinerPartitionsDiff(ctx context.Context, a actorstate.ActorInfo, ec
if err := removed.ForEach(func(u uint64) error {
event := minermodel.SectorTerminated
expiration := rmExpireIndex[u]
if expiration == a.Current.Height() {
if expiration == extState.CurrentTipSet().Height() {
event = minermodel.SectorExpired
}
out = append(out, &minermodel.MinerSectorEvent{
Height: int64(a.Current.Height()),
MinerID: a.Address.String(),
StateRoot: a.Current.ParentState().String(),
Height: int64(extState.CurrentTipSet().Height()),
MinerID: extState.Address().String(),
StateRoot: extState.CurrentTipSet().ParentState().String(),
SectorID: u,
Event: event,
})
Expand All @@ -231,9 +233,9 @@ func extractMinerPartitionsDiff(ctx context.Context, a actorstate.ActorInfo, ec
// track recovering sectors
if err := recovering.ForEach(func(u uint64) error {
out = append(out, &minermodel.MinerSectorEvent{
Height: int64(a.Current.Height()),
MinerID: a.Address.String(),
StateRoot: a.Current.ParentState().String(),
Height: int64(extState.CurrentTipSet().Height()),
MinerID: extState.Address().String(),
StateRoot: extState.CurrentTipSet().ParentState().String(),
SectorID: u,
Event: minermodel.SectorRecovering,
})
Expand All @@ -245,9 +247,9 @@ func extractMinerPartitionsDiff(ctx context.Context, a actorstate.ActorInfo, ec
// track faulted sectors
if err := faulted.ForEach(func(u uint64) error {
out = append(out, &minermodel.MinerSectorEvent{
Height: int64(a.Current.Height()),
MinerID: a.Address.String(),
StateRoot: a.Current.ParentState().String(),
Height: int64(extState.CurrentTipSet().Height()),
MinerID: extState.Address().String(),
StateRoot: extState.CurrentTipSet().ParentState().String(),
SectorID: u,
Event: minermodel.SectorFaulted,
})
Expand All @@ -259,9 +261,9 @@ func extractMinerPartitionsDiff(ctx context.Context, a actorstate.ActorInfo, ec
// track recovered sectors
if err := recovered.ForEach(func(u uint64) error {
out = append(out, &minermodel.MinerSectorEvent{
Height: int64(a.Current.Height()),
MinerID: a.Address.String(),
StateRoot: a.Current.ParentState().String(),
Height: int64(extState.CurrentTipSet().Height()),
MinerID: extState.Address().String(),
StateRoot: extState.CurrentTipSet().ParentState().String(),
SectorID: u,
Event: minermodel.SectorRecovered,
})
Expand Down
Loading

0 comments on commit c249b67

Please sign in to comment.