Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sealing: Fix RecoverDealIDs loop with changed PieceCID #7117

Merged
merged 1 commit into from
Aug 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions extern/storage-sealing/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte,
return &ErrBadSeed{xerrors.Errorf("seed epoch was not set")}
}

pci, err := m.api.StateSectorPreCommitInfo(ctx, m.maddr, si.SectorNumber, tok)
pci, err := m.Api.StateSectorPreCommitInfo(ctx, m.maddr, si.SectorNumber, tok)
if err == ErrSectorAllocated {
// not much more we can check here, basically try to wait for commit,
// and hope that this will work
Expand Down Expand Up @@ -152,7 +152,7 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte,
return err
}

seed, err := m.api.ChainGetRandomnessFromBeacon(ctx, tok, crypto.DomainSeparationTag_InteractiveSealChallengeSeed, si.SeedEpoch, buf.Bytes())
seed, err := m.Api.ChainGetRandomnessFromBeacon(ctx, tok, crypto.DomainSeparationTag_InteractiveSealChallengeSeed, si.SeedEpoch, buf.Bytes())
if err != nil {
return &ErrApi{xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)}
}
Expand Down Expand Up @@ -181,7 +181,7 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte,
return &ErrInvalidProof{xerrors.New("invalid proof (compute error?)")}
}

if err := checkPieces(ctx, m.maddr, si, m.api); err != nil {
if err := checkPieces(ctx, m.maddr, si, m.Api); err != nil {
return err
}

Expand Down
9 changes: 8 additions & 1 deletion extern/storage-sealing/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,13 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
log.Errorw("update sector stats", "error", err)
}

// todo: drop this, use Context iface everywhere
wrapCtx := func(f func(Context, SectorInfo) error) func(statemachine.Context, SectorInfo) error {
return func(ctx statemachine.Context, info SectorInfo) error {
return f(&ctx, info)
}
}

switch state.State {
// Happy path
case Empty:
Expand Down Expand Up @@ -413,7 +420,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
case DealsExpired:
return m.handleDealsExpired, processed, nil
case RecoverDealIDs:
return m.handleRecoverDealIDs, processed, nil
return wrapCtx(m.HandleRecoverDealIDs), processed, nil

// Post-seal
case Proving:
Expand Down
439 changes: 439 additions & 0 deletions extern/storage-sealing/mocks/api.go

Large diffs are not rendered by default.

63 changes: 63 additions & 0 deletions extern/storage-sealing/mocks/statemachine.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 10 additions & 6 deletions extern/storage-sealing/sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type SectorLocation struct {

var ErrSectorAllocated = errors.New("sectorNumber is allocated, but PreCommit info wasn't found on chain")

//go:generate go run github.com/golang/mock/mockgen -destination=mocks/api.go -package=mocks . SealingAPI

type SealingAPI interface {
StateWaitMsg(context.Context, cid.Cid) (MsgLookup, error)
StateSearchMsg(context.Context, cid.Cid) (*MsgLookup, error)
Expand Down Expand Up @@ -80,7 +82,9 @@ type SectorStateNotifee func(before, after SectorInfo)
type AddrSel func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error)

type Sealing struct {
api SealingAPI
Api SealingAPI
DealInfo *CurrentDealInfoManager

feeCfg config.MinerFeeConfig
events Events

Expand Down Expand Up @@ -114,7 +118,6 @@ type Sealing struct {
commiter *CommitBatcher

getConfig GetSealingConfigFunc
dealInfo *CurrentDealInfoManager
}

type openSector struct {
Expand All @@ -135,7 +138,9 @@ type pendingPiece struct {

func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, prov ffiwrapper.Prover, pcp PreCommitPolicy, gc GetSealingConfigFunc, notifee SectorStateNotifee, as AddrSel) *Sealing {
s := &Sealing{
api: api,
Api: api,
DealInfo: &CurrentDealInfoManager{api},

feeCfg: fc,
events: events,

Expand All @@ -159,7 +164,6 @@ func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events
commiter: NewCommitBatcher(mctx, maddr, api, as, fc, gc, prov),

getConfig: gc,
dealInfo: &CurrentDealInfoManager{api},

stats: SectorStats{
bySector: map[abi.SectorID]statSectorState{},
Expand Down Expand Up @@ -229,12 +233,12 @@ func (m *Sealing) CommitPending(ctx context.Context) ([]abi.SectorID, error) {
}

func (m *Sealing) currentSealProof(ctx context.Context) (abi.RegisteredSealProof, error) {
mi, err := m.api.StateMinerInfo(ctx, m.maddr, nil)
mi, err := m.Api.StateMinerInfo(ctx, m.maddr, nil)
if err != nil {
return 0, err
}

ver, err := m.api.StateNetworkVersion(ctx, nil)
ver, err := m.Api.StateNetworkVersion(ctx, nil)
if err != nil {
return 0, err
}
Expand Down
37 changes: 23 additions & 14 deletions extern/storage-sealing/states_failed.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ func failedCooldown(ctx statemachine.Context, sector SectorInfo) error {
}

func (m *Sealing) checkPreCommitted(ctx statemachine.Context, sector SectorInfo) (*miner.SectorPreCommitOnChainInfo, bool) {
tok, _, err := m.api.ChainHead(ctx.Context())
tok, _, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleSealPrecommit1Failed(%d): temp error: %+v", sector.SectorNumber, err)
return nil, false
}

info, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
info, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
if err != nil {
log.Errorf("handleSealPrecommit1Failed(%d): temp error: %+v", sector.SectorNumber, err)
return nil, false
Expand Down Expand Up @@ -71,14 +71,14 @@ func (m *Sealing) handleSealPrecommit2Failed(ctx statemachine.Context, sector Se
}

func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
tok, height, err := m.api.ChainHead(ctx.Context())
tok, height, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err)
return nil
}

if sector.PreCommitMessage != nil {
mw, err := m.api.StateSearchMsg(ctx.Context(), *sector.PreCommitMessage)
mw, err := m.Api.StateSearchMsg(ctx.Context(), *sector.PreCommitMessage)
if err != nil {
// API error
if err := failedCooldown(ctx, sector); err != nil {
Expand All @@ -105,7 +105,7 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI
}
}

if err := checkPrecommit(ctx.Context(), m.Address(), sector, tok, height, m.api); err != nil {
if err := checkPrecommit(ctx.Context(), m.Address(), sector, tok, height, m.Api); err != nil {
switch err.(type) {
case *ErrApi:
log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err)
Expand Down Expand Up @@ -182,14 +182,14 @@ func (m *Sealing) handleComputeProofFailed(ctx statemachine.Context, sector Sect
}

func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
tok, _, err := m.api.ChainHead(ctx.Context())
tok, _, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}

if sector.CommitMessage != nil {
mw, err := m.api.StateSearchMsg(ctx.Context(), *sector.CommitMessage)
mw, err := m.Api.StateSearchMsg(ctx.Context(), *sector.CommitMessage)
if err != nil {
// API error
if err := failedCooldown(ctx, sector); err != nil {
Expand Down Expand Up @@ -286,7 +286,7 @@ func (m *Sealing) handleTerminateFailed(ctx statemachine.Context, sector SectorI
// ignoring error as it's most likely an API error - `pci` will be nil, and we'll go back to
// the Terminating state after cooldown. If the API is still failing, well get back to here
// with the error in SectorInfo log.
pci, _ := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil)
pci, _ := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil)
if pci != nil {
return nil // pause the fsm, needs manual user action
}
Expand All @@ -300,7 +300,7 @@ func (m *Sealing) handleTerminateFailed(ctx statemachine.Context, sector SectorI

func (m *Sealing) handleDealsExpired(ctx statemachine.Context, sector SectorInfo) error {
// First make vary sure the sector isn't committed
si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil)
si, err := m.Api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil)
if err != nil {
return xerrors.Errorf("getting sector info: %w", err)
}
Expand All @@ -319,8 +319,8 @@ func (m *Sealing) handleDealsExpired(ctx statemachine.Context, sector SectorInfo
return ctx.Send(SectorRemove{})
}

func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorInfo) error {
tok, height, err := m.api.ChainHead(ctx.Context())
func (m *Sealing) HandleRecoverDealIDs(ctx Context, sector SectorInfo) error {
tok, height, err := m.Api.ChainHead(ctx.Context())
if err != nil {
return xerrors.Errorf("getting chain head: %w", err)
}
Expand All @@ -340,7 +340,7 @@ func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorIn
continue
}

proposal, err := m.api.StateMarketStorageDealProposal(ctx.Context(), p.DealInfo.DealID, tok)
proposal, err := m.Api.StateMarketStorageDealProposal(ctx.Context(), p.DealInfo.DealID, tok)
if err != nil {
log.Warnf("getting deal %d for piece %d: %+v", p.DealInfo.DealID, i, err)
toFix = append(toFix, i)
Expand Down Expand Up @@ -389,11 +389,16 @@ func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorIn
mdp := market.DealProposal(*p.DealInfo.DealProposal)
dp = &mdp
}
res, err := m.dealInfo.GetCurrentDealInfo(ctx.Context(), tok, dp, *p.DealInfo.PublishCid)
res, err := m.DealInfo.GetCurrentDealInfo(ctx.Context(), tok, dp, *p.DealInfo.PublishCid)
if err != nil {
failed[i] = xerrors.Errorf("getting current deal info for piece %d: %w", i, err)
}

if res.MarketDeal.Proposal.PieceCID != p.Piece.PieceCID {
failed[i] = xerrors.Errorf("recovered piece (%d) deal in sector %d (dealid %d) has different PieceCID %s != %s", i, sector.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, res.MarketDeal.Proposal.PieceCID)
continue
}

updates[i] = res.DealID
}

Expand All @@ -409,7 +414,11 @@ func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorIn
}

// todo: try to remove bad pieces (hard; see the todo above)
return xerrors.Errorf("failed to recover some deals: %w", merr)

// for now removing sectors is probably better than having them stuck in RecoverDealIDs
// and expire anyways
log.Errorf("removing sector %d: deals expired or unrecoverable: %+v", sector.SectorNumber, merr)
return ctx.Send(SectorRemove{})
}

// Not much to do here, we can't go back in time to commit this sector
Expand Down
Loading