Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storagefsm: Rewrite input handling #5375

Merged
merged 21 commits into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
239d6f8
storagefsm: Rewrite input handling
magik6k Jan 18, 2021
542357a
storagefsm: Start packing correctly
magik6k Jan 18, 2021
e5814da
cbor-gen
magik6k Jan 18, 2021
9857ad8
storagefsm: Fix some deadlock cases
magik6k Jan 19, 2021
fd67a41
storagefsm: Change sector CreationTime to unix ts
magik6k Jan 20, 2021
270f293
storagefsm: Check per-sector deal limits
magik6k Jan 20, 2021
069766e
storagefsm: Don't persist piece assignment queue
magik6k Jan 20, 2021
f96f12c
storagefsm: Add rest of checks in WaitDeals
magik6k Jan 20, 2021
b9a9f23
storagefsm: Add stub AddPieceFailed state
magik6k Jan 20, 2021
df14f15
storagefsm: More logging for deal test debugging
magik6k Jan 20, 2021
1336d88
storagefsm: Drop addpiece wait after fixing storageadapter
magik6k Jan 21, 2021
ec4deb7
storagefsm: Fix unlocking in handleWaitDeals
magik6k Jan 21, 2021
1070ad2
storagefsm: Drop unused TargetWaitDealsSectors
magik6k Jan 21, 2021
6e7fcb7
Merge remote-tracking branch 'origin/master' into feat/refactor-fsm-i…
magik6k Feb 3, 2021
e92b8b2
Merge remote-tracking branch 'origin/master' into feat/refactor-fsm-i…
magik6k Feb 9, 2021
e27a530
storagefsm: cleanup openSectors better; pendingPieces by pieceCid
magik6k Feb 9, 2021
7111e92
Merge remote-tracking branch 'origin/master' into feat/refactor-fsm-i…
magik6k Feb 11, 2021
6907e58
Fix WaitDeals sector accounting
magik6k Feb 11, 2021
dd82729
Merge remote-tracking branch 'origin/master' into feat/refactor-fsm-i…
magik6k Feb 16, 2021
90b8612
Merge remote-tracking branch 'origin/master' into feat/refactor-fsm-i…
magik6k Feb 16, 2021
d8c9712
Merge remote-tracking branch 'origin/master' into feat/refactor-fsm-i…
magik6k Feb 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/lotus-storage-miner/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ var stateList = []stateMeta{

{col: color.FgBlue, state: sealing.Empty},
{col: color.FgBlue, state: sealing.WaitDeals},
{col: color.FgBlue, state: sealing.AddPiece},

{col: color.FgRed, state: sealing.UndefinedSectorState},
{col: color.FgYellow, state: sealing.Packing},
Expand All @@ -306,6 +307,7 @@ var stateList = []stateMeta{
{col: color.FgCyan, state: sealing.Removed},

{col: color.FgRed, state: sealing.FailedUnrecoverable},
{col: color.FgRed, state: sealing.AddPieceFailed},
{col: color.FgRed, state: sealing.SealPreCommit1Failed},
{col: color.FgRed, state: sealing.SealPreCommit2Failed},
{col: color.FgRed, state: sealing.PreCommitFailed},
Expand Down
50 changes: 49 additions & 1 deletion extern/storage-sealing/cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

156 changes: 64 additions & 92 deletions extern/storage-sealing/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,22 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
// Sealing

UndefinedSectorState: planOne(
on(SectorStart{}, Empty),
on(SectorStart{}, WaitDeals),
on(SectorStartCC{}, Packing),
),
Empty: planOne(on(SectorAddPiece{}, WaitDeals)),
Empty: planOne( // deprecated
on(SectorAddPiece{}, AddPiece),
on(SectorStartPacking{}, Packing),
),
WaitDeals: planOne(
on(SectorAddPiece{}, WaitDeals),
on(SectorAddPiece{}, AddPiece),
on(SectorStartPacking{}, Packing),
),
AddPiece: planOne(
on(SectorPieceAdded{}, WaitDeals),
apply(SectorStartPacking{}),
on(SectorAddPieceFailed{}, AddPieceFailed),
),
Packing: planOne(on(SectorPacked{}, GetTicket)),
GetTicket: planOne(
on(SectorTicket{}, PreCommit1),
Expand Down Expand Up @@ -97,6 +105,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto

// Sealing errors

AddPieceFailed: planOne(),
SealPreCommit1Failed: planOne(
on(SectorRetrySealPreCommit1{}, PreCommit1),
),
Expand Down Expand Up @@ -238,12 +247,11 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta

/*

* Empty <- incoming deals
| |
| v
*<- WaitDeals <- incoming deals
| |
| v
UndefinedSectorState (start)
v |
*<- WaitDeals <-> AddPiece |
| | /--------------------/
| v v
*<- Packing <- incoming committed capacity
| |
| v
Expand Down Expand Up @@ -282,10 +290,6 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
v
FailedUnrecoverable

UndefinedSectorState <- ¯\_(ツ)_/¯
| ^
*---------------------/

*/

m.stats.updateSector(m.minerSectorID(state.SectorNumber), state.State)
Expand All @@ -295,7 +299,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
case Empty:
fallthrough
case WaitDeals:
log.Infof("Waiting for deals %d", state.SectorNumber)
return m.handleWaitDeals, processed, nil
case AddPiece:
return m.handleAddPiece, processed, nil
case Packing:
return m.handlePacking, processed, nil
case GetTicket:
Expand Down Expand Up @@ -418,60 +424,10 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
log.Errorf("loading sector list: %+v", err)
}

cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting the sealing delay: %w", err)
}

spt, err := m.currentSealProof(ctx)
if err != nil {
return xerrors.Errorf("getting current seal proof: %w", err)
}
ssize, err := spt.SectorSize()
if err != nil {
return err
}

// m.unsealedInfoMap.lk.Lock() taken early in .New to prevent races
defer m.unsealedInfoMap.lk.Unlock()

for _, sector := range trackedSectors {
if err := m.sectors.Send(uint64(sector.SectorNumber), SectorRestart{}); err != nil {
log.Errorf("restarting sector %d: %+v", sector.SectorNumber, err)
}

if sector.State == WaitDeals {

// put the sector in the unsealedInfoMap
if _, ok := m.unsealedInfoMap.infos[sector.SectorNumber]; ok {
// something's funky here, but probably safe to move on
log.Warnf("sector %v was already in the unsealedInfoMap when restarting", sector.SectorNumber)
} else {
ui := UnsealedSectorInfo{
ssize: ssize,
}
for _, p := range sector.Pieces {
if p.DealInfo != nil {
ui.numDeals++
}
ui.stored += p.Piece.Size
ui.pieceSizes = append(ui.pieceSizes, p.Piece.Size.Unpadded())
}

m.unsealedInfoMap.infos[sector.SectorNumber] = ui
}

// start a fresh timer for the sector
if cfg.WaitDealsDelay > 0 {
timer := time.NewTimer(cfg.WaitDealsDelay)
go func() {
<-timer.C
if err := m.StartPacking(sector.SectorNumber); err != nil {
log.Errorf("starting sector %d: %+v", sector.SectorNumber, err)
}
}()
}
}
}

// TODO: Grab on-chain sector set and diff with trackedSectors
Expand All @@ -494,56 +450,72 @@ func final(events []statemachine.Event, state *SectorInfo) (uint64, error) {
return 0, xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events)
}

func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) error) {
return func() (mutator, func(*SectorInfo) error) {
return mut, func(state *SectorInfo) error {
func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) (bool, error)) {
return func() (mutator, func(*SectorInfo) (bool, error)) {
return mut, func(state *SectorInfo) (bool, error) {
state.State = next
return nil
return false, nil
}
}
}

// like `on`, but doesn't change state
func apply(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) {
return func() (mutator, func(*SectorInfo) (bool, error)) {
return mut, func(state *SectorInfo) (bool, error) {
return true, nil
}
}
}

func onReturning(mut mutator) func() (mutator, func(*SectorInfo) error) {
return func() (mutator, func(*SectorInfo) error) {
return mut, func(state *SectorInfo) error {
func onReturning(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) {
return func() (mutator, func(*SectorInfo) (bool, error)) {
return mut, func(state *SectorInfo) (bool, error) {
if state.Return == "" {
return xerrors.Errorf("return state not set")
return false, xerrors.Errorf("return state not set")
}

state.State = SectorState(state.Return)
state.Return = ""
return nil
return false, nil
}
}
}

func planOne(ts ...func() (mut mutator, next func(*SectorInfo) error)) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
func planOne(ts ...func() (mut mutator, next func(*SectorInfo) (more bool, err error))) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
return func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
if gm, ok := events[0].User.(globalMutator); ok {
gm.applyGlobal(state)
return 1, nil
}
for i, event := range events {
if gm, ok := event.User.(globalMutator); ok {
gm.applyGlobal(state)
return uint64(i + 1), nil
}

for _, t := range ts {
mut, next := t()
for _, t := range ts {
mut, next := t()

if reflect.TypeOf(events[0].User) != reflect.TypeOf(mut) {
continue
}
if reflect.TypeOf(event.User) != reflect.TypeOf(mut) {
continue
}

if err, iserr := event.User.(error); iserr {
log.Warnf("sector %d got error event %T: %+v", state.SectorNumber, event.User, err)
}

if err, iserr := events[0].User.(error); iserr {
log.Warnf("sector %d got error event %T: %+v", state.SectorNumber, events[0].User, err)
event.User.(mutator).apply(state)
more, err := next(state)
if err != nil || !more {
return uint64(i + 1), err
}
}

events[0].User.(mutator).apply(state)
return 1, next(state)
}
_, ok := event.User.(Ignorable)
if ok {
continue
}

_, ok := events[0].User.(Ignorable)
if ok {
return 1, nil
return uint64(i + 1), xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, event.User, event)
}

return 0, xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0])
return uint64(len(events)), nil
}
}
26 changes: 21 additions & 5 deletions extern/storage-sealing/fsm_events.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package sealing

import (
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"time"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
)

type mutator interface {
Expand Down Expand Up @@ -76,14 +79,27 @@ func (evt SectorStartCC) apply(state *SectorInfo) {
state.SectorType = evt.SectorType
}

type SectorAddPiece struct {
NewPiece Piece
}
type SectorAddPiece struct{}

func (evt SectorAddPiece) apply(state *SectorInfo) {
state.Pieces = append(state.Pieces, evt.NewPiece)
if state.CreationTime == 0 {
state.CreationTime = time.Now().Unix()
}
}

type SectorPieceAdded struct {
NewPieces []Piece
}

func (evt SectorPieceAdded) apply(state *SectorInfo) {
state.Pieces = append(state.Pieces, evt.NewPieces...)
}

type SectorAddPieceFailed struct{ error }

func (evt SectorAddPieceFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorAddPieceFailed) apply(si *SectorInfo) {}

type SectorStartPacking struct{}

func (evt SectorStartPacking) apply(*SectorInfo) {}
Expand Down
Loading