Skip to content

Commit

Permalink
storagefsm: Attempt to auto-recover from reorged DealIDs
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Aug 27, 2020
1 parent 7806a98 commit aa9fb45
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 20 deletions.
36 changes: 35 additions & 1 deletion extern/storage-sealing/cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 27 additions & 6 deletions extern/storage-sealing/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
on(SectorPreCommitLanded{}, WaitSeed),
on(SectorDealsExpired{}, DealsExpired),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
),
ComputeProofFailed: planOne(
on(SectorRetryComputeProof{}, Committing),
Expand All @@ -118,13 +119,17 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorRetryPreCommit{}, PreCommitting),
on(SectorRetryCommitWait{}, CommitWait),
on(SectorDealsExpired{}, DealsExpired),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
),
FinalizeFailed: planOne(
on(SectorRetryFinalize{}, FinalizeSector),
),
DealsExpired: planOne(
// SectorRemove (global)
),
RecoverDealIDs: planOne(
onReturning(SectorUpdateDealIDs{}),
),

// Post-seal

Expand Down Expand Up @@ -389,13 +394,30 @@ func final(events []statemachine.Event, state *SectorInfo) (uint64, error) {
return 0, xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events)
}

func on(mut mutator, next SectorState) func() (mutator, SectorState) {
return func() (mutator, SectorState) {
return mut, next
func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) error) {
return func() (mutator, func(*SectorInfo) error) {
return mut, func(state *SectorInfo) error {
state.State = next
return nil
}
}
}

func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
func onReturning(mut mutator) func() (mutator, func(*SectorInfo) error) {
return func() (mutator, func(*SectorInfo) error) {
return mut, func(state *SectorInfo) error {
if state.Return == "" {
return xerrors.Errorf("return state not set")
}

state.State = SectorState(state.Return)
state.Return = ""
return nil
}
}
}

func planOne(ts ...func() (mut mutator, next func(*SectorInfo) error)) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
return func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
if gm, ok := events[0].User.(globalMutator); ok {
gm.applyGlobal(state)
Expand All @@ -414,8 +436,7 @@ func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statema
}

events[0].User.(mutator).apply(state)
state.State = next
return 1, nil
return 1, next(state)
}

_, ok := events[0].User.(Ignorable)
Expand Down
18 changes: 18 additions & 0 deletions extern/storage-sealing/fsm_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,24 @@ type SectorRetryCommitWait struct{}

func (evt SectorRetryCommitWait) apply(state *SectorInfo) {}

type SectorInvalidDealIDs struct{
Return ReturnState
}

func (evt SectorInvalidDealIDs) apply(state *SectorInfo) {
state.Return = evt.Return
}

type SectorUpdateDealIDs struct{
Updates map[int]abi.DealID
}

func (evt SectorUpdateDealIDs) apply(state *SectorInfo) {
for i, id := range evt.Updates {
state.Pieces[i].DealInfo.DealID = id
}
}

// Faults

type SectorFaulty struct{}
Expand Down
1 change: 1 addition & 0 deletions extern/storage-sealing/sector_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
PackingFailed SectorState = "PackingFailed"
FinalizeFailed SectorState = "FinalizeFailed"
DealsExpired SectorState = "DealsExpired"
RecoverDealIDs SectorState = "RecoverDealIDs"

Faulty SectorState = "Faulty" // sector is corrupted or gone for some reason
FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain
Expand Down
109 changes: 100 additions & 9 deletions extern/storage-sealing/states_failed.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package sealing

import (
"bytes"
"time"

"golang.org/x/xerrors"

"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"

"github.com/filecoin-project/lotus/extern/sector-storage/zerocomm"
)

const minRetryTime = 1 * time.Minute
Expand Down Expand Up @@ -82,9 +88,8 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI
case *ErrBadTicket:
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad expired: %w", err)})
case *ErrInvalidDeals:
// TODO: Deals got reorged, figure out what to do about this
// (this will probably require tracking the deal submit message CID, and re-checking what's on chain)
return xerrors.Errorf("invalid deals in sector %d: %w", sector.SectorNumber, err)
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{ Return: RetPreCommitFailed })
case *ErrExpiredDeals:
return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
case *ErrNoPrecommit:
Expand Down Expand Up @@ -166,9 +171,8 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
case *ErrBadTicket:
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)})
case *ErrInvalidDeals:
// TODO: Deals got reorged, figure out what to do about this
// (this will probably require tracking the deal submit message CID, and re-checking what's on chain)
return xerrors.Errorf("invalid deals in sector %d: %w", sector.SectorNumber, err)
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{ Return: RetCommitFailed })
case *ErrExpiredDeals:
return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
case nil:
Expand Down Expand Up @@ -206,9 +210,8 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
case *ErrNoPrecommit:
return ctx.Send(SectorRetryPreCommit{})
case *ErrInvalidDeals:
// TODO: Deals got reorged, figure out what to do about this
// (this will probably require tracking the deal submit message CID, and re-checking what's on chain)
return xerrors.Errorf("invalid deals in sector %d: %w", sector.SectorNumber, err)
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{ Return: RetCommitFailed })
case *ErrExpiredDeals:
return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
case *ErrCommitWaitFailed:
Expand Down Expand Up @@ -261,3 +264,91 @@ func (m *Sealing) handleDealsExpired(ctx statemachine.Context, sector SectorInfo
// Not much to do here, we can't go back in time to commit this sector
return ctx.Send(SectorRemove{})
}

func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorInfo) error {
tok, height, err := m.api.ChainHead(ctx.Context())
if err != nil {
return xerrors.Errorf("getting chain head: %w", err)
}

var toFix []int

for i, p := range sector.Pieces {
// if no deal is associated with the piece, ensure that we added it as
// filler (i.e. ensure that it has a zero PieceCID)
if p.DealInfo == nil {
exp := zerocomm.ZeroPieceCommitment(p.Piece.Size.Unpadded())
if !p.Piece.PieceCID.Equals(exp) {
return xerrors.Errorf("sector %d piece %d had non-zero PieceCID %+v", sector.SectorNumber, i, p.Piece.PieceCID)
}
continue
}

proposal, err := m.api.StateMarketStorageDeal(ctx.Context(), p.DealInfo.DealID, tok)
if err != nil {
log.Warn("getting deal %d for piece %d: %+v", p.DealInfo.DealID, i, err)
toFix = append(toFix, i)
continue
}

if proposal.Provider != m.maddr {
log.Warn("piece %d (of %d) of sector %d refers deal %d with wrong provider: %s != %s", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, proposal.Provider, m.maddr)
toFix = append(toFix, i)
continue
}

if proposal.PieceCID != p.Piece.PieceCID {
log.Warn("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %x != %x", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID)
toFix = append(toFix, i)
continue
}

if p.Piece.Size != proposal.PieceSize {
log.Warn("piece %d (of %d) of sector %d refers deal %d with different size: %d != %d", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.Size, proposal.PieceSize)
toFix = append(toFix, i)
continue
}

if height >= proposal.StartEpoch {
// TODO: check if we are in an early enough state (before precommit), try to remove the offending pieces
// (tricky as we have to 'defragment' the sector while doing that, and update piece references for retrieval)
return xerrors.Errorf("can't fix sector deals: piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, proposal.StartEpoch, height)
}
}

updates := map[int]abi.DealID{}
for _, i := range toFix {
p := sector.Pieces[i]

if p.DealInfo.PublishCid == nil {
// TODO: check if we are in an early enough state try to remove this piece
log.Error("can't fix sector deals: piece %d (of %d) of sector %d has nil DealInfo.PublishCid (refers to deal %d)", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID)
// Not much to do here (and this can only happen for old spacerace sectors)
return ctx.Send(SectorRemove{})
}

ml, err := m.api.StateSearchMsg(ctx.Context(), *p.DealInfo.PublishCid)
if err != nil {
return xerrors.Errorf("looking for publish deal message %s (sector %d, piece %d): %w", *p.DealInfo.PublishCid, sector.SectorNumber, i, err)
}

if ml.Receipt.ExitCode != exitcode.Ok {
return xerrors.Errorf("looking for publish deal message %s (sector %d, piece %d): non-ok exit code: %s", *p.DealInfo.PublishCid, sector.SectorNumber, i, ml.Receipt.ExitCode)
}

var retval market.PublishStorageDealsReturn
if err := retval.UnmarshalCBOR(bytes.NewReader(ml.Receipt.Return)); err != nil {
return xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err)
}

if len(retval.IDs) != 1 {
// market currently only ever sends messages with 1 deal
return xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal")
}

updates[i] = retval.IDs[0]
}

// Not much to do here, we can't go back in time to commit this sector
return ctx.Send(SectorUpdateDealIDs{Updates: updates})
}
8 changes: 4 additions & 4 deletions extern/storage-sealing/states_sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
return nil
case *ErrInvalidDeals:
return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid dealIDs in sector: %w", err)})
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{ Return: RetPreCommit1 })
case *ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector?
return ctx.Send(SectorPackingFailed{xerrors.Errorf("expired dealIDs in sector: %w", err)})
default:
Expand Down Expand Up @@ -157,9 +158,8 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
case *ErrBadTicket:
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)})
case *ErrInvalidDeals:
// TODO: Deals got reorged, figure out what to do about this
// (this will probably require tracking the deal submit message CID, and re-checking what's on chain)
return xerrors.Errorf("invalid deals in sector %d: %w", sector.SectorNumber, err)
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{ Return: RetPreCommitting })
case *ErrExpiredDeals:
return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
case *ErrPrecommitOnChain:
Expand Down
11 changes: 11 additions & 0 deletions extern/storage-sealing/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ type Log struct {
Kind string
}

type ReturnState string
const (
RetPreCommit1 = ReturnState(PreCommit1)
RetPreCommitting = ReturnState(PreCommitting)
RetPreCommitFailed = ReturnState(PreCommitFailed)
RetCommitFailed = ReturnState(CommitFailed)
)

type SectorInfo struct {
State SectorState
SectorNumber abi.SectorNumber
Expand Down Expand Up @@ -91,6 +99,9 @@ type SectorInfo struct {
// Faults
FaultReportMsg *cid.Cid

// Recovery
Return ReturnState

// Debug
LastErr string

Expand Down

0 comments on commit aa9fb45

Please sign in to comment.