From 15191ff80f43a4f3e780771095fb34f89fe62814 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 27 Aug 2020 12:39:20 +0200 Subject: [PATCH 01/25] storagefsm: Allow removing sectors in all states --- extern/storage-sealing/fsm.go | 96 +++++++++++++--------------- extern/storage-sealing/fsm_events.go | 5 +- 2 files changed, 47 insertions(+), 54 deletions(-) diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index 82db5c15af0..1612cafb701 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -17,9 +17,9 @@ import ( ) func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) { - next, err := m.plan(events, user.(*SectorInfo)) + next, processed, err := m.plan(events, user.(*SectorInfo)) if err != nil || next == nil { - return nil, uint64(len(events)), err + return nil, processed, err } return func(ctx statemachine.Context, si SectorInfo) error { @@ -30,10 +30,10 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface } return nil - }, uint64(len(events)), nil // TODO: This processed event count is not very correct + }, processed, nil // TODO: This processed event count is not very correct } -var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) error{ +var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) (uint64, error){ // Sealing UndefinedSectorState: planOne( @@ -119,7 +119,6 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto Proving: planOne( on(SectorFaultReported{}, FaultReported), on(SectorFaulty{}, Faulty), - on(SectorRemove{}, Removing), ), Removing: planOne( on(SectorRemoved{}, Removed), @@ -133,7 +132,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto Removed: final, } -func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) { +func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, uint64, error) { ///// // First process all events @@ -170,11 +169,12 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta p := fsmPlanners[state.State] if p == nil { - return nil, xerrors.Errorf("planner for state %s not found", state.State) + return nil, 0, xerrors.Errorf("planner for state %s not found", state.State) } - if err := p(events, state); err != nil { - return nil, xerrors.Errorf("running planner for state %s failed: %w", state.State, err) + processed, err := p(events, state) + if err != nil { + return nil, 0, xerrors.Errorf("running planner for state %s failed: %w", state.State, err) } ///// @@ -235,51 +235,51 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta case WaitDeals: log.Infof("Waiting for deals %d", state.SectorNumber) case Packing: - return m.handlePacking, nil + return m.handlePacking, processed, nil case PreCommit1: - return m.handlePreCommit1, nil + return m.handlePreCommit1, processed, nil case PreCommit2: - return m.handlePreCommit2, nil + return m.handlePreCommit2, processed, nil case PreCommitting: - return m.handlePreCommitting, nil + return m.handlePreCommitting, processed, nil case PreCommitWait: - return m.handlePreCommitWait, nil + return m.handlePreCommitWait, processed, nil case WaitSeed: - return m.handleWaitSeed, nil + return m.handleWaitSeed, processed, nil case Committing: - return 
m.handleCommitting, nil + return m.handleCommitting, processed, nil case CommitWait: - return m.handleCommitWait, nil + return m.handleCommitWait, processed, nil case FinalizeSector: - return m.handleFinalizeSector, nil + return m.handleFinalizeSector, processed, nil // Handled failure modes case SealPreCommit1Failed: - return m.handleSealPrecommit1Failed, nil + return m.handleSealPrecommit1Failed, processed, nil case SealPreCommit2Failed: - return m.handleSealPrecommit2Failed, nil + return m.handleSealPrecommit2Failed, processed, nil case PreCommitFailed: - return m.handlePreCommitFailed, nil + return m.handlePreCommitFailed, processed, nil case ComputeProofFailed: - return m.handleComputeProofFailed, nil + return m.handleComputeProofFailed, processed, nil case CommitFailed: - return m.handleCommitFailed, nil + return m.handleCommitFailed, processed, nil case FinalizeFailed: - return m.handleFinalizeFailed, nil + return m.handleFinalizeFailed, processed, nil // Post-seal case Proving: - return m.handleProvingSector, nil + return m.handleProvingSector, processed, nil case Removing: - return m.handleRemoving, nil + return m.handleRemoving, processed, nil case Removed: - return nil, nil + return nil, processed, nil // Faults case Faulty: - return m.handleFaulty, nil + return m.handleFaulty, processed, nil case FaultReported: - return m.handleFaultReported, nil + return m.handleFaultReported, processed, nil // Fatal errors case UndefinedSectorState: @@ -290,15 +290,15 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta log.Errorf("unexpected sector update state: %s", state.State) } - return nil, nil + return nil, processed, nil } -func planCommitting(events []statemachine.Event, state *SectorInfo) error { +func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, error) { for _, event := range events { switch e := event.User.(type) { case globalMutator: if e.applyGlobal(state) { - return nil + return 1, nil } case SectorCommitted: // the normal case e.apply(state) @@ -311,7 +311,7 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error { log.Warnf("planCommitting: commit Seed changed") e.apply(state) state.State = Committing - return nil + return 1, nil case SectorComputeProofFailed: state.State = ComputeProofFailed case SectorSealPreCommit1Failed: @@ -321,10 +321,10 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error { case SectorRetryCommitWait: state.State = CommitWait default: - return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events) + return 0, xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events) } } - return nil + return 1, nil } func (m *Sealing) restartSectors(ctx context.Context) error { @@ -365,8 +365,8 @@ func (m *Sealing) ForceSectorState(ctx context.Context, id abi.SectorNumber, sta return m.sectors.Send(id, SectorForceState{state}) } -func final(events []statemachine.Event, state *SectorInfo) error { - return xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events) +func final(events []statemachine.Event, state *SectorInfo) (uint64, error) { + return 0, xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events) } func on(mut mutator, next SectorState) func() (mutator, SectorState) { @@ -375,21 +375,11 @@ func on(mut mutator, next SectorState) func() (mutator, SectorState) { } } -func planOne(ts ...func() (mut mutator, next SectorState)) 
func(events []statemachine.Event, state *SectorInfo) error { - return func(events []statemachine.Event, state *SectorInfo) error { - if len(events) != 1 { - for _, event := range events { - if gm, ok := event.User.(globalMutator); ok { - gm.applyGlobal(state) - return nil - } - } - return xerrors.Errorf("planner for state %s only has a plan for a single event only, got %+v", state.State, events) - } - +func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statemachine.Event, state *SectorInfo) (uint64, error) { + return func(events []statemachine.Event, state *SectorInfo) (uint64, error) { if gm, ok := events[0].User.(globalMutator); ok { gm.applyGlobal(state) - return nil + return 1, nil } for _, t := range ts { @@ -405,14 +395,14 @@ func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statema events[0].User.(mutator).apply(state) state.State = next - return nil + return 1, nil } _, ok := events[0].User.(Ignorable) if ok { - return nil + return 1, nil } - return xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0]) + return 0, xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0]) } } diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index f270b3668f7..06f4b7b2cfd 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -274,7 +274,10 @@ type SectorFaultedFinal struct{} type SectorRemove struct{} -func (evt SectorRemove) apply(state *SectorInfo) {} +func (evt SectorRemove) applyGlobal(state *SectorInfo) bool { + state.State = Removing + return true +} type SectorRemoved struct{} From e2361734179024ab3e3497fddae654c644917154 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 27 Aug 2020 12:40:19 +0200 Subject: [PATCH 02/25] miner: Print (pre)commit message cids in sector status --- api/api_storage.go | 2 ++ cmd/lotus-storage-miner/sectors.go | 2 ++ node/impl/storminer.go | 2 ++ 3 files changed, 6 insertions(+) diff --git a/api/api_storage.go b/api/api_storage.go index 65ecf12eaa4..8745dd0a2bb 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -128,6 +128,8 @@ type SectorInfo struct { Deals []abi.DealID Ticket SealTicket Seed SealSeed + PreCommitMsg *cid.Cid + CommitMsg *cid.Cid Retries uint64 ToUpgrade bool diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index e1815572e5f..8bd8ff6a5b9 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -97,6 +97,8 @@ var sectorsStatusCmd = &cli.Command{ fmt.Printf("TicketH:\t%d\n", status.Ticket.Epoch) fmt.Printf("Seed:\t\t%x\n", status.Seed.Value) fmt.Printf("SeedH:\t\t%d\n", status.Seed.Epoch) + fmt.Printf("Precommit:\t%s\n", status.PreCommitMsg) + fmt.Printf("Commit:\t\t%s\n", status.CommitMsg) fmt.Printf("Proof:\t\t%x\n", status.Proof) fmt.Printf("Deals:\t\t%v\n", status.Deals) fmt.Printf("Retries:\t%d\n", status.Retries) diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 6c1b98db1e0..2bda63042e6 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -153,6 +153,8 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumb Value: info.SeedValue, Epoch: info.SeedEpoch, }, + PreCommitMsg: info.PreCommitMessage, + CommitMsg: info.CommitMessage, Retries: info.InvalidProofs, ToUpgrade: sm.Miner.IsMarkedForUpgrade(sid), From 51ca460f187b682c633ec8a941d4d5810fe334d5 
Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Thu, 27 Aug 2020 12:41:12 +0200
Subject: [PATCH 03/25] miner: Fix sorting of some sector states in miner-info

---
 cmd/lotus-storage-miner/info.go | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go
index 3118e6afa94..63e57111b71 100644
--- a/cmd/lotus-storage-miner/info.go
+++ b/cmd/lotus-storage-miner/info.go
@@ -236,8 +236,10 @@ var stateList = []stateMeta{
 	{col: 39, state: "Total"},
 	{col: color.FgGreen, state: sealing.Proving},
 
+	{col: color.FgBlue, state: sealing.Empty},
+	{col: color.FgBlue, state: sealing.WaitDeals},
+
 	{col: color.FgRed, state: sealing.UndefinedSectorState},
-	{col: color.FgYellow, state: sealing.Empty},
 	{col: color.FgYellow, state: sealing.Packing},
 	{col: color.FgYellow, state: sealing.PreCommit1},
 	{col: color.FgYellow, state: sealing.PreCommit2},
@@ -248,6 +250,9 @@ var stateList = []stateMeta{
 	{col: color.FgYellow, state: sealing.CommitWait},
 	{col: color.FgYellow, state: sealing.FinalizeSector},
 
+	{col: color.FgCyan, state: sealing.Removing},
+	{col: color.FgCyan, state: sealing.Removed},
+
 	{col: color.FgRed, state: sealing.FailedUnrecoverable},
 	{col: color.FgRed, state: sealing.SealPreCommit1Failed},
 	{col: color.FgRed, state: sealing.SealPreCommit2Failed},
@@ -259,6 +264,7 @@ var stateList = []stateMeta{
 	{col: color.FgRed, state: sealing.Faulty},
 	{col: color.FgRed, state: sealing.FaultReported},
 	{col: color.FgRed, state: sealing.FaultedFinal},
+	{col: color.FgRed, state: sealing.RemoveFailed},
 }
 
 func init() {

From 99ecef89b80b5c5e718446d17be8fc37a4618298 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Thu, 27 Aug 2020 12:41:24 +0200
Subject: [PATCH 04/25] gofmt

---
 api/api_storage.go | 22 +++++++++++-----------
 node/impl/storminer.go | 6 +++---
 2 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/api/api_storage.go b/api/api_storage.go
index 8745dd0a2bb..14aa5ff971d 100644
--- a/api/api_storage.go
+++ b/api/api_storage.go
@@ -120,18 +120,18 @@ type SectorLog struct {
 }
 
 type SectorInfo struct {
-	SectorID abi.SectorNumber
-	State SectorState
-	CommD *cid.Cid
-	CommR *cid.Cid
-	Proof []byte
-	Deals []abi.DealID
-	Ticket SealTicket
-	Seed SealSeed
+	SectorID     abi.SectorNumber
+	State        SectorState
+	CommD        *cid.Cid
+	CommR        *cid.Cid
+	Proof        []byte
+	Deals        []abi.DealID
+	Ticket       SealTicket
+	Seed         SealSeed
 	PreCommitMsg *cid.Cid
-	CommitMsg *cid.Cid
-	Retries uint64
-	ToUpgrade bool
+	CommitMsg    *cid.Cid
+	Retries      uint64
+	ToUpgrade    bool
 
 	LastErr string
 
diff --git a/node/impl/storminer.go b/node/impl/storminer.go
index 2bda63042e6..2eb9c99055d 100644
--- a/node/impl/storminer.go
+++ b/node/impl/storminer.go
@@ -154,9 +154,9 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumb
 			Epoch: info.SeedEpoch,
 		},
 		PreCommitMsg: info.PreCommitMessage,
-		CommitMsg: info.CommitMessage,
-		Retries: info.InvalidProofs,
-		ToUpgrade: sm.Miner.IsMarkedForUpgrade(sid),
+		CommitMsg:    info.CommitMessage,
+		Retries:      info.InvalidProofs,
+		ToUpgrade:    sm.Miner.IsMarkedForUpgrade(sid),
 
 		LastErr: info.LastErr,
 		Log:     log,

From 788c7dbf489a559f2c4627a9c88514d7ec09a0e0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Thu, 27 Aug 2020 12:57:08 +0200
Subject: [PATCH 05/25] storagefsm: Separate state for submitting commit message

---
 cmd/lotus-storage-miner/info.go | 1 +
 extern/storage-sealing/fsm.go | 104 +++++++++++++----------
 extern/storage-sealing/fsm_events.go | 10 ++-
extern/storage-sealing/sector_state.go | 7 +- extern/storage-sealing/states_sealing.go | 17 ++-- 5 files changed, 80 insertions(+), 59 deletions(-) diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index 63e57111b71..79b2ba3ea47 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -247,6 +247,7 @@ var stateList = []stateMeta{ {col: color.FgYellow, state: sealing.PreCommitWait}, {col: color.FgYellow, state: sealing.WaitSeed}, {col: color.FgYellow, state: sealing.Committing}, + {col: color.FgYellow, state: sealing.SubmitCommit}, {col: color.FgYellow, state: sealing.CommitWait}, {col: color.FgYellow, state: sealing.FinalizeSector}, diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index 1612cafb701..d8c7430a2e7 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -71,6 +71,10 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorChainPreCommitFailed{}, PreCommitFailed), ), Committing: planCommitting, + SubmitCommit: planOne( + on(SectorCommitSubmitted{}, CommitWait), + on(SectorCommitFailed{}, CommitFailed), + ), CommitWait: planOne( on(SectorProving{}, FinalizeSector), on(SectorCommitFailed{}, CommitFailed), @@ -182,47 +186,50 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta /* - * Empty <- incoming deals - | | - | v - *<- WaitDeals <- incoming deals - | | - | v - *<- Packing <- incoming committed capacity - | | - | v - *<- PreCommit1 <--> SealPreCommit1Failed - | | ^ ^^ - | | *----------++----\ - | v v || | - *<- PreCommit2 --------++--> SealPreCommit2Failed - | | || - | v /-------/| - * PreCommitting <-----+---> PreCommitFailed - | | | ^ - | v | | - *<- WaitSeed -----------+-----/ - | ||| ^ | - | ||| \--------*-----/ - | ||| | - | vvv v----+----> ComputeProofFailed - *<- Committing | - | | ^--> CommitFailed - | v ^ - *<- CommitWait ---/ - | | - | v - | FinalizeSector <--> FinalizeFailed - | | - | v - *<- Proving - | - v - FailedUnrecoverable - - UndefinedSectorState <- ¯\_(ツ)_/¯ - | ^ - *---------------------/ + * Empty <- incoming deals + | | + | v + *<- WaitDeals <- incoming deals + | | + | v + *<- Packing <- incoming committed capacity + | | + | v + *<- PreCommit1 <--> SealPreCommit1Failed + | | ^ ^^ + | | *----------++----\ + | v v || | + *<- PreCommit2 --------++--> SealPreCommit2Failed + | | || + | v /-------/| + * PreCommitting <-----+---> PreCommitFailed + | | | ^ + | v | | + *<- WaitSeed -----------+-----/ + | ||| ^ | + | ||| \--------*-----/ + | ||| | + | vvv v----+----> ComputeProofFailed + *<- Committing | + | | ^--> CommitFailed + | v ^ + | SubmitCommit | + | | | + | v | + *<- CommitWait ---/ + | | + | v + | FinalizeSector <--> FinalizeFailed + | | + | v + *<- Proving + | + v + FailedUnrecoverable + + UndefinedSectorState <- ¯\_(ツ)_/¯ + | ^ + *---------------------/ */ @@ -248,6 +255,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta return m.handleWaitSeed, processed, nil case Committing: return m.handleCommitting, processed, nil + case SubmitCommit: + return m.handleSubmitCommit, processed, nil case CommitWait: return m.handleCommitWait, processed, nil case FinalizeSector: @@ -294,24 +303,25 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta } func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, error) { - for _, event := range events { + for i, event := range events { switch e := event.User.(type) 
{ case globalMutator: if e.applyGlobal(state) { - return 1, nil + return uint64(i + 1), nil } case SectorCommitted: // the normal case e.apply(state) - state.State = CommitWait + state.State = SubmitCommit case SectorSeedReady: // seed changed :/ if e.SeedEpoch == state.SeedEpoch && bytes.Equal(e.SeedValue, state.SeedValue) { log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change") continue // or it didn't! } + log.Warnf("planCommitting: commit Seed changed") e.apply(state) state.State = Committing - return 1, nil + return uint64(i + 1), nil case SectorComputeProofFailed: state.State = ComputeProofFailed case SectorSealPreCommit1Failed: @@ -321,10 +331,10 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err case SectorRetryCommitWait: state.State = CommitWait default: - return 0, xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events) + return uint64(i), xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events) } } - return 1, nil + return uint64(len(events)), nil } func (m *Sealing) restartSectors(ctx context.Context) error { diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index 06f4b7b2cfd..29a52f362c3 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -192,12 +192,18 @@ func (evt SectorCommitFailed) FormatError(xerrors.Printer) (next error) { return func (evt SectorCommitFailed) apply(*SectorInfo) {} type SectorCommitted struct { - Message cid.Cid - Proof []byte + Proof []byte } func (evt SectorCommitted) apply(state *SectorInfo) { state.Proof = evt.Proof +} + +type SectorCommitSubmitted struct { + Message cid.Cid +} + +func (evt SectorCommitSubmitted) apply(state *SectorInfo) { state.CommitMessage = &evt.Message } diff --git a/extern/storage-sealing/sector_state.go b/extern/storage-sealing/sector_state.go index 2f57d83e891..f3106ea3b98 100644 --- a/extern/storage-sealing/sector_state.go +++ b/extern/storage-sealing/sector_state.go @@ -10,12 +10,13 @@ const ( WaitDeals SectorState = "WaitDeals" // waiting for more pieces (deals) to be added to the sector Packing SectorState = "Packing" // sector not in sealStore, and not on chain PreCommit1 SectorState = "PreCommit1" // do PreCommit1 - PreCommit2 SectorState = "PreCommit2" // do PreCommit1 + PreCommit2 SectorState = "PreCommit2" // do PreCommit2 PreCommitting SectorState = "PreCommitting" // on chain pre-commit PreCommitWait SectorState = "PreCommitWait" // waiting for precommit to land on chain WaitSeed SectorState = "WaitSeed" // waiting for seed - Committing SectorState = "Committing" - CommitWait SectorState = "CommitWait" // waiting for message to land on chain + Committing SectorState = "Committing" // compute PoRep + SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain + CommitWait SectorState = "CommitWait" // wait for the commit message to land on chain FinalizeSector SectorState = "FinalizeSector" Proving SectorState = "Proving" // error modes diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index a434d1fcb30..4dfec6b5236 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -326,21 +326,25 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)}) } + return 
ctx.Send(SectorCommitted{ + Proof: proof, + }) +} + +func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo) error { tok, _, err := m.api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleCommitting: api error, not proceeding: %+v", err) return nil } - if err := m.checkCommit(ctx.Context(), sector, proof, tok); err != nil { + if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil { return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)}) } - // TODO: Consider splitting states and persist proof for faster recovery - params := &miner.ProveCommitSectorParams{ SectorNumber: sector.SectorNumber, - Proof: proof, + Proof: sector.Proof, } enc := new(bytes.Buffer) @@ -372,14 +376,13 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) collateral = big.Zero() } - // TODO: check seed / ticket are up to date + // TODO: check seed / ticket / deals are up to date mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, m.feeCfg.MaxCommitGasFee, enc.Bytes()) if err != nil { return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) } - return ctx.Send(SectorCommitted{ - Proof: proof, + return ctx.Send(SectorCommitSubmitted{ Message: mcid, }) } From df635579c4e9f9eb09278f69e8fadf0bfed62c86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 27 Aug 2020 13:51:13 +0200 Subject: [PATCH 06/25] storagefsm: Handle sectors with expired deals better --- api/api_full.go | 2 +- chain/stmgr/utils.go | 2 +- cmd/lotus-storage-miner/info.go | 1 + extern/storage-sealing/checks.go | 14 +++++++- extern/storage-sealing/fsm.go | 8 +++++ extern/storage-sealing/fsm_events.go | 5 +++ extern/storage-sealing/sector_state.go | 1 + extern/storage-sealing/states_failed.go | 42 +++++++++++++++++++++++- extern/storage-sealing/states_sealing.go | 15 +++++++-- extern/storage-sealing/upgrade_queue.go | 4 +++ node/impl/storminer.go | 3 ++ 11 files changed, 90 insertions(+), 7 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 2d8a4e5150b..7f3d7b093f4 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -323,7 +323,7 @@ type FullNode interface { StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (types.BigInt, error) // StateSectorPreCommitInfo returns the PreCommit info for the specified miner's sector StateSectorPreCommitInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error) - // StateSectorGetInfo returns the on-chain info for the specified miner's sector + // StateSectorGetInfo returns the on-chain info for the specified miner's sector. 
Returns null in case the sector info isn't found // NOTE: returned info.Expiration may not be accurate in some cases, use StateSectorExpiration to get accurate // expiration epoch StateSectorGetInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (*miner.SectorOnChainInfo, error) diff --git a/chain/stmgr/utils.go b/chain/stmgr/utils.go index 098021f6f35..a95374b6938 100644 --- a/chain/stmgr/utils.go +++ b/chain/stmgr/utils.go @@ -157,7 +157,7 @@ func MinerSectorInfo(ctx context.Context, sm *StateManager, maddr address.Addres return nil, err } if !ok { - return nil, xerrors.New("sector not found") + return nil, nil } return sectorInfo, nil diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index 79b2ba3ea47..3a773cf3e34 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -266,6 +266,7 @@ var stateList = []stateMeta{ {col: color.FgRed, state: sealing.FaultReported}, {col: color.FgRed, state: sealing.FaultedFinal}, {col: color.FgRed, state: sealing.RemoveFailed}, + {col: color.FgRed, state: sealing.DealsExpired}, } func init() { diff --git a/extern/storage-sealing/checks.go b/extern/storage-sealing/checks.go index b415c6f8eb0..3a59ea05948 100644 --- a/extern/storage-sealing/checks.go +++ b/extern/storage-sealing/checks.go @@ -33,7 +33,7 @@ type ErrInvalidProof struct{ error } type ErrNoPrecommit struct{ error } type ErrCommitWaitFailed struct{ error } -func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error { +func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api SealingAPI) error { tok, height, err := api.ChainHead(ctx) if err != nil { return &ErrApi{xerrors.Errorf("getting chain head: %w", err)} @@ -55,6 +55,10 @@ func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error { return &ErrInvalidDeals{xerrors.Errorf("getting deal %d for piece %d: %w", p.DealInfo.DealID, i, err)} } + if proposal.Provider != maddr { + return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong provider: %s != %s", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, proposal.Provider, maddr)} + } + if proposal.PieceCID != p.Piece.PieceCID { return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %x != %x", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID)} } @@ -74,6 +78,10 @@ func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error { // checkPrecommit checks that data commitment generated in the sealing process // matches pieces, and that the seal ticket isn't expired func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, tok TipSetToken, height abi.ChainEpoch, api SealingAPI) (err error) { + if err := checkPieces(ctx, maddr, si, api); err != nil { + return err + } + commD, err := api.StateComputeDataCommitment(ctx, maddr, si.SectorType, si.dealIDs(), tok) if err != nil { return &ErrApi{xerrors.Errorf("calling StateComputeDataCommitment: %w", err)} @@ -176,5 +184,9 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte, return &ErrInvalidProof{xerrors.New("invalid proof (compute error?)")} } + if err := checkPieces(ctx, m.maddr, si, m.api); err != nil { + return err + } + return nil } diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index d8c7430a2e7..2ae94aad40c 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -61,6 
+61,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorPreCommitted{}, PreCommitWait), on(SectorChainPreCommitFailed{}, PreCommitFailed), on(SectorPreCommitLanded{}, WaitSeed), + on(SectorDealsExpired{}, DealsExpired), ), PreCommitWait: planOne( on(SectorChainPreCommitFailed{}, PreCommitFailed), @@ -99,6 +100,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorRetryWaitSeed{}, WaitSeed), on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed), on(SectorPreCommitLanded{}, WaitSeed), + on(SectorDealsExpired{}, DealsExpired), ), ComputeProofFailed: planOne( on(SectorRetryComputeProof{}, Committing), @@ -113,10 +115,14 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorChainPreCommitFailed{}, PreCommitFailed), on(SectorRetryPreCommit{}, PreCommitting), on(SectorRetryCommitWait{}, CommitWait), + on(SectorDealsExpired{}, DealsExpired), ), FinalizeFailed: planOne( on(SectorRetryFinalize{}, FinalizeSector), ), + DealsExpired: planOne( + // SectorRemove (global) + ), // Post-seal @@ -275,6 +281,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta return m.handleCommitFailed, processed, nil case FinalizeFailed: return m.handleFinalizeFailed, processed, nil + case DealsExpired: + return m.handleDealsExpired, processed, nil // Post-seal case Proving: diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index 29a52f362c3..7761ad3e9e4 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -191,6 +191,11 @@ type SectorCommitFailed struct{ error } func (evt SectorCommitFailed) FormatError(xerrors.Printer) (next error) { return evt.error } func (evt SectorCommitFailed) apply(*SectorInfo) {} +type SectorDealsExpired struct{ error } + +func (evt SectorDealsExpired) FormatError(xerrors.Printer) (next error) { return evt.error } +func (evt SectorDealsExpired) apply(*SectorInfo) {} + type SectorCommitted struct { Proof []byte } diff --git a/extern/storage-sealing/sector_state.go b/extern/storage-sealing/sector_state.go index f3106ea3b98..9c31b952c60 100644 --- a/extern/storage-sealing/sector_state.go +++ b/extern/storage-sealing/sector_state.go @@ -28,6 +28,7 @@ const ( CommitFailed SectorState = "CommitFailed" PackingFailed SectorState = "PackingFailed" FinalizeFailed SectorState = "FinalizeFailed" + DealsExpired SectorState = "DealsExpired" Faulty SectorState = "Faulty" // sector is corrupted or gone for some reason FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index cf829f44fa3..fbdbd6177cf 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -81,6 +81,12 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("ticket expired error: %w", err)}) case *ErrBadTicket: return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad expired: %w", err)}) + case *ErrInvalidDeals: + // TODO: Deals got reorged, figure out what to do about this + // (this will probably require tracking the deal submit message CID, and re-checking what's on chain) + return xerrors.Errorf("invalid deals in sector %d: %w", sector.SectorNumber, err) + case *ErrExpiredDeals: + return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector 
deals expired: %w", err)}) case *ErrNoPrecommit: return ctx.Send(SectorRetryPreCommit{}) case *ErrPrecommitOnChain: @@ -88,6 +94,7 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI case *ErrSectorNumberAllocated: log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err) // TODO: check if the sector is committed (not sure how we'd end up here) + // TODO: check on-chain state, adjust local sector number counter to not give out allocated numbers return nil default: return xerrors.Errorf("checkPrecommit sanity check error: %w", err) @@ -157,7 +164,13 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo case *ErrExpiredTicket: return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("ticket expired error: %w", err)}) case *ErrBadTicket: - return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad expired: %w", err)}) + return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)}) + case *ErrInvalidDeals: + // TODO: Deals got reorged, figure out what to do about this + // (this will probably require tracking the deal submit message CID, and re-checking what's on chain) + return xerrors.Errorf("invalid deals in sector %d: %w", sector.SectorNumber, err) + case *ErrExpiredDeals: + return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) case nil: return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)}) case *ErrPrecommitOnChain: @@ -192,6 +205,12 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo return ctx.Send(SectorRetryPreCommitWait{}) case *ErrNoPrecommit: return ctx.Send(SectorRetryPreCommit{}) + case *ErrInvalidDeals: + // TODO: Deals got reorged, figure out what to do about this + // (this will probably require tracking the deal submit message CID, and re-checking what's on chain) + return xerrors.Errorf("invalid deals in sector %d: %w", sector.SectorNumber, err) + case *ErrExpiredDeals: + return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) case *ErrCommitWaitFailed: if err := failedCooldown(ctx, sector); err != nil { return err @@ -221,3 +240,24 @@ func (m *Sealing) handleFinalizeFailed(ctx statemachine.Context, sector SectorIn return ctx.Send(SectorRetryFinalize{}) } + +func (m *Sealing) handleDealsExpired(ctx statemachine.Context, sector SectorInfo) error { + // First make vary sure the sector isn't committed + si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) + if err != nil { + return xerrors.Errorf("getting sector info: %w", err) + } + if si != nil { + // TODO: this should never happen, but in case it does, try to go back to + // the proving state after running some checks + return xerrors.Errorf("sector is committed on-chain, but we're in DealsExpired") + } + + if sector.PreCommitInfo == nil { + // TODO: Create a separate state which will remove those pieces, and go back to PC1 + return xerrors.Errorf("non-precommitted sector with expired deals, can't recover from this yet") + } + + // Not much to do here, we can't go back in time to commit this sector + return ctx.Send(SectorRemove{}) +} diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 4dfec6b5236..956d4f46eeb 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -79,7 +79,7 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) 
(abi.Se } func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error { - if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state + if err := checkPieces(ctx.Context(), m.maddr, sector, m.api); err != nil { // Sanity check state switch err.(type) { case *ErrApi: log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err) @@ -155,6 +155,12 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("ticket expired: %w", err)}) case *ErrBadTicket: return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)}) + case *ErrInvalidDeals: + // TODO: Deals got reorged, figure out what to do about this + // (this will probably require tracking the deal submit message CID, and re-checking what's on chain) + return xerrors.Errorf("invalid deals in sector %d: %w", sector.SectorNumber, err) + case *ErrExpiredDeals: + return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) case *ErrPrecommitOnChain: return ctx.Send(SectorPreCommitLanded{TipSet: tok}) // we re-did precommit case *ErrSectorNumberAllocated: @@ -402,9 +408,12 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) return ctx.Send(SectorCommitFailed{xerrors.Errorf("submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.TicketValue, sector.SeedValue, sector.SeedEpoch, sector.Proof)}) } - _, err = m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, mw.TipSetTok) + si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, mw.TipSetTok) if err != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("proof validation failed, sector not found in sector set after cron: %w", err)}) + return ctx.Send(SectorCommitFailed{xerrors.Errorf("proof validation failed, calling StateSectorGetInfo: %w", err)}) + } + if si == nil { + return ctx.Send(SectorCommitFailed{xerrors.Errorf("proof validation failed, sector not found in sector set after cron")}) } return ctx.Send(SectorProving{}) diff --git a/extern/storage-sealing/upgrade_queue.go b/extern/storage-sealing/upgrade_queue.go index 12a94f042e5..870f60dbb4b 100644 --- a/extern/storage-sealing/upgrade_queue.go +++ b/extern/storage-sealing/upgrade_queue.go @@ -72,6 +72,10 @@ func (m *Sealing) tryUpgradeSector(ctx context.Context, params *miner.SectorPreC log.Errorf("error calling StateSectorGetInfo for replaced sector: %+v", err) return big.Zero() } + if ri == nil { + log.Errorf("couldn't find sector info for sector to replace: %+v", replace) + return big.Zero() + } if params.Expiration < ri.Expiration { // TODO: Some limit on this diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 2eb9c99055d..3507453dd71 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -177,6 +177,9 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumb onChainInfo, err := sm.Full.StateSectorGetInfo(ctx, sm.Miner.Address(), sid, types.EmptyTSK) if err != nil { + return sInfo, err + } + if onChainInfo == nil { return sInfo, nil } sInfo.SealProof = onChainInfo.SealProof From 8a45148070718f222c57f1afddfae5c69e3ac178 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 27 Aug 2020 13:53:04 +0200 Subject: [PATCH 07/25] docsgen --- documentation/en/api-methods.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/documentation/en/api-methods.md b/documentation/en/api-methods.md index bd395fc3e80..7f1d79be456 100644 --- a/documentation/en/api-methods.md +++ b/documentation/en/api-methods.md @@ -3603,7 +3603,7 @@ Response: ``` ### StateSectorGetInfo -StateSectorGetInfo returns the on-chain info for the specified miner's sector +StateSectorGetInfo returns the on-chain info for the specified miner's sector. Returns null in case the sector info isn't found NOTE: returned info.Expiration may not be accurate in some cases, use StateSectorExpiration to get accurate expiration epoch From 29078aaae9726d13adecfe8cf252139d8ccecc32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 27 Aug 2020 14:02:00 +0200 Subject: [PATCH 08/25] storagefsm: Retry sending messages on out-of-gas errors --- extern/storage-sealing/fsm.go | 2 ++ extern/storage-sealing/fsm_events.go | 4 ++++ extern/storage-sealing/states_sealing.go | 18 ++++++++++++++++-- 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index 2ae94aad40c..d80fa5e944f 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -66,6 +66,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto PreCommitWait: planOne( on(SectorChainPreCommitFailed{}, PreCommitFailed), on(SectorPreCommitLanded{}, WaitSeed), + on(SectorRetryPreCommit{}, PreCommitting), ), WaitSeed: planOne( on(SectorSeedReady{}, Committing), @@ -79,6 +80,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto CommitWait: planOne( on(SectorProving{}, FinalizeSector), on(SectorCommitFailed{}, CommitFailed), + on(SectorRetrySubmitCommit{}, SubmitCommit), ), FinalizeSector: planOne( diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index 7761ad3e9e4..4ae9d708b8a 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -191,6 +191,10 @@ type SectorCommitFailed struct{ error } func (evt SectorCommitFailed) FormatError(xerrors.Printer) (next error) { return evt.error } func (evt SectorCommitFailed) apply(*SectorInfo) {} +type SectorRetrySubmitCommit struct{} + +func (evt SectorRetrySubmitCommit) apply(*SectorInfo) {} + type SectorDealsExpired struct{ error } func (evt SectorDealsExpired) FormatError(xerrors.Printer) (next error) { return evt.error } diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 956d4f46eeb..ae2bea48af3 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/filecoin-project/specs-storage/storage" ) @@ -232,11 +233,18 @@ func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInf return ctx.Send(SectorChainPreCommitFailed{err}) } - if mw.Receipt.ExitCode != 0 { + switch mw.Receipt.ExitCode { + case exitcode.Ok: + // this is what we expect + case exitcode.SysErrOutOfGas: + // gas estimator guessed a wrong number + return ctx.Send(SectorRetryPreCommit{}) + default: log.Error("sector precommit failed: ", mw.Receipt.ExitCode) err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode) return 
ctx.Send(SectorChainPreCommitFailed{err}) } + log.Info("precommit message landed on chain: ", sector.SectorNumber) return ctx.Send(SectorPreCommitLanded{TipSet: mw.TipSetTok}) @@ -404,7 +412,13 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to wait for porep inclusion: %w", err)}) } - if mw.Receipt.ExitCode != 0 { + switch mw.Receipt.ExitCode { + case exitcode.Ok: + // this is what we expect + case exitcode.SysErrOutOfGas: + // gas estimator guessed a wrong number + return ctx.Send(SectorRetrySubmitCommit{}) + default: return ctx.Send(SectorCommitFailed{xerrors.Errorf("submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.TicketValue, sector.SeedValue, sector.SeedEpoch, sector.Proof)}) } From 7806a9885a97817f44aab3b4fe42e87c63cd07f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 27 Aug 2020 17:50:37 +0200 Subject: [PATCH 09/25] storagefsm: Record publish deals message CID --- extern/storage-sealing/cbor_gen.go | 49 ++++++++++++++++++++++++++++-- extern/storage-sealing/sealing.go | 2 +- extern/storage-sealing/types.go | 1 + markets/storageadapter/provider.go | 9 ++++-- storage/sectorblocks/blocks.go | 1 + 5 files changed, 57 insertions(+), 5 deletions(-) diff --git a/extern/storage-sealing/cbor_gen.go b/extern/storage-sealing/cbor_gen.go index 5a513dbdb39..d93fdd71214 100644 --- a/extern/storage-sealing/cbor_gen.go +++ b/extern/storage-sealing/cbor_gen.go @@ -135,12 +135,34 @@ func (t *DealInfo) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{163}); err != nil { + if _, err := w.Write([]byte{164}); err != nil { return err } scratch := make([]byte, 9) + // t.PublishCid (cid.Cid) (struct) + if len("PublishCid") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"PublishCid\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("PublishCid"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("PublishCid")); err != nil { + return err + } + + if t.PublishCid == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCidBuf(scratch, w, *t.PublishCid); err != nil { + return xerrors.Errorf("failed to write cid field t.PublishCid: %w", err) + } + } + // t.DealID (abi.DealID) (uint64) if len("DealID") > cbg.MaxLength { return xerrors.Errorf("Value in field \"DealID\" was too long") @@ -224,7 +246,30 @@ func (t *DealInfo) UnmarshalCBOR(r io.Reader) error { } switch name { - // t.DealID (abi.DealID) (uint64) + // t.PublishCid (cid.Cid) (struct) + case "PublishCid": + + { + + b, err := br.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := br.UnreadByte(); err != nil { + return err + } + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PublishCid: %w", err) + } + + t.PublishCid = &c + } + + } + // t.DealID (abi.DealID) (uint64) case "DealID": { diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index a19ada17654..31fc0faf659 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -141,7 +141,7 @@ func (m *Sealing) Stop(ctx context.Context) error { return m.sectors.Stop(ctx) } func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d 
DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { - log.Infof("Adding piece for deal %d", d.DealID) + log.Infof("Adding piece for deal %d (publish msg: %s)", d.DealID, d.PublishCid) if (padreader.PaddedSize(uint64(size))) != size { return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") } diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index 9d82ea2c2f6..cac1643b328 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -30,6 +30,7 @@ type Piece struct { // DealInfo is a tuple of deal identity and its schedule type DealInfo struct { + PublishCid *cid.Cid DealID abi.DealID DealSchedule DealSchedule KeepUnsealed bool diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index eaa2a3ae2fb..1a66275292b 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -87,8 +87,13 @@ func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemark } func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize abi.UnpaddedPieceSize, pieceData io.Reader) (*storagemarket.PackingResult, error) { + if deal.PublishCid == nil { + return nil, xerrors.Errorf("deal.PublishCid can't be nil") + } + p, offset, err := n.secb.AddPiece(ctx, pieceSize, pieceData, sealing.DealInfo{ - DealID: deal.DealID, + DealID: deal.DealID, + PublishCid: deal.PublishCid, DealSchedule: sealing.DealSchedule{ StartEpoch: deal.ClientDealProposal.Proposal.StartEpoch, EndEpoch: deal.ClientDealProposal.Proposal.EndEpoch, @@ -351,7 +356,7 @@ func (n *ProviderNodeAdapter) GetChainHead(ctx context.Context) (shared.TipSetTo } func (n *ProviderNodeAdapter) WaitForMessage(ctx context.Context, mcid cid.Cid, cb func(code exitcode.ExitCode, bytes []byte, err error) error) error { - receipt, err := n.StateWaitMsg(ctx, mcid, build.MessageConfidence) + receipt, err := n.StateWaitMsg(ctx, mcid, 2*build.MessageConfidence) if err != nil { return cb(0, nil, err) } diff --git a/storage/sectorblocks/blocks.go b/storage/sectorblocks/blocks.go index 5bfafe26396..b88ebcbae90 100644 --- a/storage/sectorblocks/blocks.go +++ b/storage/sectorblocks/blocks.go @@ -102,6 +102,7 @@ func (st *SectorBlocks) AddPiece(ctx context.Context, size abi.UnpaddedPieceSize return 0, 0, err } + // TODO: DealID has very low finality here err = st.writeRef(d.DealID, sn, offset, size) if err != nil { return 0, 0, xerrors.Errorf("writeRef: %w", err) From 489d5239a50f28b046fcf1cf167dd53f73d0121f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 27 Aug 2020 21:04:43 +0200 Subject: [PATCH 10/25] storagefsm: Attempt to auto-recover from reorged DealIDs --- extern/storage-sealing/cbor_gen.go | 36 +++++++- extern/storage-sealing/fsm.go | 35 ++++++-- extern/storage-sealing/fsm_events.go | 18 ++++ extern/storage-sealing/sector_state.go | 1 + extern/storage-sealing/states_failed.go | 109 +++++++++++++++++++++-- extern/storage-sealing/states_sealing.go | 8 +- extern/storage-sealing/types.go | 11 +++ 7 files changed, 198 insertions(+), 20 deletions(-) diff --git a/extern/storage-sealing/cbor_gen.go b/extern/storage-sealing/cbor_gen.go index d93fdd71214..b0762618768 100644 --- a/extern/storage-sealing/cbor_gen.go +++ b/extern/storage-sealing/cbor_gen.go @@ -475,7 +475,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{182}); err != nil { + if _, err := w.Write([]byte{183}); err != nil { 
return err } @@ -905,6 +905,29 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { } } + // t.Return (sealing.ReturnState) (string) + if len("Return") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Return\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Return"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("Return")); err != nil { + return err + } + + if len(t.Return) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Return was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.Return))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.Return)); err != nil { + return err + } + // t.LastErr (string) (string) if len("LastErr") > cbg.MaxLength { return xerrors.Errorf("Value in field \"LastErr\" was too long") @@ -1407,6 +1430,17 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { } } + // t.Return (sealing.ReturnState) (string) + case "Return": + + { + sval, err := cbg.ReadStringBuf(br, scratch) + if err != nil { + return err + } + + t.Return = ReturnState(sval) + } // t.LastErr (string) (string) case "LastErr": diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index d80fa5e944f..2b3a8596545 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -50,6 +50,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorPreCommit1{}, PreCommit2), on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed), on(SectorPackingFailed{}, PackingFailed), + on(SectorInvalidDealIDs{}, RecoverDealIDs), ), PreCommit2: planOne( on(SectorPreCommit2{}, PreCommitting), @@ -62,6 +63,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorChainPreCommitFailed{}, PreCommitFailed), on(SectorPreCommitLanded{}, WaitSeed), on(SectorDealsExpired{}, DealsExpired), + on(SectorInvalidDealIDs{}, RecoverDealIDs), ), PreCommitWait: planOne( on(SectorChainPreCommitFailed{}, PreCommitFailed), @@ -103,6 +105,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed), on(SectorPreCommitLanded{}, WaitSeed), on(SectorDealsExpired{}, DealsExpired), + on(SectorInvalidDealIDs{}, RecoverDealIDs), ), ComputeProofFailed: planOne( on(SectorRetryComputeProof{}, Committing), @@ -118,6 +121,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorRetryPreCommit{}, PreCommitting), on(SectorRetryCommitWait{}, CommitWait), on(SectorDealsExpired{}, DealsExpired), + on(SectorInvalidDealIDs{}, RecoverDealIDs), ), FinalizeFailed: planOne( on(SectorRetryFinalize{}, FinalizeSector), @@ -125,6 +129,9 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto DealsExpired: planOne( // SectorRemove (global) ), + RecoverDealIDs: planOne( + onReturning(SectorUpdateDealIDs{}), + ), // Post-seal @@ -389,13 +396,30 @@ func final(events []statemachine.Event, state *SectorInfo) (uint64, error) { return 0, xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events) } -func on(mut mutator, next SectorState) func() (mutator, SectorState) { - return func() (mutator, SectorState) { - return mut, next +func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) error) { + return func() (mutator, func(*SectorInfo) error) { + return mut, func(state *SectorInfo) error { + 
state.State = next + return nil + } } } -func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statemachine.Event, state *SectorInfo) (uint64, error) { +func onReturning(mut mutator) func() (mutator, func(*SectorInfo) error) { + return func() (mutator, func(*SectorInfo) error) { + return mut, func(state *SectorInfo) error { + if state.Return == "" { + return xerrors.Errorf("return state not set") + } + + state.State = SectorState(state.Return) + state.Return = "" + return nil + } + } +} + +func planOne(ts ...func() (mut mutator, next func(*SectorInfo) error)) func(events []statemachine.Event, state *SectorInfo) (uint64, error) { return func(events []statemachine.Event, state *SectorInfo) (uint64, error) { if gm, ok := events[0].User.(globalMutator); ok { gm.applyGlobal(state) @@ -414,8 +438,7 @@ func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statema } events[0].User.(mutator).apply(state) - state.State = next - return 1, nil + return 1, next(state) } _, ok := events[0].User.(Ignorable) diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index 4ae9d708b8a..e1ebe34c15f 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -271,6 +271,24 @@ type SectorRetryCommitWait struct{} func (evt SectorRetryCommitWait) apply(state *SectorInfo) {} +type SectorInvalidDealIDs struct{ + Return ReturnState +} + +func (evt SectorInvalidDealIDs) apply(state *SectorInfo) { + state.Return = evt.Return +} + +type SectorUpdateDealIDs struct{ + Updates map[int]abi.DealID +} + +func (evt SectorUpdateDealIDs) apply(state *SectorInfo) { + for i, id := range evt.Updates { + state.Pieces[i].DealInfo.DealID = id + } +} + // Faults type SectorFaulty struct{} diff --git a/extern/storage-sealing/sector_state.go b/extern/storage-sealing/sector_state.go index 9c31b952c60..26a82dc6d54 100644 --- a/extern/storage-sealing/sector_state.go +++ b/extern/storage-sealing/sector_state.go @@ -29,6 +29,7 @@ const ( PackingFailed SectorState = "PackingFailed" FinalizeFailed SectorState = "FinalizeFailed" DealsExpired SectorState = "DealsExpired" + RecoverDealIDs SectorState = "RecoverDealIDs" Faulty SectorState = "Faulty" // sector is corrupted or gone for some reason FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index fbdbd6177cf..f32816df6f5 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -1,12 +1,18 @@ package sealing import ( + "bytes" "time" "golang.org/x/xerrors" "github.com/filecoin-project/go-statemachine" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" + + "github.com/filecoin-project/lotus/extern/sector-storage/zerocomm" ) const minRetryTime = 1 * time.Minute @@ -82,9 +88,8 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI case *ErrBadTicket: return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad expired: %w", err)}) case *ErrInvalidDeals: - // TODO: Deals got reorged, figure out what to do about this - // (this will probably require tracking the deal submit message CID, and re-checking what's on chain) - return xerrors.Errorf("invalid deals in sector %d: %w", 
sector.SectorNumber, err) + log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) + return ctx.Send(SectorInvalidDealIDs{ Return: RetPreCommitFailed }) case *ErrExpiredDeals: return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) case *ErrNoPrecommit: @@ -166,9 +171,8 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo case *ErrBadTicket: return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)}) case *ErrInvalidDeals: - // TODO: Deals got reorged, figure out what to do about this - // (this will probably require tracking the deal submit message CID, and re-checking what's on chain) - return xerrors.Errorf("invalid deals in sector %d: %w", sector.SectorNumber, err) + log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) + return ctx.Send(SectorInvalidDealIDs{ Return: RetCommitFailed }) case *ErrExpiredDeals: return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) case nil: @@ -206,9 +210,8 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo case *ErrNoPrecommit: return ctx.Send(SectorRetryPreCommit{}) case *ErrInvalidDeals: - // TODO: Deals got reorged, figure out what to do about this - // (this will probably require tracking the deal submit message CID, and re-checking what's on chain) - return xerrors.Errorf("invalid deals in sector %d: %w", sector.SectorNumber, err) + log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) + return ctx.Send(SectorInvalidDealIDs{ Return: RetCommitFailed }) case *ErrExpiredDeals: return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) case *ErrCommitWaitFailed: @@ -261,3 +264,91 @@ func (m *Sealing) handleDealsExpired(ctx statemachine.Context, sector SectorInfo // Not much to do here, we can't go back in time to commit this sector return ctx.Send(SectorRemove{}) } + +func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorInfo) error { + tok, height, err := m.api.ChainHead(ctx.Context()) + if err != nil { + return xerrors.Errorf("getting chain head: %w", err) + } + + var toFix []int + + for i, p := range sector.Pieces { + // if no deal is associated with the piece, ensure that we added it as + // filler (i.e. 
ensure that it has a zero PieceCID) + if p.DealInfo == nil { + exp := zerocomm.ZeroPieceCommitment(p.Piece.Size.Unpadded()) + if !p.Piece.PieceCID.Equals(exp) { + return xerrors.Errorf("sector %d piece %d had non-zero PieceCID %+v", sector.SectorNumber, i, p.Piece.PieceCID) + } + continue + } + + proposal, err := m.api.StateMarketStorageDeal(ctx.Context(), p.DealInfo.DealID, tok) + if err != nil { + log.Warn("getting deal %d for piece %d: %+v", p.DealInfo.DealID, i, err) + toFix = append(toFix, i) + continue + } + + if proposal.Provider != m.maddr { + log.Warn("piece %d (of %d) of sector %d refers deal %d with wrong provider: %s != %s", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, proposal.Provider, m.maddr) + toFix = append(toFix, i) + continue + } + + if proposal.PieceCID != p.Piece.PieceCID { + log.Warn("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %x != %x", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID) + toFix = append(toFix, i) + continue + } + + if p.Piece.Size != proposal.PieceSize { + log.Warn("piece %d (of %d) of sector %d refers deal %d with different size: %d != %d", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.Size, proposal.PieceSize) + toFix = append(toFix, i) + continue + } + + if height >= proposal.StartEpoch { + // TODO: check if we are in an early enough state (before precommit), try to remove the offending pieces + // (tricky as we have to 'defragment' the sector while doing that, and update piece references for retrieval) + return xerrors.Errorf("can't fix sector deals: piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, proposal.StartEpoch, height) + } + } + + updates := map[int]abi.DealID{} + for _, i := range toFix { + p := sector.Pieces[i] + + if p.DealInfo.PublishCid == nil { + // TODO: check if we are in an early enough state try to remove this piece + log.Error("can't fix sector deals: piece %d (of %d) of sector %d has nil DealInfo.PublishCid (refers to deal %d)", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID) + // Not much to do here (and this can only happen for old spacerace sectors) + return ctx.Send(SectorRemove{}) + } + + ml, err := m.api.StateSearchMsg(ctx.Context(), *p.DealInfo.PublishCid) + if err != nil { + return xerrors.Errorf("looking for publish deal message %s (sector %d, piece %d): %w", *p.DealInfo.PublishCid, sector.SectorNumber, i, err) + } + + if ml.Receipt.ExitCode != exitcode.Ok { + return xerrors.Errorf("looking for publish deal message %s (sector %d, piece %d): non-ok exit code: %s", *p.DealInfo.PublishCid, sector.SectorNumber, i, ml.Receipt.ExitCode) + } + + var retval market.PublishStorageDealsReturn + if err := retval.UnmarshalCBOR(bytes.NewReader(ml.Receipt.Return)); err != nil { + return xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err) + } + + if len(retval.IDs) != 1 { + // market currently only ever sends messages with 1 deal + return xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal") + } + + updates[i] = retval.IDs[0] + } + + // Not much to do here, we can't go back in time to commit this sector + return ctx.Send(SectorUpdateDealIDs{Updates: updates}) +} diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index ae2bea48af3..8301c043114 100644 --- 
a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -86,7 +86,8 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err) return nil case *ErrInvalidDeals: - return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid dealIDs in sector: %w", err)}) + log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) + return ctx.Send(SectorInvalidDealIDs{ Return: RetPreCommit1 }) case *ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector? return ctx.Send(SectorPackingFailed{xerrors.Errorf("expired dealIDs in sector: %w", err)}) default: @@ -157,9 +158,8 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf case *ErrBadTicket: return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)}) case *ErrInvalidDeals: - // TODO: Deals got reorged, figure out what to do about this - // (this will probably require tracking the deal submit message CID, and re-checking what's on chain) - return xerrors.Errorf("invalid deals in sector %d: %w", sector.SectorNumber, err) + log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) + return ctx.Send(SectorInvalidDealIDs{ Return: RetPreCommitting }) case *ErrExpiredDeals: return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) case *ErrPrecommitOnChain: diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index cac1643b328..ea72d969151 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -54,6 +54,14 @@ type Log struct { Kind string } +type ReturnState string +const ( + RetPreCommit1 = ReturnState(PreCommit1) + RetPreCommitting = ReturnState(PreCommitting) + RetPreCommitFailed = ReturnState(PreCommitFailed) + RetCommitFailed = ReturnState(CommitFailed) +) + type SectorInfo struct { State SectorState SectorNumber abi.SectorNumber @@ -91,6 +99,9 @@ type SectorInfo struct { // Faults FaultReportMsg *cid.Cid + // Recovery + Return ReturnState + // Debug LastErr string From 15204a9b903cc7452595dfb23406dadc02c5fbdd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 27 Aug 2020 21:34:45 +0200 Subject: [PATCH 11/25] storagefsm: fix tests --- extern/storage-sealing/fsm_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/extern/storage-sealing/fsm_test.go b/extern/storage-sealing/fsm_test.go index f41d8c535ee..36e61305b8f 100644 --- a/extern/storage-sealing/fsm_test.go +++ b/extern/storage-sealing/fsm_test.go @@ -16,7 +16,7 @@ func init() { } func (t *test) planSingle(evt interface{}) { - _, err := t.s.plan([]statemachine.Event{{User: evt}}, t.state) + _, _, err := t.s.plan([]statemachine.Event{{User: evt}}, t.state) require.NoError(t.t, err) } @@ -98,12 +98,12 @@ func TestSeedRevert(t *testing.T) { m.planSingle(SectorSeedReady{}) require.Equal(m.t, m.state.State, Committing) - _, err := m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state) + _, _, err := m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state) require.NoError(t, err) require.Equal(m.t, m.state.State, Committing) // not changing the seed this time - _, err = m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state) + _, _, err = 
m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state) require.NoError(t, err) require.Equal(m.t, m.state.State, CommitWait) @@ -129,7 +129,8 @@ func TestPlanCommittingHandlesSectorCommitFailed(t *testing.T) { events := []statemachine.Event{{User: SectorCommitFailed{}}} - require.NoError(t, planCommitting(events, m.state)) + _, err := planCommitting(events, m.state) + require.NoError(t, err) require.Equal(t, CommitFailed, m.state.State) } From d8e58e67c6787c1d6b7d5965d7e2cc85f5ca7e3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 27 Aug 2020 22:41:35 +0200 Subject: [PATCH 12/25] storagefsm: Treat PackingFailed sectors as expired --- cmd/lotus-storage-miner/info.go | 1 + extern/storage-sealing/fsm.go | 7 +++++-- extern/storage-sealing/fsm_events.go | 4 ---- extern/storage-sealing/fsm_test.go | 6 ++++++ extern/storage-sealing/sector_state.go | 2 +- extern/storage-sealing/states_failed.go | 2 +- extern/storage-sealing/states_sealing.go | 2 +- 7 files changed, 15 insertions(+), 9 deletions(-) diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index 3a773cf3e34..dbb93c9726b 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -267,6 +267,7 @@ var stateList = []stateMeta{ {col: color.FgRed, state: sealing.FaultedFinal}, {col: color.FgRed, state: sealing.RemoveFailed}, {col: color.FgRed, state: sealing.DealsExpired}, + {col: color.FgRed, state: sealing.RecoverDealIDs}, } func init() { diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index 2b3a8596545..2dc43e5d077 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -49,13 +49,12 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto PreCommit1: planOne( on(SectorPreCommit1{}, PreCommit2), on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed), - on(SectorPackingFailed{}, PackingFailed), + on(SectorDealsExpired{}, DealsExpired), on(SectorInvalidDealIDs{}, RecoverDealIDs), ), PreCommit2: planOne( on(SectorPreCommit2{}, PreCommitting), on(SectorSealPreCommit2Failed{}, SealPreCommit2Failed), - on(SectorPackingFailed{}, PackingFailed), ), PreCommitting: planOne( on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed), @@ -126,6 +125,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto FinalizeFailed: planOne( on(SectorRetryFinalize{}, FinalizeSector), ), + PackingFailed: planOne(), // TODO: Deprecated, remove DealsExpired: planOne( // SectorRemove (global) ), @@ -290,6 +290,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta return m.handleCommitFailed, processed, nil case FinalizeFailed: return m.handleFinalizeFailed, processed, nil + case PackingFailed: // DEPRECATED: remove this for the next reset + state.State = DealsExpired + fallthrough case DealsExpired: return m.handleDealsExpired, processed, nil diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index e1ebe34c15f..63e6e684bec 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -101,10 +101,6 @@ func (evt SectorPacked) apply(state *SectorInfo) { } } -type SectorPackingFailed struct{ error } - -func (evt SectorPackingFailed) apply(*SectorInfo) {} - type SectorPreCommit1 struct { PreCommit1Out storage.PreCommit1Out TicketValue abi.SealRandomness diff --git a/extern/storage-sealing/fsm_test.go 
b/extern/storage-sealing/fsm_test.go index 36e61305b8f..c67decbebf8 100644 --- a/extern/storage-sealing/fsm_test.go +++ b/extern/storage-sealing/fsm_test.go @@ -58,6 +58,9 @@ func TestHappyPath(t *testing.T) { require.Equal(m.t, m.state.State, Committing) m.planSingle(SectorCommitted{}) + require.Equal(m.t, m.state.State, SubmitCommit) + + m.planSingle(SectorCommitSubmitted{}) require.Equal(m.t, m.state.State, CommitWait) m.planSingle(SectorProving{}) @@ -105,6 +108,9 @@ func TestSeedRevert(t *testing.T) { // not changing the seed this time _, _, err = m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state) require.NoError(t, err) + require.Equal(m.t, m.state.State, SubmitCommit) + + m.planSingle(SectorCommitSubmitted{}) require.Equal(m.t, m.state.State, CommitWait) m.planSingle(SectorProving{}) diff --git a/extern/storage-sealing/sector_state.go b/extern/storage-sealing/sector_state.go index 26a82dc6d54..4e674603d13 100644 --- a/extern/storage-sealing/sector_state.go +++ b/extern/storage-sealing/sector_state.go @@ -26,7 +26,7 @@ const ( PreCommitFailed SectorState = "PreCommitFailed" ComputeProofFailed SectorState = "ComputeProofFailed" CommitFailed SectorState = "CommitFailed" - PackingFailed SectorState = "PackingFailed" + PackingFailed SectorState = "PackingFailed" // TODO: deprecated, remove FinalizeFailed SectorState = "FinalizeFailed" DealsExpired SectorState = "DealsExpired" RecoverDealIDs SectorState = "RecoverDealIDs" diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index f32816df6f5..3976fe9e4e7 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -258,7 +258,7 @@ func (m *Sealing) handleDealsExpired(ctx statemachine.Context, sector SectorInfo if sector.PreCommitInfo == nil { // TODO: Create a separate state which will remove those pieces, and go back to PC1 - return xerrors.Errorf("non-precommitted sector with expired deals, can't recover from this yet") + log.Errorf("non-precommitted sector with expired deals, can't recover from this yet") } // Not much to do here, we can't go back in time to commit this sector diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 8301c043114..c90e5d39f67 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -89,7 +89,7 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) return ctx.Send(SectorInvalidDealIDs{ Return: RetPreCommit1 }) case *ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector? 
- return ctx.Send(SectorPackingFailed{xerrors.Errorf("expired dealIDs in sector: %w", err)}) + return ctx.Send(SectorDealsExpired{xerrors.Errorf("expired dealIDs in sector: %w", err)}) default: return xerrors.Errorf("checkPieces sanity check error: %w", err) } From 59f554b658ef66fea5789bdbd8c8894305f2465d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 27 Aug 2020 23:14:33 +0200 Subject: [PATCH 13/25] sealing sched: Show waiting tasks assigned to workers in sealing jobs cli --- cmd/lotus-storage-miner/sealing.go | 11 ++++++-- extern/sector-storage/sched.go | 31 +++++++++++++++-------- extern/sector-storage/stats.go | 18 ++++++++++++- extern/sector-storage/storiface/worker.go | 3 ++- 4 files changed, 49 insertions(+), 14 deletions(-) diff --git a/cmd/lotus-storage-miner/sealing.go b/cmd/lotus-storage-miner/sealing.go index 2f966dcca9f..1e34859d7d4 100644 --- a/cmd/lotus-storage-miner/sealing.go +++ b/cmd/lotus-storage-miner/sealing.go @@ -155,6 +155,9 @@ var sealingJobsCmd = &cli.Command{ // oldest first sort.Slice(lines, func(i, j int) bool { + if lines[i].RunWait != lines[j].RunWait { + return !lines[i].RunWait // already running tasks first + } return lines[i].Start.Before(lines[j].Start) }) @@ -170,10 +173,14 @@ var sealingJobsCmd = &cli.Command{ } tw := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0) - _, _ = fmt.Fprintf(tw, "ID\tSector\tWorker\tHostname\tTask\tTime\n") + _, _ = fmt.Fprintf(tw, "ID\tSector\tWorker\tHostname\tTask\tState\tTime\n") for _, l := range lines { - _, _ = fmt.Fprintf(tw, "%d\t%d\t%d\t%s\t%s\t%s\n", l.ID, l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), time.Now().Sub(l.Start).Truncate(time.Millisecond*100)) + state := "assigned" + if !l.RunWait { + state = "running" + } + _, _ = fmt.Fprintf(tw, "%d\t%d\t%d\t%s\t%s\t%s\t%s\n", l.ID, l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), state, time.Now().Sub(l.Start).Truncate(time.Millisecond*100)) } return tw.Flush() diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index 7a4b6f9efcc..43d5dc15826 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -85,6 +85,9 @@ type workerHandle struct { lk sync.Mutex + wndLk sync.Mutex + activeWindows []*schedWindow + // stats / tracking wt *workTracker @@ -123,6 +126,8 @@ type workerRequest struct { prepare WorkerAction work WorkerAction + start time.Time + index int // The index of the item in the heap. 
indexHeap int @@ -171,6 +176,8 @@ func (sh *scheduler) Schedule(ctx context.Context, sector abi.SectorID, taskType prepare: prepare, work: work, + start: time.Now(), + ret: ret, ctx: ctx, }: @@ -475,8 +482,6 @@ func (sh *scheduler) runWorker(wid WorkerID) { taskDone := make(chan struct{}, 1) windowsRequested := 0 - var activeWindows []*schedWindow - ctx, cancel := context.WithCancel(context.TODO()) defer cancel() @@ -510,7 +515,9 @@ func (sh *scheduler) runWorker(wid WorkerID) { select { case w := <-scheduledWindows: - activeWindows = append(activeWindows, w) + worker.wndLk.Lock() + worker.activeWindows = append(worker.activeWindows, w) + worker.wndLk.Unlock() case <-taskDone: log.Debugw("task done", "workerid", wid) case <-sh.closing: @@ -521,12 +528,14 @@ func (sh *scheduler) runWorker(wid WorkerID) { return } + worker.wndLk.Lock() + assignLoop: // process windows in order - for len(activeWindows) > 0 { + for len(worker.activeWindows) > 0 { // process tasks within a window in order - for len(activeWindows[0].todo) > 0 { - todo := activeWindows[0].todo[0] + for len(worker.activeWindows[0].todo) > 0 { + todo := worker.activeWindows[0].todo[0] needRes := ResourceTable[todo.taskType][sh.spt] sh.workersLk.RLock() @@ -548,15 +557,17 @@ func (sh *scheduler) runWorker(wid WorkerID) { go todo.respond(xerrors.Errorf("assignWorker error: %w", err)) } - activeWindows[0].todo = activeWindows[0].todo[1:] + worker.activeWindows[0].todo = worker.activeWindows[0].todo[1:] } - copy(activeWindows, activeWindows[1:]) - activeWindows[len(activeWindows)-1] = nil - activeWindows = activeWindows[:len(activeWindows)-1] + copy(worker.activeWindows, worker.activeWindows[1:]) + worker.activeWindows[len(worker.activeWindows)-1] = nil + worker.activeWindows = worker.activeWindows[:len(worker.activeWindows)-1] windowsRequested-- } + + worker.wndLk.Unlock() } }() } diff --git a/extern/sector-storage/stats.go b/extern/sector-storage/stats.go index 9abbdb83aab..a915c432056 100644 --- a/extern/sector-storage/stats.go +++ b/extern/sector-storage/stats.go @@ -1,6 +1,8 @@ package sectorstorage -import "github.com/filecoin-project/lotus/extern/sector-storage/storiface" +import ( + "github.com/filecoin-project/lotus/extern/sector-storage/storiface" +) func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats { m.sched.workersLk.RLock() @@ -29,6 +31,20 @@ func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob { for id, handle := range m.sched.workers { out[uint64(id)] = handle.wt.Running() + + handle.wndLk.Lock() + for _, window := range handle.activeWindows { + for _, request := range window.todo { + out[uint64(id)] = append(out[uint64(id)], storiface.WorkerJob{ + ID: 0, + Sector: request.sector, + Task: request.taskType, + RunWait: true, + Start: request.start, + }) + } + } + handle.wndLk.Unlock() } return out diff --git a/extern/sector-storage/storiface/worker.go b/extern/sector-storage/storiface/worker.go index 37c44703100..1140ed4dfd3 100644 --- a/extern/sector-storage/storiface/worker.go +++ b/extern/sector-storage/storiface/worker.go @@ -37,5 +37,6 @@ type WorkerJob struct { Sector abi.SectorID Task sealtasks.TaskType - Start time.Time + RunWait bool + Start time.Time } From f2bd680cc5fbbe013645ce414c49b71beaee3c15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 27 Aug 2020 23:14:46 +0200 Subject: [PATCH 14/25] gofmt --- extern/sector-storage/sched.go | 2 +- extern/storage-sealing/fsm.go | 2 +- extern/storage-sealing/fsm_events.go | 4 ++-- 
extern/storage-sealing/states_failed.go | 6 +++--- extern/storage-sealing/states_sealing.go | 4 ++-- extern/storage-sealing/types.go | 7 ++++--- 6 files changed, 13 insertions(+), 12 deletions(-) diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index 43d5dc15826..9993c6c8e16 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -85,7 +85,7 @@ type workerHandle struct { lk sync.Mutex - wndLk sync.Mutex + wndLk sync.Mutex activeWindows []*schedWindow // stats / tracking diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index 2dc43e5d077..1f04abd56a3 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -126,7 +126,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorRetryFinalize{}, FinalizeSector), ), PackingFailed: planOne(), // TODO: Deprecated, remove - DealsExpired: planOne( + DealsExpired: planOne( // SectorRemove (global) ), RecoverDealIDs: planOne( diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index 63e6e684bec..8649e6c5ea5 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -267,7 +267,7 @@ type SectorRetryCommitWait struct{} func (evt SectorRetryCommitWait) apply(state *SectorInfo) {} -type SectorInvalidDealIDs struct{ +type SectorInvalidDealIDs struct { Return ReturnState } @@ -275,7 +275,7 @@ func (evt SectorInvalidDealIDs) apply(state *SectorInfo) { state.Return = evt.Return } -type SectorUpdateDealIDs struct{ +type SectorUpdateDealIDs struct { Updates map[int]abi.DealID } diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index 3976fe9e4e7..85744edd4a8 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -89,7 +89,7 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad expired: %w", err)}) case *ErrInvalidDeals: log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) - return ctx.Send(SectorInvalidDealIDs{ Return: RetPreCommitFailed }) + return ctx.Send(SectorInvalidDealIDs{Return: RetPreCommitFailed}) case *ErrExpiredDeals: return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) case *ErrNoPrecommit: @@ -172,7 +172,7 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)}) case *ErrInvalidDeals: log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) - return ctx.Send(SectorInvalidDealIDs{ Return: RetCommitFailed }) + return ctx.Send(SectorInvalidDealIDs{Return: RetCommitFailed}) case *ErrExpiredDeals: return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) case nil: @@ -211,7 +211,7 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo return ctx.Send(SectorRetryPreCommit{}) case *ErrInvalidDeals: log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) - return ctx.Send(SectorInvalidDealIDs{ Return: RetCommitFailed }) + return ctx.Send(SectorInvalidDealIDs{Return: RetCommitFailed}) case *ErrExpiredDeals: return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) case *ErrCommitWaitFailed: diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 
c90e5d39f67..7693f26ad7f 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -87,7 +87,7 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) return nil case *ErrInvalidDeals: log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) - return ctx.Send(SectorInvalidDealIDs{ Return: RetPreCommit1 }) + return ctx.Send(SectorInvalidDealIDs{Return: RetPreCommit1}) case *ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector? return ctx.Send(SectorDealsExpired{xerrors.Errorf("expired dealIDs in sector: %w", err)}) default: @@ -159,7 +159,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)}) case *ErrInvalidDeals: log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) - return ctx.Send(SectorInvalidDealIDs{ Return: RetPreCommitting }) + return ctx.Send(SectorInvalidDealIDs{Return: RetPreCommitting}) case *ErrExpiredDeals: return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) case *ErrPrecommitOnChain: diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index ea72d969151..99cce77149e 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -55,11 +55,12 @@ type Log struct { } type ReturnState string + const ( - RetPreCommit1 = ReturnState(PreCommit1) - RetPreCommitting = ReturnState(PreCommitting) + RetPreCommit1 = ReturnState(PreCommit1) + RetPreCommitting = ReturnState(PreCommitting) RetPreCommitFailed = ReturnState(PreCommitFailed) - RetCommitFailed = ReturnState(CommitFailed) + RetCommitFailed = ReturnState(CommitFailed) ) type SectorInfo struct { From 7fdffc0340e4de241cb5f65a842a2f1a7c03d585 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 27 Aug 2020 23:28:52 +0200 Subject: [PATCH 15/25] sealing sched: Give more priority to tasks (re)moving data --- extern/sector-storage/request_queue.go | 5 +++++ extern/sector-storage/sealtasks/task.go | 24 +++++++++++++++--------- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/extern/sector-storage/request_queue.go b/extern/sector-storage/request_queue.go index 85d3abf4677..9247ce24a9a 100644 --- a/extern/sector-storage/request_queue.go +++ b/extern/sector-storage/request_queue.go @@ -7,6 +7,11 @@ type requestQueue []*workerRequest func (q requestQueue) Len() int { return len(q) } func (q requestQueue) Less(i, j int) bool { + oneMuchLess, muchLess := q[i].taskType.MuchLess(q[j].taskType) + if oneMuchLess { + return muchLess + } + if q[i].priority != q[j].priority { return q[i].priority > q[j].priority } diff --git a/extern/sector-storage/sealtasks/task.go b/extern/sector-storage/sealtasks/task.go index ad5ce01bb7b..4174373a6cf 100644 --- a/extern/sector-storage/sealtasks/task.go +++ b/extern/sector-storage/sealtasks/task.go @@ -17,15 +17,15 @@ const ( ) var order = map[TaskType]int{ - TTAddPiece: 7, - TTPreCommit1: 6, - TTPreCommit2: 5, - TTCommit2: 4, - TTCommit1: 3, - TTFetch: 2, - TTFinalize: 1, - TTUnseal: 0, - TTReadUnsealed: 0, + TTAddPiece: 6, // least priority + TTPreCommit1: 5, + TTPreCommit2: 4, + TTCommit2: 3, + TTCommit1: 2, + TTUnseal: 1, + TTFetch: -1, + TTReadUnsealed: -1, + TTFinalize: -2, // most priority } var shortNames = map[TaskType]string{ @@ -43,6 +43,12 @@ var shortNames = map[TaskType]string{ TTReadUnsealed: "RD ", } +func (a TaskType) MuchLess(b TaskType) 
(bool, bool) { + oa, ob := order[a], order[b] + oneNegative := oa^ob < 0 + return oneNegative, oa < ob +} + func (a TaskType) Less(b TaskType) bool { return order[a] < order[b] } From 59d2034cbb1d66c3735bfff011455a8e1497c621 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 27 Aug 2020 23:58:37 +0200 Subject: [PATCH 16/25] sealing sched: Wait a bit for tasks to come in on restart --- extern/sector-storage/manager.go | 29 ++++++++++++----------------- extern/sector-storage/sched.go | 17 +++++++++++++++-- 2 files changed, 27 insertions(+), 19 deletions(-) diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index b633850b36b..46ac4302d16 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -6,8 +6,8 @@ import ( "io" "net/http" - "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" + "github.com/hashicorp/go-multierror" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/mitchellh/go-homedir" @@ -17,6 +17,7 @@ import ( "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" @@ -463,25 +464,19 @@ func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error { return xerrors.Errorf("acquiring sector lock: %w", err) } - unsealed := stores.FTUnsealed - { - unsealedStores, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, 0, false) - if err != nil { - return xerrors.Errorf("finding unsealed sector: %w", err) - } + var err error - if len(unsealedStores) == 0 { // can be already removed - unsealed = stores.FTNone - } + if rerr := m.storage.Remove(ctx, sector, stores.FTSealed, true); rerr != nil { + err = multierror.Append(err, xerrors.Errorf("removing sector (sealed): %w", rerr)) + } + if rerr := m.storage.Remove(ctx, sector, stores.FTCache, true); rerr != nil { + err = multierror.Append(err, xerrors.Errorf("removing sector (cache): %w", rerr)) + } + if rerr := m.storage.Remove(ctx, sector, stores.FTUnsealed, true); rerr != nil { + err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr)) } - selector := newExistingSelector(m.index, sector, stores.FTCache|stores.FTSealed, false) - - return m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, - schedFetch(sector, stores.FTCache|stores.FTSealed|unsealed, stores.PathStorage, stores.AcquireMove), - func(ctx context.Context, w Worker) error { - return w.Remove(ctx, sector) - }) + return err } func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) { diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index 9993c6c8e16..e7ad7d6b0fb 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -21,6 +21,7 @@ type schedPrioCtxKey int var SchedPriorityKey schedPrioCtxKey var DefaultSchedPriority = 0 var SelectorTimeout = 5 * time.Second +var InitWait = 3 * time.Second var ( SchedWindows = 2 @@ -221,6 +222,9 @@ func (sh *scheduler) runSched() { go sh.runWorkerWatcher() + iw := time.After(InitWait) + var initialised bool + for { select { case w := <-sh.newWorkers: @@ -231,18 +235,27 @@ func (sh *scheduler) runSched() { case req := <-sh.schedule: sh.schedQueue.Push(req) - 
sh.trySched() + if initialised { + sh.trySched() + } if sh.testSync != nil { sh.testSync <- struct{}{} } case req := <-sh.windowRequests: sh.openWindows = append(sh.openWindows, req) - sh.trySched() + if initialised { + sh.trySched() + } case ireq := <-sh.info: ireq(sh.diag()) + case <-iw: + initialised = true + iw = nil + + sh.trySched() case <-sh.closing: sh.schedClose() return From a6492b1ed6c51d42bf47708c201e13fbd4335340 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 27 Aug 2020 23:59:01 +0200 Subject: [PATCH 17/25] storagefsm: Auto-retry failed remove --- extern/storage-sealing/fsm.go | 6 ++++++ extern/storage-sealing/states_failed.go | 8 ++++++++ 2 files changed, 14 insertions(+) diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index 1f04abd56a3..4af860a545d 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -143,6 +143,9 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorRemoved{}, Removed), on(SectorRemoveFailed{}, RemoveFailed), ), + RemoveFailed: planOne( + // SectorRemove (global) + ), Faulty: planOne( on(SectorFaultReported{}, FaultReported), ), @@ -304,6 +307,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta case Removed: return nil, processed, nil + case RemoveFailed: + return m.handleRemoveFailed, processed, nil + // Faults case Faulty: return m.handleFaulty, processed, nil diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index 85744edd4a8..7b6a7a89212 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -244,6 +244,14 @@ func (m *Sealing) handleFinalizeFailed(ctx statemachine.Context, sector SectorIn return ctx.Send(SectorRetryFinalize{}) } +func (m *Sealing) handleRemoveFailed(ctx statemachine.Context, sector SectorInfo) error { + if err := failedCooldown(ctx, sector); err != nil { + return err + } + + return ctx.Send(SectorRemove{}) +} + func (m *Sealing) handleDealsExpired(ctx statemachine.Context, sector SectorInfo) error { // First make vary sure the sector isn't committed si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) From 1097d29213aede2aa47d4784397c6c8842289279 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 28 Aug 2020 00:03:42 +0200 Subject: [PATCH 18/25] sealing sched: Call trySched less when there are many tasks --- extern/sector-storage/sched.go | 37 +++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index e7ad7d6b0fb..e625435493c 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -226,6 +226,8 @@ func (sh *scheduler) runSched() { var initialised bool for { + var doSched bool + select { case w := <-sh.newWorkers: sh.newWorker(w) @@ -235,31 +237,48 @@ func (sh *scheduler) runSched() { case req := <-sh.schedule: sh.schedQueue.Push(req) - if initialised { - sh.trySched() - } + doSched = true if sh.testSync != nil { sh.testSync <- struct{}{} } case req := <-sh.windowRequests: sh.openWindows = append(sh.openWindows, req) - if initialised { - sh.trySched() - } - + doSched = true case ireq := <-sh.info: ireq(sh.diag()) case <-iw: initialised = true iw = nil - - sh.trySched() + doSched = true case <-sh.closing: sh.schedClose() return } + + if doSched && initialised { + // First gather any pending tasks, so we 
go through the scheduling loop + // once for every added task + loop: + for { + select { + case req := <-sh.schedule: + sh.schedQueue.Push(req) + if sh.testSync != nil { + sh.testSync <- struct{}{} + } + case req := <-sh.windowRequests: + sh.openWindows = append(sh.openWindows, req) + default: + break loop + } + } + + sh.trySched() + } + + } } From 398cbd7f2c89b67895d6940f023584d81493bd75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 28 Aug 2020 01:09:45 +0200 Subject: [PATCH 19/25] deps: Update markets --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index e42adc64fdc..69ebe73ed95 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v0.6.2 github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f - github.com/filecoin-project/go-fil-markets v0.5.7 + github.com/filecoin-project/go-fil-markets v0.5.8-0.20200827230805-30e8f6d42465 github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200822201400-474f4fdccc52 github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 diff --git a/go.sum b/go.sum index 8f7f4ca9df8..1e6c49c2708 100644 --- a/go.sum +++ b/go.sum @@ -247,8 +247,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1 github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-markets v0.5.6-0.20200814234959-80b1788108ac/go.mod h1:umicPCaN99ysHTiYOmwhuLxTFbOwcsI+mdw/t96vvM4= github.com/filecoin-project/go-fil-markets v0.5.6/go.mod h1:SJApXAKr5jyGpbzDEOhvemui0pih7hhT8r2MXJxCP1E= -github.com/filecoin-project/go-fil-markets v0.5.7 h1:kzyMHqez8ssxchj5s9M1hkC3CTwRGh2MeglJGfUksQU= -github.com/filecoin-project/go-fil-markets v0.5.7/go.mod h1:KnvFG3kSQ77vKYSY/QdrXET81wVCBByHXjG7AyxnbUw= +github.com/filecoin-project/go-fil-markets v0.5.8-0.20200827230805-30e8f6d42465 h1:74yonPhkVakfqUHcgfJ+vQOfCJQNiUBKn8XN9Z6F0S0= +github.com/filecoin-project/go-fil-markets v0.5.8-0.20200827230805-30e8f6d42465/go.mod h1:KnvFG3kSQ77vKYSY/QdrXET81wVCBByHXjG7AyxnbUw= github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200817153016-2ea5cbaf5ec0/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4= github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200822201400-474f4fdccc52 h1:FXtCp0ybqdQL9knb3OGDpkNTaBbPxgkqPeWKotUwkH0= github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200822201400-474f4fdccc52/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4= From 6d1682a27eb678d6a15760c2869f0644ad2c49c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 28 Aug 2020 11:43:56 +0200 Subject: [PATCH 20/25] storagefsm: wire up RecoverDealIDs fully --- extern/sector-storage/manager.go | 1 - extern/sector-storage/sched.go | 1 - extern/storage-sealing/fsm.go | 2 ++ extern/storage-sealing/states_failed.go | 8 ++++---- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 46ac4302d16..2d5741e6698 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -6,7 +6,6 @@ import ( "io" "net/http" - "github.com/hashicorp/go-multierror" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go 
index e625435493c..181fc4882d3 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -278,7 +278,6 @@ func (sh *scheduler) runSched() { sh.trySched() } - } } diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index 4af860a545d..dd0cbabfac4 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -298,6 +298,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta fallthrough case DealsExpired: return m.handleDealsExpired, processed, nil + case RecoverDealIDs: + return m.handleRecoverDealIDs, processed, nil // Post-seal case Proving: diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index 7b6a7a89212..e313fd712f9 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -294,25 +294,25 @@ func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorIn proposal, err := m.api.StateMarketStorageDeal(ctx.Context(), p.DealInfo.DealID, tok) if err != nil { - log.Warn("getting deal %d for piece %d: %+v", p.DealInfo.DealID, i, err) + log.Warnf("getting deal %d for piece %d: %+v", p.DealInfo.DealID, i, err) toFix = append(toFix, i) continue } if proposal.Provider != m.maddr { - log.Warn("piece %d (of %d) of sector %d refers deal %d with wrong provider: %s != %s", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, proposal.Provider, m.maddr) + log.Warnf("piece %d (of %d) of sector %d refers deal %d with wrong provider: %s != %s", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, proposal.Provider, m.maddr) toFix = append(toFix, i) continue } if proposal.PieceCID != p.Piece.PieceCID { - log.Warn("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %x != %x", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID) + log.Warnf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %x != %x", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID) toFix = append(toFix, i) continue } if p.Piece.Size != proposal.PieceSize { - log.Warn("piece %d (of %d) of sector %d refers deal %d with different size: %d != %d", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.Size, proposal.PieceSize) + log.Warnf("piece %d (of %d) of sector %d refers deal %d with different size: %d != %d", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.Size, proposal.PieceSize) toFix = append(toFix, i) continue } From 5ee85dc2633bef431ff8eb7f4fc6297db87bba16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 28 Aug 2020 16:33:41 +0200 Subject: [PATCH 21/25] sectorstorage: Fix tests --- extern/sector-storage/sched_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/extern/sector-storage/sched_test.go b/extern/sector-storage/sched_test.go index fcfe891e7a2..6abbee717c5 100644 --- a/extern/sector-storage/sched_test.go +++ b/extern/sector-storage/sched_test.go @@ -22,6 +22,10 @@ import ( "github.com/filecoin-project/specs-storage/storage" ) +func init() { + InitWait = 10 * time.Millisecond +} + func TestWithPriority(t *testing.T) { ctx := context.Background() From 11b11e416b0bd367ed3f112e533b1bf7b461d7ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 28 Aug 2020 18:26:17 +0200 Subject: [PATCH 22/25] sectorstorage: Compact assigned windows --- cmd/lotus-storage-miner/sealing.go | 8 +-- 
extern/sector-storage/resources.go | 12 ++--- extern/sector-storage/sched.go | 57 +++++++++++++++++++- extern/sector-storage/sched_test.go | 65 +++++++++++++++++++++++ extern/sector-storage/stats.go | 4 +- extern/sector-storage/storiface/worker.go | 2 +- 6 files changed, 134 insertions(+), 14 deletions(-) diff --git a/cmd/lotus-storage-miner/sealing.go b/cmd/lotus-storage-miner/sealing.go index 1e34859d7d4..5cc5c419af0 100644 --- a/cmd/lotus-storage-miner/sealing.go +++ b/cmd/lotus-storage-miner/sealing.go @@ -156,7 +156,7 @@ var sealingJobsCmd = &cli.Command{ // oldest first sort.Slice(lines, func(i, j int) bool { if lines[i].RunWait != lines[j].RunWait { - return !lines[i].RunWait // already running tasks first + return lines[i].RunWait < lines[j].RunWait } return lines[i].Start.Before(lines[j].Start) }) @@ -176,9 +176,9 @@ var sealingJobsCmd = &cli.Command{ _, _ = fmt.Fprintf(tw, "ID\tSector\tWorker\tHostname\tTask\tState\tTime\n") for _, l := range lines { - state := "assigned" - if !l.RunWait { - state = "running" + state := "running" + if l.RunWait != 0 { + state = fmt.Sprintf("assigned(%d)", l.RunWait-1) } _, _ = fmt.Fprintf(tw, "%d\t%d\t%d\t%s\t%s\t%s\t%s\n", l.ID, l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), state, time.Now().Sub(l.Start).Truncate(time.Millisecond*100)) } diff --git a/extern/sector-storage/resources.go b/extern/sector-storage/resources.go index d2c5646fae5..2fa7972673a 100644 --- a/extern/sector-storage/resources.go +++ b/extern/sector-storage/resources.go @@ -22,17 +22,17 @@ func (r Resources) MultiThread() bool { var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{ sealtasks.TTAddPiece: { - abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{ // This is probably a bit conservative - MaxMemory: 64 << 30, - MinMemory: 64 << 30, + abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{ + MaxMemory: 8 << 30, + MinMemory: 8 << 30, Threads: 1, BaseMinMemory: 1 << 30, }, - abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{ // This is probably a bit conservative - MaxMemory: 32 << 30, - MinMemory: 32 << 30, + abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{ + MaxMemory: 4 << 30, + MinMemory: 4 << 30, Threads: 1, diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index 181fc4882d3..120ea596926 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -153,7 +153,7 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler { workerClosing: make(chan WorkerID), schedule: make(chan *workerRequest), - windowRequests: make(chan *schedWindowRequest), + windowRequests: make(chan *schedWindowRequest, 20), schedQueue: &requestQueue{}, @@ -561,6 +561,8 @@ func (sh *scheduler) runWorker(wid WorkerID) { worker.wndLk.Lock() + windowsRequested -= sh.workerCompactWindows(worker, wid) + assignLoop: // process windows in order for len(worker.activeWindows) > 0 { @@ -588,6 +590,7 @@ func (sh *scheduler) runWorker(wid WorkerID) { go todo.respond(xerrors.Errorf("assignWorker error: %w", err)) } + // Note: we're not freeing window.allocated resources here very much on purpose worker.activeWindows[0].todo = worker.activeWindows[0].todo[1:] } @@ -603,6 +606,58 @@ func (sh *scheduler) runWorker(wid WorkerID) { }() } +func (sh *scheduler) workerCompactWindows(worker *workerHandle, wid WorkerID) int { + // move tasks from older windows to newer windows if older windows + // still can fit them + if len(worker.activeWindows) > 1 { + for wi, window := range worker.activeWindows[1:] { + lower 
:= worker.activeWindows[wi] + var moved []int + + for ti, todo := range window.todo { + needRes := ResourceTable[todo.taskType][sh.spt] + if !lower.allocated.canHandleRequest(needRes, wid, worker.info.Resources) { + continue + } + + moved = append(moved, ti) + lower.todo = append(lower.todo, todo) + lower.allocated.add(worker.info.Resources, needRes) + window.allocated.free(worker.info.Resources, needRes) + } + + if len(moved) > 0 { + newTodo := make([]*workerRequest, 0, len(window.todo)-len(moved)) + for i, t := range window.todo { + if moved[0] == i { + moved = moved[1:] + continue + } + + newTodo = append(newTodo, t) + } + window.todo = newTodo + } + } + } + + var compacted int + var newWindows []*schedWindow + + for _, window := range worker.activeWindows { + if len(window.todo) == 0 { + compacted++ + continue + } + + newWindows = append(newWindows, window) + } + + worker.activeWindows = newWindows + + return compacted +} + func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error { needRes := ResourceTable[req.taskType][sh.spt] diff --git a/extern/sector-storage/sched_test.go b/extern/sector-storage/sched_test.go index 6abbee717c5..0bc0404e4c3 100644 --- a/extern/sector-storage/sched_test.go +++ b/extern/sector-storage/sched_test.go @@ -522,3 +522,68 @@ func BenchmarkTrySched(b *testing.B) { b.Run("1w-500q", test(1, 500)) b.Run("200w-400q", test(200, 400)) } + +func TestWindowCompact(t *testing.T) { + sh := scheduler{ + spt: abi.RegisteredSealProof_StackedDrg32GiBV1, + } + + test := func(start [][]sealtasks.TaskType, expect [][]sealtasks.TaskType) func(t *testing.T) { + return func(t *testing.T) { + wh := &workerHandle{ + info: storiface.WorkerInfo{ + Resources: decentWorkerResources, + }, + } + + for _, windowTasks := range start { + window := &schedWindow{} + + for _, task := range windowTasks { + window.todo = append(window.todo, &workerRequest{taskType: task}) + window.allocated.add(wh.info.Resources, ResourceTable[task][sh.spt]) + } + + wh.activeWindows = append(wh.activeWindows, window) + } + + n := sh.workerCompactWindows(wh, 0) + require.Equal(t, len(start)-len(expect), n) + + for wi, tasks := range expect { + var expectRes activeResources + + for ti, task := range tasks { + require.Equal(t, task, wh.activeWindows[wi].todo[ti].taskType, "%d, %d", wi, ti) + expectRes.add(wh.info.Resources, ResourceTable[task][sh.spt]) + } + + require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].allocated.cpuUse, "%d", wi) + require.Equal(t, expectRes.gpuUsed, wh.activeWindows[wi].allocated.gpuUsed, "%d", wi) + require.Equal(t, expectRes.memUsedMin, wh.activeWindows[wi].allocated.memUsedMin, "%d", wi) + require.Equal(t, expectRes.memUsedMax, wh.activeWindows[wi].allocated.memUsedMax, "%d", wi) + } + + } + } + + t.Run("2-pc1-windows", test( + [][]sealtasks.TaskType{{sealtasks.TTPreCommit1}, {sealtasks.TTPreCommit1}}, + [][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}}), + ) + + t.Run("1-window", test( + [][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}}, + [][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}}), + ) + + t.Run("2-pc2-windows", test( + [][]sealtasks.TaskType{{sealtasks.TTPreCommit2}, {sealtasks.TTPreCommit2}}, + [][]sealtasks.TaskType{{sealtasks.TTPreCommit2}, {sealtasks.TTPreCommit2}}), + ) + + t.Run("2pc1-pc1ap", test( + [][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}, {sealtasks.TTPreCommit1, sealtasks.TTAddPiece}}, + 
[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1, sealtasks.TTAddPiece}, {sealtasks.TTPreCommit1}}), + ) +} diff --git a/extern/sector-storage/stats.go b/extern/sector-storage/stats.go index a915c432056..7f95e3bc37d 100644 --- a/extern/sector-storage/stats.go +++ b/extern/sector-storage/stats.go @@ -33,13 +33,13 @@ func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob { out[uint64(id)] = handle.wt.Running() handle.wndLk.Lock() - for _, window := range handle.activeWindows { + for wi, window := range handle.activeWindows { for _, request := range window.todo { out[uint64(id)] = append(out[uint64(id)], storiface.WorkerJob{ ID: 0, Sector: request.sector, Task: request.taskType, - RunWait: true, + RunWait: wi + 1, Start: request.start, }) } diff --git a/extern/sector-storage/storiface/worker.go b/extern/sector-storage/storiface/worker.go index 1140ed4dfd3..37e4aad1d02 100644 --- a/extern/sector-storage/storiface/worker.go +++ b/extern/sector-storage/storiface/worker.go @@ -37,6 +37,6 @@ type WorkerJob struct { Sector abi.SectorID Task sealtasks.TaskType - RunWait bool + RunWait int // 0 - running, 1+ - assigned Start time.Time } From 4a75e1e4b4ff74eb34151dcb7cf7b06009d6f613 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 28 Aug 2020 19:38:55 +0200 Subject: [PATCH 23/25] sectorstorage: Don't require tasks within a window to run in order --- extern/sector-storage/sched.go | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index 120ea596926..3ca6fa1bf6f 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -566,21 +566,31 @@ func (sh *scheduler) runWorker(wid WorkerID) { assignLoop: // process windows in order for len(worker.activeWindows) > 0 { - // process tasks within a window in order - for len(worker.activeWindows[0].todo) > 0 { - todo := worker.activeWindows[0].todo[0] - needRes := ResourceTable[todo.taskType][sh.spt] + firstWindow := worker.activeWindows[0] + // process tasks within a window, preferring tasks at lower indexes + for len(firstWindow.todo) > 0 { sh.workersLk.RLock() + + tidx := -1 + worker.lk.Lock() - ok := worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources) + for t, todo := range firstWindow.todo { + needRes := ResourceTable[todo.taskType][sh.spt] + if worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources) { + tidx = t + break + } + } worker.lk.Unlock() - if !ok { + if tidx == -1 { sh.workersLk.RUnlock() break assignLoop } + todo := firstWindow.todo[tidx] + log.Debugf("assign worker sector %d", todo.sector.Number) err := sh.assignWorker(taskDone, wid, worker, todo) sh.workersLk.RUnlock() @@ -591,7 +601,9 @@ func (sh *scheduler) runWorker(wid WorkerID) { } // Note: we're not freeing window.allocated resources here very much on purpose - worker.activeWindows[0].todo = worker.activeWindows[0].todo[1:] + copy(firstWindow.todo[tidx:], firstWindow.todo[tidx+1:]) + firstWindow.todo[len(firstWindow.todo)-1] = nil + firstWindow.todo = firstWindow.todo[:len(firstWindow.todo)-1] } copy(worker.activeWindows, worker.activeWindows[1:]) From 9d0c8ae3dde08240d199c0493cefe013bafdfc02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 28 Aug 2020 21:38:21 +0200 Subject: [PATCH 24/25] sectorstorage: update sched tests for new logic --- extern/sector-storage/sched.go | 10 ++--- extern/sector-storage/sched_resources.go | 14 +++---- 
extern/sector-storage/sched_test.go | 53 +++++++++++++++++++++++- 3 files changed, 64 insertions(+), 13 deletions(-) diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index 3ca6fa1bf6f..16e51f9a66d 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -359,7 +359,7 @@ func (sh *scheduler) trySched() { } // TODO: allow bigger windows - if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, worker.info.Resources) { + if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info.Resources) { continue } @@ -430,11 +430,11 @@ func (sh *scheduler) trySched() { log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd) // TODO: allow bigger windows - if !windows[wnd].allocated.canHandleRequest(needRes, wid, wr) { + if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", wr) { continue } - log.Debugf("SCHED ASSIGNED sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd) + log.Debugf("SCHED ASSIGNED sqi:%d sector %d task %s to window %d", sqi, task.sector.Number, task.taskType, wnd) windows[wnd].allocated.add(wr, needRes) @@ -577,7 +577,7 @@ func (sh *scheduler) runWorker(wid WorkerID) { worker.lk.Lock() for t, todo := range firstWindow.todo { needRes := ResourceTable[todo.taskType][sh.spt] - if worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources) { + if worker.preparing.canHandleRequest(needRes, wid, "startPreparing", worker.info.Resources) { tidx = t break } @@ -628,7 +628,7 @@ func (sh *scheduler) workerCompactWindows(worker *workerHandle, wid WorkerID) in for ti, todo := range window.todo { needRes := ResourceTable[todo.taskType][sh.spt] - if !lower.allocated.canHandleRequest(needRes, wid, worker.info.Resources) { + if !lower.allocated.canHandleRequest(needRes, wid, "compactWindows", worker.info.Resources) { continue } diff --git a/extern/sector-storage/sched_resources.go b/extern/sector-storage/sched_resources.go index f468d5fe918..92a3b32adfe 100644 --- a/extern/sector-storage/sched_resources.go +++ b/extern/sector-storage/sched_resources.go @@ -7,7 +7,7 @@ import ( ) func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error { - for !a.canHandleRequest(r, id, wr) { + for !a.canHandleRequest(r, id, "withResources", wr) { if a.cond == nil { a.cond = sync.NewCond(locker) } @@ -52,37 +52,37 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) { a.memUsedMax -= r.MaxMemory } -func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResources) bool { +func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, caller string, res storiface.WorkerResources) bool { // TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running) minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory if minNeedMem > res.MemPhysical { - log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib) + log.Debugf("sched: not scheduling on worker %d for %s; not enough physical memory - need: %dM, have %dM", wid, caller, minNeedMem/mib, res.MemPhysical/mib) return false } maxNeedMem := res.MemReserved + a.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory if maxNeedMem > res.MemSwap+res.MemPhysical { - log.Debugf("sched: not 
scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib) + log.Debugf("sched: not scheduling on worker %d for %s; not enough virtual memory - need: %dM, have %dM", wid, caller, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib) return false } if needRes.MultiThread() { if a.cpuUse > 0 { - log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, a.cpuUse, res.CPUs) + log.Debugf("sched: not scheduling on worker %d for %s; multicore process needs %d threads, %d in use, target %d", wid, caller, res.CPUs, a.cpuUse, res.CPUs) return false } } else { if a.cpuUse+uint64(needRes.Threads) > res.CPUs { - log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, a.cpuUse, res.CPUs) + log.Debugf("sched: not scheduling on worker %d for %s; not enough threads, need %d, %d in use, target %d", wid, caller, needRes.Threads, a.cpuUse, res.CPUs) return false } } if len(res.GPUs) > 0 && needRes.CanGPU { if a.gpuUsed { - log.Debugf("sched: not scheduling on worker %d; GPU in use", wid) + log.Debugf("sched: not scheduling on worker %d for %s; GPU in use", wid, caller) return false } } diff --git a/extern/sector-storage/sched_test.go b/extern/sector-storage/sched_test.go index 0bc0404e4c3..4aa1dc49be0 100644 --- a/extern/sector-storage/sched_test.go +++ b/extern/sector-storage/sched_test.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "runtime" + "sort" "sync" "testing" "time" @@ -305,7 +306,8 @@ func TestSched(t *testing.T) { done: map[string]chan struct{}{}, } - for _, task := range tasks { + for i, task := range tasks { + log.Info("TASK", i) task(t, sched, index, &rm) } @@ -419,6 +421,45 @@ func TestSched(t *testing.T) { ) } + diag := func() task { + return func(t *testing.T, s *scheduler, index *stores.Index, meta *runMeta) { + time.Sleep(20 * time.Millisecond) + for _, request := range s.diag().Requests { + log.Infof("!!! sDIAG: sid(%d) task(%s)", request.Sector.Number, request.TaskType) + } + + wj := (&Manager{sched: s}).WorkerJobs() + + type line struct { + storiface.WorkerJob + wid uint64 + } + + lines := make([]line, 0) + + for wid, jobs := range wj { + for _, job := range jobs { + lines = append(lines, line{ + WorkerJob: job, + wid: wid, + }) + } + } + + // oldest first + sort.Slice(lines, func(i, j int) bool { + if lines[i].RunWait != lines[j].RunWait { + return lines[i].RunWait < lines[j].RunWait + } + return lines[i].Start.Before(lines[j].Start) + }) + + for _, l := range lines { + log.Infof("!!! 
wDIAG: rw(%d) sid(%d) t(%s)", l.RunWait, l.Sector.Number, l.Task) + } + } + } + // run this one a bunch of times, it had a very annoying tendency to fail randomly for i := 0; i < 40; i++ { t.Run("pc1-pc2-prio", testFunc([]workerSpec{ @@ -427,6 +468,8 @@ func TestSched(t *testing.T) { // fill queues twoPC1("w0", 0, taskStarted), twoPC1("w1", 2, taskNotScheduled), + sched("w2", "fred", 4, sealtasks.TTPreCommit1), + taskNotScheduled("w2"), // windowed @@ -439,10 +482,18 @@ func TestSched(t *testing.T) { sched("t3", "fred", 10, sealtasks.TTPreCommit2), taskNotScheduled("t3"), + diag(), + twoPC1Act("w0", taskDone), twoPC1Act("w1", taskStarted), + taskNotScheduled("w2"), twoPC1Act("w1", taskDone), + taskStarted("w2"), + + taskDone("w2"), + + diag(), taskStarted("t3"), taskNotScheduled("t1"), From a62e44c4e502f4341564f9e0c699e02286eea30e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 29 Aug 2020 02:13:38 +0200 Subject: [PATCH 25/25] Fix lint --- chain/beacon/beacon.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/chain/beacon/beacon.go b/chain/beacon/beacon.go index 7b998c04f05..23b062beabd 100644 --- a/chain/beacon/beacon.go +++ b/chain/beacon/beacon.go @@ -37,6 +37,10 @@ func ValidateBlockValues(b RandomBeacon, h *types.BlockHeader, prevEntry types.B return nil } + if len(h.BeaconEntries) == 0 { + return xerrors.Errorf("expected to have beacon entries in this block, but didn't find any") + } + last := h.BeaconEntries[len(h.BeaconEntries)-1] if last.Round != maxRound { return xerrors.Errorf("expected final beacon entry in block to be at round %d, got %d", maxRound, last.Round)