diff --git a/api/api_full.go b/api/api_full.go index 2d8a4e5150b..7f3d7b093f4 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -323,7 +323,7 @@ type FullNode interface { StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (types.BigInt, error) // StateSectorPreCommitInfo returns the PreCommit info for the specified miner's sector StateSectorPreCommitInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error) - // StateSectorGetInfo returns the on-chain info for the specified miner's sector + // StateSectorGetInfo returns the on-chain info for the specified miner's sector. Returns null in case the sector info isn't found // NOTE: returned info.Expiration may not be accurate in some cases, use StateSectorExpiration to get accurate // expiration epoch StateSectorGetInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (*miner.SectorOnChainInfo, error) diff --git a/api/api_storage.go b/api/api_storage.go index 65ecf12eaa4..14aa5ff971d 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -120,16 +120,18 @@ type SectorLog struct { } type SectorInfo struct { - SectorID abi.SectorNumber - State SectorState - CommD *cid.Cid - CommR *cid.Cid - Proof []byte - Deals []abi.DealID - Ticket SealTicket - Seed SealSeed - Retries uint64 - ToUpgrade bool + SectorID abi.SectorNumber + State SectorState + CommD *cid.Cid + CommR *cid.Cid + Proof []byte + Deals []abi.DealID + Ticket SealTicket + Seed SealSeed + PreCommitMsg *cid.Cid + CommitMsg *cid.Cid + Retries uint64 + ToUpgrade bool LastErr string diff --git a/chain/beacon/beacon.go b/chain/beacon/beacon.go index 7b998c04f05..23b062beabd 100644 --- a/chain/beacon/beacon.go +++ b/chain/beacon/beacon.go @@ -37,6 +37,10 @@ func ValidateBlockValues(b RandomBeacon, h *types.BlockHeader, prevEntry types.B return nil } + if len(h.BeaconEntries) == 0 { + return xerrors.Errorf("expected to have beacon entries in this block, but didn't find any") + } + last := h.BeaconEntries[len(h.BeaconEntries)-1] if last.Round != maxRound { return xerrors.Errorf("expected final beacon entry in block to be at round %d, got %d", maxRound, last.Round) diff --git a/chain/stmgr/utils.go b/chain/stmgr/utils.go index 098021f6f35..a95374b6938 100644 --- a/chain/stmgr/utils.go +++ b/chain/stmgr/utils.go @@ -157,7 +157,7 @@ func MinerSectorInfo(ctx context.Context, sm *StateManager, maddr address.Addres return nil, err } if !ok { - return nil, xerrors.New("sector not found") + return nil, nil } return sectorInfo, nil diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index 3118e6afa94..dbb93c9726b 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -236,8 +236,10 @@ var stateList = []stateMeta{ {col: 39, state: "Total"}, {col: color.FgGreen, state: sealing.Proving}, + {col: color.FgBlue, state: sealing.Empty}, + {col: color.FgBlue, state: sealing.WaitDeals}, + {col: color.FgRed, state: sealing.UndefinedSectorState}, - {col: color.FgYellow, state: sealing.Empty}, {col: color.FgYellow, state: sealing.Packing}, {col: color.FgYellow, state: sealing.PreCommit1}, {col: color.FgYellow, state: sealing.PreCommit2}, @@ -245,9 +247,13 @@ var stateList = []stateMeta{ {col: color.FgYellow, state: sealing.PreCommitWait}, {col: color.FgYellow, state: sealing.WaitSeed}, {col: color.FgYellow, state: sealing.Committing}, + {col: color.FgYellow, state: sealing.SubmitCommit}, {col: color.FgYellow, state: sealing.CommitWait}, {col: 
color.FgYellow, state: sealing.FinalizeSector}, + {col: color.FgCyan, state: sealing.Removing}, + {col: color.FgCyan, state: sealing.Removed}, + {col: color.FgRed, state: sealing.FailedUnrecoverable}, {col: color.FgRed, state: sealing.SealPreCommit1Failed}, {col: color.FgRed, state: sealing.SealPreCommit2Failed}, @@ -259,6 +265,9 @@ var stateList = []stateMeta{ {col: color.FgRed, state: sealing.Faulty}, {col: color.FgRed, state: sealing.FaultReported}, {col: color.FgRed, state: sealing.FaultedFinal}, + {col: color.FgRed, state: sealing.RemoveFailed}, + {col: color.FgRed, state: sealing.DealsExpired}, + {col: color.FgRed, state: sealing.RecoverDealIDs}, } func init() { diff --git a/cmd/lotus-storage-miner/sealing.go b/cmd/lotus-storage-miner/sealing.go index 2f966dcca9f..5cc5c419af0 100644 --- a/cmd/lotus-storage-miner/sealing.go +++ b/cmd/lotus-storage-miner/sealing.go @@ -155,6 +155,9 @@ var sealingJobsCmd = &cli.Command{ // oldest first sort.Slice(lines, func(i, j int) bool { + if lines[i].RunWait != lines[j].RunWait { + return lines[i].RunWait < lines[j].RunWait + } return lines[i].Start.Before(lines[j].Start) }) @@ -170,10 +173,14 @@ var sealingJobsCmd = &cli.Command{ } tw := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0) - _, _ = fmt.Fprintf(tw, "ID\tSector\tWorker\tHostname\tTask\tTime\n") + _, _ = fmt.Fprintf(tw, "ID\tSector\tWorker\tHostname\tTask\tState\tTime\n") for _, l := range lines { - _, _ = fmt.Fprintf(tw, "%d\t%d\t%d\t%s\t%s\t%s\n", l.ID, l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), time.Now().Sub(l.Start).Truncate(time.Millisecond*100)) + state := "running" + if l.RunWait != 0 { + state = fmt.Sprintf("assigned(%d)", l.RunWait-1) + } + _, _ = fmt.Fprintf(tw, "%d\t%d\t%d\t%s\t%s\t%s\t%s\n", l.ID, l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), state, time.Now().Sub(l.Start).Truncate(time.Millisecond*100)) } return tw.Flush() diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index e1815572e5f..8bd8ff6a5b9 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -97,6 +97,8 @@ var sectorsStatusCmd = &cli.Command{ fmt.Printf("TicketH:\t%d\n", status.Ticket.Epoch) fmt.Printf("Seed:\t\t%x\n", status.Seed.Value) fmt.Printf("SeedH:\t\t%d\n", status.Seed.Epoch) + fmt.Printf("Precommit:\t%s\n", status.PreCommitMsg) + fmt.Printf("Commit:\t\t%s\n", status.CommitMsg) fmt.Printf("Proof:\t\t%x\n", status.Proof) fmt.Printf("Deals:\t\t%v\n", status.Deals) fmt.Printf("Retries:\t%d\n", status.Retries) diff --git a/documentation/en/api-methods.md b/documentation/en/api-methods.md index bd395fc3e80..7f1d79be456 100644 --- a/documentation/en/api-methods.md +++ b/documentation/en/api-methods.md @@ -3603,7 +3603,7 @@ Response: ``` ### StateSectorGetInfo -StateSectorGetInfo returns the on-chain info for the specified miner's sector +StateSectorGetInfo returns the on-chain info for the specified miner's sector. 
Returns null in case the sector info isn't found NOTE: returned info.Expiration may not be accurate in some cases, use StateSectorExpiration to get accurate expiration epoch diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index b633850b36b..2d5741e6698 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -6,8 +6,7 @@ import ( "io" "net/http" - "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" - + "github.com/hashicorp/go-multierror" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/mitchellh/go-homedir" @@ -17,6 +16,7 @@ import ( "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" @@ -463,25 +463,19 @@ func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error { return xerrors.Errorf("acquiring sector lock: %w", err) } - unsealed := stores.FTUnsealed - { - unsealedStores, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, 0, false) - if err != nil { - return xerrors.Errorf("finding unsealed sector: %w", err) - } + var err error - if len(unsealedStores) == 0 { // can be already removed - unsealed = stores.FTNone - } + if rerr := m.storage.Remove(ctx, sector, stores.FTSealed, true); rerr != nil { + err = multierror.Append(err, xerrors.Errorf("removing sector (sealed): %w", rerr)) + } + if rerr := m.storage.Remove(ctx, sector, stores.FTCache, true); rerr != nil { + err = multierror.Append(err, xerrors.Errorf("removing sector (cache): %w", rerr)) + } + if rerr := m.storage.Remove(ctx, sector, stores.FTUnsealed, true); rerr != nil { + err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr)) } - selector := newExistingSelector(m.index, sector, stores.FTCache|stores.FTSealed, false) - - return m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, - schedFetch(sector, stores.FTCache|stores.FTSealed|unsealed, stores.PathStorage, stores.AcquireMove), - func(ctx context.Context, w Worker) error { - return w.Remove(ctx, sector) - }) + return err } func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) { diff --git a/extern/sector-storage/request_queue.go b/extern/sector-storage/request_queue.go index 85d3abf4677..9247ce24a9a 100644 --- a/extern/sector-storage/request_queue.go +++ b/extern/sector-storage/request_queue.go @@ -7,6 +7,11 @@ type requestQueue []*workerRequest func (q requestQueue) Len() int { return len(q) } func (q requestQueue) Less(i, j int) bool { + oneMuchLess, muchLess := q[i].taskType.MuchLess(q[j].taskType) + if oneMuchLess { + return muchLess + } + if q[i].priority != q[j].priority { return q[i].priority > q[j].priority } diff --git a/extern/sector-storage/resources.go b/extern/sector-storage/resources.go index d2c5646fae5..2fa7972673a 100644 --- a/extern/sector-storage/resources.go +++ b/extern/sector-storage/resources.go @@ -22,17 +22,17 @@ func (r Resources) MultiThread() bool { var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{ sealtasks.TTAddPiece: { - abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{ // This is probably a bit conservative - MaxMemory: 64 << 30, - MinMemory: 64 << 30, + 
abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{ + MaxMemory: 8 << 30, + MinMemory: 8 << 30, Threads: 1, BaseMinMemory: 1 << 30, }, - abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{ // This is probably a bit conservative - MaxMemory: 32 << 30, - MinMemory: 32 << 30, + abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{ + MaxMemory: 4 << 30, + MinMemory: 4 << 30, Threads: 1, diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index 7a4b6f9efcc..16e51f9a66d 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -21,6 +21,7 @@ type schedPrioCtxKey int var SchedPriorityKey schedPrioCtxKey var DefaultSchedPriority = 0 var SelectorTimeout = 5 * time.Second +var InitWait = 3 * time.Second var ( SchedWindows = 2 @@ -85,6 +86,9 @@ type workerHandle struct { lk sync.Mutex + wndLk sync.Mutex + activeWindows []*schedWindow + // stats / tracking wt *workTracker @@ -123,6 +127,8 @@ type workerRequest struct { prepare WorkerAction work WorkerAction + start time.Time + index int // The index of the item in the heap. indexHeap int @@ -147,7 +153,7 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler { workerClosing: make(chan WorkerID), schedule: make(chan *workerRequest), - windowRequests: make(chan *schedWindowRequest), + windowRequests: make(chan *schedWindowRequest, 20), schedQueue: &requestQueue{}, @@ -171,6 +177,8 @@ func (sh *scheduler) Schedule(ctx context.Context, sector abi.SectorID, taskType prepare: prepare, work: work, + start: time.Now(), + ret: ret, ctx: ctx, }: @@ -214,7 +222,12 @@ func (sh *scheduler) runSched() { go sh.runWorkerWatcher() + iw := time.After(InitWait) + var initialised bool + for { + var doSched bool + select { case w := <-sh.newWorkers: sh.newWorker(w) @@ -224,22 +237,47 @@ func (sh *scheduler) runSched() { case req := <-sh.schedule: sh.schedQueue.Push(req) - sh.trySched() + doSched = true if sh.testSync != nil { sh.testSync <- struct{}{} } case req := <-sh.windowRequests: sh.openWindows = append(sh.openWindows, req) - sh.trySched() - + doSched = true case ireq := <-sh.info: ireq(sh.diag()) + case <-iw: + initialised = true + iw = nil + doSched = true case <-sh.closing: sh.schedClose() return } + + if doSched && initialised { + // First gather any pending tasks, so we go through the scheduling loop + // once for every added task + loop: + for { + select { + case req := <-sh.schedule: + sh.schedQueue.Push(req) + if sh.testSync != nil { + sh.testSync <- struct{}{} + } + case req := <-sh.windowRequests: + sh.openWindows = append(sh.openWindows, req) + default: + break loop + } + } + + sh.trySched() + } + } } @@ -321,7 +359,7 @@ func (sh *scheduler) trySched() { } // TODO: allow bigger windows - if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, worker.info.Resources) { + if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info.Resources) { continue } @@ -392,11 +430,11 @@ func (sh *scheduler) trySched() { log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd) // TODO: allow bigger windows - if !windows[wnd].allocated.canHandleRequest(needRes, wid, wr) { + if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", wr) { continue } - log.Debugf("SCHED ASSIGNED sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd) + log.Debugf("SCHED ASSIGNED sqi:%d sector %d task %s to window %d", sqi, task.sector.Number, task.taskType, wnd) windows[wnd].allocated.add(wr, needRes) @@ -475,8 
+513,6 @@ func (sh *scheduler) runWorker(wid WorkerID) { taskDone := make(chan struct{}, 1) windowsRequested := 0 - var activeWindows []*schedWindow - ctx, cancel := context.WithCancel(context.TODO()) defer cancel() @@ -510,7 +546,9 @@ func (sh *scheduler) runWorker(wid WorkerID) { select { case w := <-scheduledWindows: - activeWindows = append(activeWindows, w) + worker.wndLk.Lock() + worker.activeWindows = append(worker.activeWindows, w) + worker.wndLk.Unlock() case <-taskDone: log.Debugw("task done", "workerid", wid) case <-sh.closing: @@ -521,24 +559,38 @@ func (sh *scheduler) runWorker(wid WorkerID) { return } + worker.wndLk.Lock() + + windowsRequested -= sh.workerCompactWindows(worker, wid) + assignLoop: // process windows in order - for len(activeWindows) > 0 { - // process tasks within a window in order - for len(activeWindows[0].todo) > 0 { - todo := activeWindows[0].todo[0] - needRes := ResourceTable[todo.taskType][sh.spt] + for len(worker.activeWindows) > 0 { + firstWindow := worker.activeWindows[0] + // process tasks within a window, preferring tasks at lower indexes + for len(firstWindow.todo) > 0 { sh.workersLk.RLock() + + tidx := -1 + worker.lk.Lock() - ok := worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources) + for t, todo := range firstWindow.todo { + needRes := ResourceTable[todo.taskType][sh.spt] + if worker.preparing.canHandleRequest(needRes, wid, "startPreparing", worker.info.Resources) { + tidx = t + break + } + } worker.lk.Unlock() - if !ok { + if tidx == -1 { sh.workersLk.RUnlock() break assignLoop } + todo := firstWindow.todo[tidx] + log.Debugf("assign worker sector %d", todo.sector.Number) err := sh.assignWorker(taskDone, wid, worker, todo) sh.workersLk.RUnlock() @@ -548,19 +600,76 @@ func (sh *scheduler) runWorker(wid WorkerID) { go todo.respond(xerrors.Errorf("assignWorker error: %w", err)) } - activeWindows[0].todo = activeWindows[0].todo[1:] + // Note: we're not freeing window.allocated resources here very much on purpose + copy(firstWindow.todo[tidx:], firstWindow.todo[tidx+1:]) + firstWindow.todo[len(firstWindow.todo)-1] = nil + firstWindow.todo = firstWindow.todo[:len(firstWindow.todo)-1] } - copy(activeWindows, activeWindows[1:]) - activeWindows[len(activeWindows)-1] = nil - activeWindows = activeWindows[:len(activeWindows)-1] + copy(worker.activeWindows, worker.activeWindows[1:]) + worker.activeWindows[len(worker.activeWindows)-1] = nil + worker.activeWindows = worker.activeWindows[:len(worker.activeWindows)-1] windowsRequested-- } + + worker.wndLk.Unlock() } }() } +func (sh *scheduler) workerCompactWindows(worker *workerHandle, wid WorkerID) int { + // move tasks from older windows to newer windows if older windows + // still can fit them + if len(worker.activeWindows) > 1 { + for wi, window := range worker.activeWindows[1:] { + lower := worker.activeWindows[wi] + var moved []int + + for ti, todo := range window.todo { + needRes := ResourceTable[todo.taskType][sh.spt] + if !lower.allocated.canHandleRequest(needRes, wid, "compactWindows", worker.info.Resources) { + continue + } + + moved = append(moved, ti) + lower.todo = append(lower.todo, todo) + lower.allocated.add(worker.info.Resources, needRes) + window.allocated.free(worker.info.Resources, needRes) + } + + if len(moved) > 0 { + newTodo := make([]*workerRequest, 0, len(window.todo)-len(moved)) + for i, t := range window.todo { + if moved[0] == i { + moved = moved[1:] + continue + } + + newTodo = append(newTodo, t) + } + window.todo = newTodo + } + } + } + + var compacted int + var 
newWindows []*schedWindow + + for _, window := range worker.activeWindows { + if len(window.todo) == 0 { + compacted++ + continue + } + + newWindows = append(newWindows, window) + } + + worker.activeWindows = newWindows + + return compacted +} + func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error { needRes := ResourceTable[req.taskType][sh.spt] diff --git a/extern/sector-storage/sched_resources.go b/extern/sector-storage/sched_resources.go index f468d5fe918..92a3b32adfe 100644 --- a/extern/sector-storage/sched_resources.go +++ b/extern/sector-storage/sched_resources.go @@ -7,7 +7,7 @@ import ( ) func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error { - for !a.canHandleRequest(r, id, wr) { + for !a.canHandleRequest(r, id, "withResources", wr) { if a.cond == nil { a.cond = sync.NewCond(locker) } @@ -52,37 +52,37 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) { a.memUsedMax -= r.MaxMemory } -func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResources) bool { +func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, caller string, res storiface.WorkerResources) bool { // TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running) minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory if minNeedMem > res.MemPhysical { - log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib) + log.Debugf("sched: not scheduling on worker %d for %s; not enough physical memory - need: %dM, have %dM", wid, caller, minNeedMem/mib, res.MemPhysical/mib) return false } maxNeedMem := res.MemReserved + a.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory if maxNeedMem > res.MemSwap+res.MemPhysical { - log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib) + log.Debugf("sched: not scheduling on worker %d for %s; not enough virtual memory - need: %dM, have %dM", wid, caller, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib) return false } if needRes.MultiThread() { if a.cpuUse > 0 { - log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, a.cpuUse, res.CPUs) + log.Debugf("sched: not scheduling on worker %d for %s; multicore process needs %d threads, %d in use, target %d", wid, caller, res.CPUs, a.cpuUse, res.CPUs) return false } } else { if a.cpuUse+uint64(needRes.Threads) > res.CPUs { - log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, a.cpuUse, res.CPUs) + log.Debugf("sched: not scheduling on worker %d for %s; not enough threads, need %d, %d in use, target %d", wid, caller, needRes.Threads, a.cpuUse, res.CPUs) return false } } if len(res.GPUs) > 0 && needRes.CanGPU { if a.gpuUsed { - log.Debugf("sched: not scheduling on worker %d; GPU in use", wid) + log.Debugf("sched: not scheduling on worker %d for %s; GPU in use", wid, caller) return false } } diff --git a/extern/sector-storage/sched_test.go b/extern/sector-storage/sched_test.go index fcfe891e7a2..4aa1dc49be0 100644 --- a/extern/sector-storage/sched_test.go +++ b/extern/sector-storage/sched_test.go @@ -5,6 +5,7 @@ import ( "fmt" "io" 
"runtime" + "sort" "sync" "testing" "time" @@ -22,6 +23,10 @@ import ( "github.com/filecoin-project/specs-storage/storage" ) +func init() { + InitWait = 10 * time.Millisecond +} + func TestWithPriority(t *testing.T) { ctx := context.Background() @@ -301,7 +306,8 @@ func TestSched(t *testing.T) { done: map[string]chan struct{}{}, } - for _, task := range tasks { + for i, task := range tasks { + log.Info("TASK", i) task(t, sched, index, &rm) } @@ -415,6 +421,45 @@ func TestSched(t *testing.T) { ) } + diag := func() task { + return func(t *testing.T, s *scheduler, index *stores.Index, meta *runMeta) { + time.Sleep(20 * time.Millisecond) + for _, request := range s.diag().Requests { + log.Infof("!!! sDIAG: sid(%d) task(%s)", request.Sector.Number, request.TaskType) + } + + wj := (&Manager{sched: s}).WorkerJobs() + + type line struct { + storiface.WorkerJob + wid uint64 + } + + lines := make([]line, 0) + + for wid, jobs := range wj { + for _, job := range jobs { + lines = append(lines, line{ + WorkerJob: job, + wid: wid, + }) + } + } + + // oldest first + sort.Slice(lines, func(i, j int) bool { + if lines[i].RunWait != lines[j].RunWait { + return lines[i].RunWait < lines[j].RunWait + } + return lines[i].Start.Before(lines[j].Start) + }) + + for _, l := range lines { + log.Infof("!!! wDIAG: rw(%d) sid(%d) t(%s)", l.RunWait, l.Sector.Number, l.Task) + } + } + } + // run this one a bunch of times, it had a very annoying tendency to fail randomly for i := 0; i < 40; i++ { t.Run("pc1-pc2-prio", testFunc([]workerSpec{ @@ -423,6 +468,8 @@ func TestSched(t *testing.T) { // fill queues twoPC1("w0", 0, taskStarted), twoPC1("w1", 2, taskNotScheduled), + sched("w2", "fred", 4, sealtasks.TTPreCommit1), + taskNotScheduled("w2"), // windowed @@ -435,10 +482,18 @@ func TestSched(t *testing.T) { sched("t3", "fred", 10, sealtasks.TTPreCommit2), taskNotScheduled("t3"), + diag(), + twoPC1Act("w0", taskDone), twoPC1Act("w1", taskStarted), + taskNotScheduled("w2"), twoPC1Act("w1", taskDone), + taskStarted("w2"), + + taskDone("w2"), + + diag(), taskStarted("t3"), taskNotScheduled("t1"), @@ -518,3 +573,68 @@ func BenchmarkTrySched(b *testing.B) { b.Run("1w-500q", test(1, 500)) b.Run("200w-400q", test(200, 400)) } + +func TestWindowCompact(t *testing.T) { + sh := scheduler{ + spt: abi.RegisteredSealProof_StackedDrg32GiBV1, + } + + test := func(start [][]sealtasks.TaskType, expect [][]sealtasks.TaskType) func(t *testing.T) { + return func(t *testing.T) { + wh := &workerHandle{ + info: storiface.WorkerInfo{ + Resources: decentWorkerResources, + }, + } + + for _, windowTasks := range start { + window := &schedWindow{} + + for _, task := range windowTasks { + window.todo = append(window.todo, &workerRequest{taskType: task}) + window.allocated.add(wh.info.Resources, ResourceTable[task][sh.spt]) + } + + wh.activeWindows = append(wh.activeWindows, window) + } + + n := sh.workerCompactWindows(wh, 0) + require.Equal(t, len(start)-len(expect), n) + + for wi, tasks := range expect { + var expectRes activeResources + + for ti, task := range tasks { + require.Equal(t, task, wh.activeWindows[wi].todo[ti].taskType, "%d, %d", wi, ti) + expectRes.add(wh.info.Resources, ResourceTable[task][sh.spt]) + } + + require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].allocated.cpuUse, "%d", wi) + require.Equal(t, expectRes.gpuUsed, wh.activeWindows[wi].allocated.gpuUsed, "%d", wi) + require.Equal(t, expectRes.memUsedMin, wh.activeWindows[wi].allocated.memUsedMin, "%d", wi) + require.Equal(t, expectRes.memUsedMax, 
wh.activeWindows[wi].allocated.memUsedMax, "%d", wi) + } + + } + } + + t.Run("2-pc1-windows", test( + [][]sealtasks.TaskType{{sealtasks.TTPreCommit1}, {sealtasks.TTPreCommit1}}, + [][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}}), + ) + + t.Run("1-window", test( + [][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}}, + [][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}}), + ) + + t.Run("2-pc2-windows", test( + [][]sealtasks.TaskType{{sealtasks.TTPreCommit2}, {sealtasks.TTPreCommit2}}, + [][]sealtasks.TaskType{{sealtasks.TTPreCommit2}, {sealtasks.TTPreCommit2}}), + ) + + t.Run("2pc1-pc1ap", test( + [][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}, {sealtasks.TTPreCommit1, sealtasks.TTAddPiece}}, + [][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1, sealtasks.TTAddPiece}, {sealtasks.TTPreCommit1}}), + ) +} diff --git a/extern/sector-storage/sealtasks/task.go b/extern/sector-storage/sealtasks/task.go index ad5ce01bb7b..4174373a6cf 100644 --- a/extern/sector-storage/sealtasks/task.go +++ b/extern/sector-storage/sealtasks/task.go @@ -17,15 +17,15 @@ const ( ) var order = map[TaskType]int{ - TTAddPiece: 7, - TTPreCommit1: 6, - TTPreCommit2: 5, - TTCommit2: 4, - TTCommit1: 3, - TTFetch: 2, - TTFinalize: 1, - TTUnseal: 0, - TTReadUnsealed: 0, + TTAddPiece: 6, // least priority + TTPreCommit1: 5, + TTPreCommit2: 4, + TTCommit2: 3, + TTCommit1: 2, + TTUnseal: 1, + TTFetch: -1, + TTReadUnsealed: -1, + TTFinalize: -2, // most priority } var shortNames = map[TaskType]string{ @@ -43,6 +43,12 @@ var shortNames = map[TaskType]string{ TTReadUnsealed: "RD ", } +func (a TaskType) MuchLess(b TaskType) (bool, bool) { + oa, ob := order[a], order[b] + oneNegative := oa^ob < 0 + return oneNegative, oa < ob +} + func (a TaskType) Less(b TaskType) bool { return order[a] < order[b] } diff --git a/extern/sector-storage/stats.go b/extern/sector-storage/stats.go index 9abbdb83aab..7f95e3bc37d 100644 --- a/extern/sector-storage/stats.go +++ b/extern/sector-storage/stats.go @@ -1,6 +1,8 @@ package sectorstorage -import "github.com/filecoin-project/lotus/extern/sector-storage/storiface" +import ( + "github.com/filecoin-project/lotus/extern/sector-storage/storiface" +) func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats { m.sched.workersLk.RLock() @@ -29,6 +31,20 @@ func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob { for id, handle := range m.sched.workers { out[uint64(id)] = handle.wt.Running() + + handle.wndLk.Lock() + for wi, window := range handle.activeWindows { + for _, request := range window.todo { + out[uint64(id)] = append(out[uint64(id)], storiface.WorkerJob{ + ID: 0, + Sector: request.sector, + Task: request.taskType, + RunWait: wi + 1, + Start: request.start, + }) + } + } + handle.wndLk.Unlock() } return out diff --git a/extern/sector-storage/storiface/worker.go b/extern/sector-storage/storiface/worker.go index 37c44703100..37e4aad1d02 100644 --- a/extern/sector-storage/storiface/worker.go +++ b/extern/sector-storage/storiface/worker.go @@ -37,5 +37,6 @@ type WorkerJob struct { Sector abi.SectorID Task sealtasks.TaskType - Start time.Time + RunWait int // 0 - running, 1+ - assigned + Start time.Time } diff --git a/extern/storage-sealing/cbor_gen.go b/extern/storage-sealing/cbor_gen.go index 5a513dbdb39..b0762618768 100644 --- a/extern/storage-sealing/cbor_gen.go +++ b/extern/storage-sealing/cbor_gen.go @@ -135,12 +135,34 @@ func (t *DealInfo) MarshalCBOR(w io.Writer) error { _, 
err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{163}); err != nil { + if _, err := w.Write([]byte{164}); err != nil { return err } scratch := make([]byte, 9) + // t.PublishCid (cid.Cid) (struct) + if len("PublishCid") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"PublishCid\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("PublishCid"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("PublishCid")); err != nil { + return err + } + + if t.PublishCid == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCidBuf(scratch, w, *t.PublishCid); err != nil { + return xerrors.Errorf("failed to write cid field t.PublishCid: %w", err) + } + } + // t.DealID (abi.DealID) (uint64) if len("DealID") > cbg.MaxLength { return xerrors.Errorf("Value in field \"DealID\" was too long") @@ -224,7 +246,30 @@ func (t *DealInfo) UnmarshalCBOR(r io.Reader) error { } switch name { - // t.DealID (abi.DealID) (uint64) + // t.PublishCid (cid.Cid) (struct) + case "PublishCid": + + { + + b, err := br.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := br.UnreadByte(); err != nil { + return err + } + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PublishCid: %w", err) + } + + t.PublishCid = &c + } + + } + // t.DealID (abi.DealID) (uint64) case "DealID": { @@ -430,7 +475,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{182}); err != nil { + if _, err := w.Write([]byte{183}); err != nil { return err } @@ -860,6 +905,29 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { } } + // t.Return (sealing.ReturnState) (string) + if len("Return") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Return\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Return"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("Return")); err != nil { + return err + } + + if len(t.Return) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Return was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.Return))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.Return)); err != nil { + return err + } + // t.LastErr (string) (string) if len("LastErr") > cbg.MaxLength { return xerrors.Errorf("Value in field \"LastErr\" was too long") @@ -1362,6 +1430,17 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { } } + // t.Return (sealing.ReturnState) (string) + case "Return": + + { + sval, err := cbg.ReadStringBuf(br, scratch) + if err != nil { + return err + } + + t.Return = ReturnState(sval) + } // t.LastErr (string) (string) case "LastErr": diff --git a/extern/storage-sealing/checks.go b/extern/storage-sealing/checks.go index b415c6f8eb0..3a59ea05948 100644 --- a/extern/storage-sealing/checks.go +++ b/extern/storage-sealing/checks.go @@ -33,7 +33,7 @@ type ErrInvalidProof struct{ error } type ErrNoPrecommit struct{ error } type ErrCommitWaitFailed struct{ error } -func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error { +func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api SealingAPI) error { tok, height, err := api.ChainHead(ctx) if err != nil { return &ErrApi{xerrors.Errorf("getting chain head: %w", 
err)} @@ -55,6 +55,10 @@ func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error { return &ErrInvalidDeals{xerrors.Errorf("getting deal %d for piece %d: %w", p.DealInfo.DealID, i, err)} } + if proposal.Provider != maddr { + return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong provider: %s != %s", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, proposal.Provider, maddr)} + } + if proposal.PieceCID != p.Piece.PieceCID { return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %x != %x", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID)} } @@ -74,6 +78,10 @@ func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error { // checkPrecommit checks that data commitment generated in the sealing process // matches pieces, and that the seal ticket isn't expired func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, tok TipSetToken, height abi.ChainEpoch, api SealingAPI) (err error) { + if err := checkPieces(ctx, maddr, si, api); err != nil { + return err + } + commD, err := api.StateComputeDataCommitment(ctx, maddr, si.SectorType, si.dealIDs(), tok) if err != nil { return &ErrApi{xerrors.Errorf("calling StateComputeDataCommitment: %w", err)} @@ -176,5 +184,9 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte, return &ErrInvalidProof{xerrors.New("invalid proof (compute error?)")} } + if err := checkPieces(ctx, m.maddr, si, m.api); err != nil { + return err + } + return nil } diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index 82db5c15af0..dd0cbabfac4 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -17,9 +17,9 @@ import ( ) func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) { - next, err := m.plan(events, user.(*SectorInfo)) + next, processed, err := m.plan(events, user.(*SectorInfo)) if err != nil || next == nil { - return nil, uint64(len(events)), err + return nil, processed, err } return func(ctx statemachine.Context, si SectorInfo) error { @@ -30,10 +30,10 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface } return nil - }, uint64(len(events)), nil // TODO: This processed event count is not very correct + }, processed, nil // TODO: This processed event count is not very correct } -var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) error{ +var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) (uint64, error){ // Sealing UndefinedSectorState: planOne( @@ -49,31 +49,39 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto PreCommit1: planOne( on(SectorPreCommit1{}, PreCommit2), on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed), - on(SectorPackingFailed{}, PackingFailed), + on(SectorDealsExpired{}, DealsExpired), + on(SectorInvalidDealIDs{}, RecoverDealIDs), ), PreCommit2: planOne( on(SectorPreCommit2{}, PreCommitting), on(SectorSealPreCommit2Failed{}, SealPreCommit2Failed), - on(SectorPackingFailed{}, PackingFailed), ), PreCommitting: planOne( on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed), on(SectorPreCommitted{}, PreCommitWait), on(SectorChainPreCommitFailed{}, PreCommitFailed), on(SectorPreCommitLanded{}, WaitSeed), + on(SectorDealsExpired{}, DealsExpired), + on(SectorInvalidDealIDs{}, RecoverDealIDs), ), PreCommitWait: 
planOne( on(SectorChainPreCommitFailed{}, PreCommitFailed), on(SectorPreCommitLanded{}, WaitSeed), + on(SectorRetryPreCommit{}, PreCommitting), ), WaitSeed: planOne( on(SectorSeedReady{}, Committing), on(SectorChainPreCommitFailed{}, PreCommitFailed), ), Committing: planCommitting, + SubmitCommit: planOne( + on(SectorCommitSubmitted{}, CommitWait), + on(SectorCommitFailed{}, CommitFailed), + ), CommitWait: planOne( on(SectorProving{}, FinalizeSector), on(SectorCommitFailed{}, CommitFailed), + on(SectorRetrySubmitCommit{}, SubmitCommit), ), FinalizeSector: planOne( @@ -95,6 +103,8 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorRetryWaitSeed{}, WaitSeed), on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed), on(SectorPreCommitLanded{}, WaitSeed), + on(SectorDealsExpired{}, DealsExpired), + on(SectorInvalidDealIDs{}, RecoverDealIDs), ), ComputeProofFailed: planOne( on(SectorRetryComputeProof{}, Committing), @@ -109,22 +119,33 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorChainPreCommitFailed{}, PreCommitFailed), on(SectorRetryPreCommit{}, PreCommitting), on(SectorRetryCommitWait{}, CommitWait), + on(SectorDealsExpired{}, DealsExpired), + on(SectorInvalidDealIDs{}, RecoverDealIDs), ), FinalizeFailed: planOne( on(SectorRetryFinalize{}, FinalizeSector), ), + PackingFailed: planOne(), // TODO: Deprecated, remove + DealsExpired: planOne( + // SectorRemove (global) + ), + RecoverDealIDs: planOne( + onReturning(SectorUpdateDealIDs{}), + ), // Post-seal Proving: planOne( on(SectorFaultReported{}, FaultReported), on(SectorFaulty{}, Faulty), - on(SectorRemove{}, Removing), ), Removing: planOne( on(SectorRemoved{}, Removed), on(SectorRemoveFailed{}, RemoveFailed), ), + RemoveFailed: planOne( + // SectorRemove (global) + ), Faulty: planOne( on(SectorFaultReported{}, FaultReported), ), @@ -133,7 +154,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto Removed: final, } -func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) { +func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, uint64, error) { ///// // First process all events @@ -170,11 +191,12 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta p := fsmPlanners[state.State] if p == nil { - return nil, xerrors.Errorf("planner for state %s not found", state.State) + return nil, 0, xerrors.Errorf("planner for state %s not found", state.State) } - if err := p(events, state); err != nil { - return nil, xerrors.Errorf("running planner for state %s failed: %w", state.State, err) + processed, err := p(events, state) + if err != nil { + return nil, 0, xerrors.Errorf("running planner for state %s failed: %w", state.State, err) } ///// @@ -182,47 +204,50 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta /* - * Empty <- incoming deals - | | - | v - *<- WaitDeals <- incoming deals - | | - | v - *<- Packing <- incoming committed capacity - | | - | v - *<- PreCommit1 <--> SealPreCommit1Failed - | | ^ ^^ - | | *----------++----\ - | v v || | - *<- PreCommit2 --------++--> SealPreCommit2Failed - | | || - | v /-------/| - * PreCommitting <-----+---> PreCommitFailed - | | | ^ - | v | | - *<- WaitSeed -----------+-----/ - | ||| ^ | - | ||| \--------*-----/ - | ||| | - | vvv v----+----> ComputeProofFailed - *<- Committing | - | | ^--> CommitFailed 
- | v ^ - *<- CommitWait ---/ - | | - | v - | FinalizeSector <--> FinalizeFailed - | | - | v - *<- Proving - | - v - FailedUnrecoverable - - UndefinedSectorState <- ¯\_(ツ)_/¯ - | ^ - *---------------------/ + * Empty <- incoming deals + | | + | v + *<- WaitDeals <- incoming deals + | | + | v + *<- Packing <- incoming committed capacity + | | + | v + *<- PreCommit1 <--> SealPreCommit1Failed + | | ^ ^^ + | | *----------++----\ + | v v || | + *<- PreCommit2 --------++--> SealPreCommit2Failed + | | || + | v /-------/| + * PreCommitting <-----+---> PreCommitFailed + | | | ^ + | v | | + *<- WaitSeed -----------+-----/ + | ||| ^ | + | ||| \--------*-----/ + | ||| | + | vvv v----+----> ComputeProofFailed + *<- Committing | + | | ^--> CommitFailed + | v ^ + | SubmitCommit | + | | | + | v | + *<- CommitWait ---/ + | | + | v + | FinalizeSector <--> FinalizeFailed + | | + | v + *<- Proving + | + v + FailedUnrecoverable + + UndefinedSectorState <- ¯\_(ツ)_/¯ + | ^ + *---------------------/ */ @@ -235,51 +260,63 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta case WaitDeals: log.Infof("Waiting for deals %d", state.SectorNumber) case Packing: - return m.handlePacking, nil + return m.handlePacking, processed, nil case PreCommit1: - return m.handlePreCommit1, nil + return m.handlePreCommit1, processed, nil case PreCommit2: - return m.handlePreCommit2, nil + return m.handlePreCommit2, processed, nil case PreCommitting: - return m.handlePreCommitting, nil + return m.handlePreCommitting, processed, nil case PreCommitWait: - return m.handlePreCommitWait, nil + return m.handlePreCommitWait, processed, nil case WaitSeed: - return m.handleWaitSeed, nil + return m.handleWaitSeed, processed, nil case Committing: - return m.handleCommitting, nil + return m.handleCommitting, processed, nil + case SubmitCommit: + return m.handleSubmitCommit, processed, nil case CommitWait: - return m.handleCommitWait, nil + return m.handleCommitWait, processed, nil case FinalizeSector: - return m.handleFinalizeSector, nil + return m.handleFinalizeSector, processed, nil // Handled failure modes case SealPreCommit1Failed: - return m.handleSealPrecommit1Failed, nil + return m.handleSealPrecommit1Failed, processed, nil case SealPreCommit2Failed: - return m.handleSealPrecommit2Failed, nil + return m.handleSealPrecommit2Failed, processed, nil case PreCommitFailed: - return m.handlePreCommitFailed, nil + return m.handlePreCommitFailed, processed, nil case ComputeProofFailed: - return m.handleComputeProofFailed, nil + return m.handleComputeProofFailed, processed, nil case CommitFailed: - return m.handleCommitFailed, nil + return m.handleCommitFailed, processed, nil case FinalizeFailed: - return m.handleFinalizeFailed, nil + return m.handleFinalizeFailed, processed, nil + case PackingFailed: // DEPRECATED: remove this for the next reset + state.State = DealsExpired + fallthrough + case DealsExpired: + return m.handleDealsExpired, processed, nil + case RecoverDealIDs: + return m.handleRecoverDealIDs, processed, nil // Post-seal case Proving: - return m.handleProvingSector, nil + return m.handleProvingSector, processed, nil case Removing: - return m.handleRemoving, nil + return m.handleRemoving, processed, nil case Removed: - return nil, nil + return nil, processed, nil + + case RemoveFailed: + return m.handleRemoveFailed, processed, nil // Faults case Faulty: - return m.handleFaulty, nil + return m.handleFaulty, processed, nil case FaultReported: - return m.handleFaultReported, nil + return 
m.handleFaultReported, processed, nil // Fatal errors case UndefinedSectorState: @@ -290,28 +327,29 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta log.Errorf("unexpected sector update state: %s", state.State) } - return nil, nil + return nil, processed, nil } -func planCommitting(events []statemachine.Event, state *SectorInfo) error { - for _, event := range events { +func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, error) { + for i, event := range events { switch e := event.User.(type) { case globalMutator: if e.applyGlobal(state) { - return nil + return uint64(i + 1), nil } case SectorCommitted: // the normal case e.apply(state) - state.State = CommitWait + state.State = SubmitCommit case SectorSeedReady: // seed changed :/ if e.SeedEpoch == state.SeedEpoch && bytes.Equal(e.SeedValue, state.SeedValue) { log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change") continue // or it didn't! } + log.Warnf("planCommitting: commit Seed changed") e.apply(state) state.State = Committing - return nil + return uint64(i + 1), nil case SectorComputeProofFailed: state.State = ComputeProofFailed case SectorSealPreCommit1Failed: @@ -321,10 +359,10 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error { case SectorRetryCommitWait: state.State = CommitWait default: - return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events) + return uint64(i), xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events) } } - return nil + return uint64(len(events)), nil } func (m *Sealing) restartSectors(ctx context.Context) error { @@ -365,31 +403,38 @@ func (m *Sealing) ForceSectorState(ctx context.Context, id abi.SectorNumber, sta return m.sectors.Send(id, SectorForceState{state}) } -func final(events []statemachine.Event, state *SectorInfo) error { - return xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events) +func final(events []statemachine.Event, state *SectorInfo) (uint64, error) { + return 0, xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events) } -func on(mut mutator, next SectorState) func() (mutator, SectorState) { - return func() (mutator, SectorState) { - return mut, next +func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) error) { + return func() (mutator, func(*SectorInfo) error) { + return mut, func(state *SectorInfo) error { + state.State = next + return nil + } } } -func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statemachine.Event, state *SectorInfo) error { - return func(events []statemachine.Event, state *SectorInfo) error { - if len(events) != 1 { - for _, event := range events { - if gm, ok := event.User.(globalMutator); ok { - gm.applyGlobal(state) - return nil - } +func onReturning(mut mutator) func() (mutator, func(*SectorInfo) error) { + return func() (mutator, func(*SectorInfo) error) { + return mut, func(state *SectorInfo) error { + if state.Return == "" { + return xerrors.Errorf("return state not set") } - return xerrors.Errorf("planner for state %s only has a plan for a single event only, got %+v", state.State, events) + + state.State = SectorState(state.Return) + state.Return = "" + return nil } + } +} +func planOne(ts ...func() (mut mutator, next func(*SectorInfo) error)) func(events []statemachine.Event, state *SectorInfo) (uint64, error) { + return func(events []statemachine.Event, state 
*SectorInfo) (uint64, error) { if gm, ok := events[0].User.(globalMutator); ok { gm.applyGlobal(state) - return nil + return 1, nil } for _, t := range ts { @@ -404,15 +449,14 @@ func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statema } events[0].User.(mutator).apply(state) - state.State = next - return nil + return 1, next(state) } _, ok := events[0].User.(Ignorable) if ok { - return nil + return 1, nil } - return xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0]) + return 0, xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0]) } } diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index f270b3668f7..8649e6c5ea5 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -101,10 +101,6 @@ func (evt SectorPacked) apply(state *SectorInfo) { } } -type SectorPackingFailed struct{ error } - -func (evt SectorPackingFailed) apply(*SectorInfo) {} - type SectorPreCommit1 struct { PreCommit1Out storage.PreCommit1Out TicketValue abi.SealRandomness @@ -191,13 +187,28 @@ type SectorCommitFailed struct{ error } func (evt SectorCommitFailed) FormatError(xerrors.Printer) (next error) { return evt.error } func (evt SectorCommitFailed) apply(*SectorInfo) {} +type SectorRetrySubmitCommit struct{} + +func (evt SectorRetrySubmitCommit) apply(*SectorInfo) {} + +type SectorDealsExpired struct{ error } + +func (evt SectorDealsExpired) FormatError(xerrors.Printer) (next error) { return evt.error } +func (evt SectorDealsExpired) apply(*SectorInfo) {} + type SectorCommitted struct { - Message cid.Cid - Proof []byte + Proof []byte } func (evt SectorCommitted) apply(state *SectorInfo) { state.Proof = evt.Proof +} + +type SectorCommitSubmitted struct { + Message cid.Cid +} + +func (evt SectorCommitSubmitted) apply(state *SectorInfo) { state.CommitMessage = &evt.Message } @@ -256,6 +267,24 @@ type SectorRetryCommitWait struct{} func (evt SectorRetryCommitWait) apply(state *SectorInfo) {} +type SectorInvalidDealIDs struct { + Return ReturnState +} + +func (evt SectorInvalidDealIDs) apply(state *SectorInfo) { + state.Return = evt.Return +} + +type SectorUpdateDealIDs struct { + Updates map[int]abi.DealID +} + +func (evt SectorUpdateDealIDs) apply(state *SectorInfo) { + for i, id := range evt.Updates { + state.Pieces[i].DealInfo.DealID = id + } +} + // Faults type SectorFaulty struct{} @@ -274,7 +303,10 @@ type SectorFaultedFinal struct{} type SectorRemove struct{} -func (evt SectorRemove) apply(state *SectorInfo) {} +func (evt SectorRemove) applyGlobal(state *SectorInfo) bool { + state.State = Removing + return true +} type SectorRemoved struct{} diff --git a/extern/storage-sealing/fsm_test.go b/extern/storage-sealing/fsm_test.go index f41d8c535ee..c67decbebf8 100644 --- a/extern/storage-sealing/fsm_test.go +++ b/extern/storage-sealing/fsm_test.go @@ -16,7 +16,7 @@ func init() { } func (t *test) planSingle(evt interface{}) { - _, err := t.s.plan([]statemachine.Event{{User: evt}}, t.state) + _, _, err := t.s.plan([]statemachine.Event{{User: evt}}, t.state) require.NoError(t.t, err) } @@ -58,6 +58,9 @@ func TestHappyPath(t *testing.T) { require.Equal(m.t, m.state.State, Committing) m.planSingle(SectorCommitted{}) + require.Equal(m.t, m.state.State, SubmitCommit) + + m.planSingle(SectorCommitSubmitted{}) require.Equal(m.t, m.state.State, CommitWait) m.planSingle(SectorProving{}) @@ -98,13 +101,16 @@ func 
TestSeedRevert(t *testing.T) { m.planSingle(SectorSeedReady{}) require.Equal(m.t, m.state.State, Committing) - _, err := m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state) + _, _, err := m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state) require.NoError(t, err) require.Equal(m.t, m.state.State, Committing) // not changing the seed this time - _, err = m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state) + _, _, err = m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state) require.NoError(t, err) + require.Equal(m.t, m.state.State, SubmitCommit) + + m.planSingle(SectorCommitSubmitted{}) require.Equal(m.t, m.state.State, CommitWait) m.planSingle(SectorProving{}) @@ -129,7 +135,8 @@ func TestPlanCommittingHandlesSectorCommitFailed(t *testing.T) { events := []statemachine.Event{{User: SectorCommitFailed{}}} - require.NoError(t, planCommitting(events, m.state)) + _, err := planCommitting(events, m.state) + require.NoError(t, err) require.Equal(t, CommitFailed, m.state.State) } diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 7e4294e2bc3..e48679cc7ce 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -141,7 +141,7 @@ func (m *Sealing) Stop(ctx context.Context) error { return m.sectors.Stop(ctx) } func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { - log.Infof("Adding piece for deal %d", d.DealID) + log.Infof("Adding piece for deal %d (publish msg: %s)", d.DealID, d.PublishCid) if (padreader.PaddedSize(uint64(size))) != size { return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") } diff --git a/extern/storage-sealing/sector_state.go b/extern/storage-sealing/sector_state.go index 2f57d83e891..4e674603d13 100644 --- a/extern/storage-sealing/sector_state.go +++ b/extern/storage-sealing/sector_state.go @@ -10,12 +10,13 @@ const ( WaitDeals SectorState = "WaitDeals" // waiting for more pieces (deals) to be added to the sector Packing SectorState = "Packing" // sector not in sealStore, and not on chain PreCommit1 SectorState = "PreCommit1" // do PreCommit1 - PreCommit2 SectorState = "PreCommit2" // do PreCommit1 + PreCommit2 SectorState = "PreCommit2" // do PreCommit2 PreCommitting SectorState = "PreCommitting" // on chain pre-commit PreCommitWait SectorState = "PreCommitWait" // waiting for precommit to land on chain WaitSeed SectorState = "WaitSeed" // waiting for seed - Committing SectorState = "Committing" - CommitWait SectorState = "CommitWait" // waiting for message to land on chain + Committing SectorState = "Committing" // compute PoRep + SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain + CommitWait SectorState = "CommitWait" // wait for the commit message to land on chain FinalizeSector SectorState = "FinalizeSector" Proving SectorState = "Proving" // error modes @@ -25,8 +26,10 @@ const ( PreCommitFailed SectorState = "PreCommitFailed" ComputeProofFailed SectorState = "ComputeProofFailed" CommitFailed SectorState = "CommitFailed" - PackingFailed SectorState = "PackingFailed" + PackingFailed SectorState = "PackingFailed" // TODO: deprecated, remove FinalizeFailed SectorState = "FinalizeFailed" + 
DealsExpired SectorState = "DealsExpired" + RecoverDealIDs SectorState = "RecoverDealIDs" Faulty SectorState = "Faulty" // sector is corrupted or gone for some reason FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index cf829f44fa3..e313fd712f9 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -1,12 +1,18 @@ package sealing import ( + "bytes" "time" "golang.org/x/xerrors" "github.com/filecoin-project/go-statemachine" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" + + "github.com/filecoin-project/lotus/extern/sector-storage/zerocomm" ) const minRetryTime = 1 * time.Minute @@ -81,6 +87,11 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("ticket expired error: %w", err)}) case *ErrBadTicket: return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad expired: %w", err)}) + case *ErrInvalidDeals: + log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) + return ctx.Send(SectorInvalidDealIDs{Return: RetPreCommitFailed}) + case *ErrExpiredDeals: + return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) case *ErrNoPrecommit: return ctx.Send(SectorRetryPreCommit{}) case *ErrPrecommitOnChain: @@ -88,6 +99,7 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI case *ErrSectorNumberAllocated: log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err) // TODO: check if the sector is committed (not sure how we'd end up here) + // TODO: check on-chain state, adjust local sector number counter to not give out allocated numbers return nil default: return xerrors.Errorf("checkPrecommit sanity check error: %w", err) @@ -157,7 +169,12 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo case *ErrExpiredTicket: return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("ticket expired error: %w", err)}) case *ErrBadTicket: - return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad expired: %w", err)}) + return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)}) + case *ErrInvalidDeals: + log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) + return ctx.Send(SectorInvalidDealIDs{Return: RetCommitFailed}) + case *ErrExpiredDeals: + return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) case nil: return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)}) case *ErrPrecommitOnChain: @@ -192,6 +209,11 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo return ctx.Send(SectorRetryPreCommitWait{}) case *ErrNoPrecommit: return ctx.Send(SectorRetryPreCommit{}) + case *ErrInvalidDeals: + log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) + return ctx.Send(SectorInvalidDealIDs{Return: RetCommitFailed}) + case *ErrExpiredDeals: + return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) case *ErrCommitWaitFailed: if err := failedCooldown(ctx, sector); err != nil { return err @@ -221,3 +243,120 @@ func (m *Sealing) 
handleFinalizeFailed(ctx statemachine.Context, sector SectorIn return ctx.Send(SectorRetryFinalize{}) } + +func (m *Sealing) handleRemoveFailed(ctx statemachine.Context, sector SectorInfo) error { + if err := failedCooldown(ctx, sector); err != nil { + return err + } + + return ctx.Send(SectorRemove{}) +} + +func (m *Sealing) handleDealsExpired(ctx statemachine.Context, sector SectorInfo) error { + // First make vary sure the sector isn't committed + si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) + if err != nil { + return xerrors.Errorf("getting sector info: %w", err) + } + if si != nil { + // TODO: this should never happen, but in case it does, try to go back to + // the proving state after running some checks + return xerrors.Errorf("sector is committed on-chain, but we're in DealsExpired") + } + + if sector.PreCommitInfo == nil { + // TODO: Create a separate state which will remove those pieces, and go back to PC1 + log.Errorf("non-precommitted sector with expired deals, can't recover from this yet") + } + + // Not much to do here, we can't go back in time to commit this sector + return ctx.Send(SectorRemove{}) +} + +func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorInfo) error { + tok, height, err := m.api.ChainHead(ctx.Context()) + if err != nil { + return xerrors.Errorf("getting chain head: %w", err) + } + + var toFix []int + + for i, p := range sector.Pieces { + // if no deal is associated with the piece, ensure that we added it as + // filler (i.e. ensure that it has a zero PieceCID) + if p.DealInfo == nil { + exp := zerocomm.ZeroPieceCommitment(p.Piece.Size.Unpadded()) + if !p.Piece.PieceCID.Equals(exp) { + return xerrors.Errorf("sector %d piece %d had non-zero PieceCID %+v", sector.SectorNumber, i, p.Piece.PieceCID) + } + continue + } + + proposal, err := m.api.StateMarketStorageDeal(ctx.Context(), p.DealInfo.DealID, tok) + if err != nil { + log.Warnf("getting deal %d for piece %d: %+v", p.DealInfo.DealID, i, err) + toFix = append(toFix, i) + continue + } + + if proposal.Provider != m.maddr { + log.Warnf("piece %d (of %d) of sector %d refers deal %d with wrong provider: %s != %s", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, proposal.Provider, m.maddr) + toFix = append(toFix, i) + continue + } + + if proposal.PieceCID != p.Piece.PieceCID { + log.Warnf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %x != %x", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID) + toFix = append(toFix, i) + continue + } + + if p.Piece.Size != proposal.PieceSize { + log.Warnf("piece %d (of %d) of sector %d refers deal %d with different size: %d != %d", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.Size, proposal.PieceSize) + toFix = append(toFix, i) + continue + } + + if height >= proposal.StartEpoch { + // TODO: check if we are in an early enough state (before precommit), try to remove the offending pieces + // (tricky as we have to 'defragment' the sector while doing that, and update piece references for retrieval) + return xerrors.Errorf("can't fix sector deals: piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, proposal.StartEpoch, height) + } + } + + updates := map[int]abi.DealID{} + for _, i := range toFix { + p := sector.Pieces[i] + + if p.DealInfo.PublishCid == nil { + // TODO: check if we are in an early enough state try 
to remove this piece + log.Errorf("can't fix sector deals: piece %d (of %d) of sector %d has nil DealInfo.PublishCid (refers to deal %d)", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID) + // Not much to do here (and this can only happen for old spacerace sectors) + return ctx.Send(SectorRemove{}) + } + + ml, err := m.api.StateSearchMsg(ctx.Context(), *p.DealInfo.PublishCid) + if err != nil { + return xerrors.Errorf("looking for publish deal message %s (sector %d, piece %d): %w", *p.DealInfo.PublishCid, sector.SectorNumber, i, err) + } + + if ml.Receipt.ExitCode != exitcode.Ok { + return xerrors.Errorf("looking for publish deal message %s (sector %d, piece %d): non-ok exit code: %s", *p.DealInfo.PublishCid, sector.SectorNumber, i, ml.Receipt.ExitCode) + } + + var retval market.PublishStorageDealsReturn + if err := retval.UnmarshalCBOR(bytes.NewReader(ml.Receipt.Return)); err != nil { + return xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err) + } + + if len(retval.IDs) != 1 { + // market currently only ever sends messages with 1 deal + return xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal") + } + + updates[i] = retval.IDs[0] + } + + // Apply the recovered deal IDs to the sector's piece info + return ctx.Send(SectorUpdateDealIDs{Updates: updates}) +} diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index a434d1fcb30..7693f26ad7f 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/filecoin-project/specs-storage/storage" ) @@ -79,15 +80,16 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se } func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error { - if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state + if err := checkPieces(ctx.Context(), m.maddr, sector, m.api); err != nil { // Sanity check state switch err.(type) { case *ErrApi: log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err) return nil case *ErrInvalidDeals: - return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid dealIDs in sector: %w", err)}) + log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) + return ctx.Send(SectorInvalidDealIDs{Return: RetPreCommit1}) case *ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector?
- return ctx.Send(SectorPackingFailed{xerrors.Errorf("expired dealIDs in sector: %w", err)}) + return ctx.Send(SectorDealsExpired{xerrors.Errorf("expired dealIDs in sector: %w", err)}) default: return xerrors.Errorf("checkPieces sanity check error: %w", err) } @@ -155,6 +157,11 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("ticket expired: %w", err)}) case *ErrBadTicket: return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)}) + case *ErrInvalidDeals: + log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) + return ctx.Send(SectorInvalidDealIDs{Return: RetPreCommitting}) + case *ErrExpiredDeals: + return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) case *ErrPrecommitOnChain: return ctx.Send(SectorPreCommitLanded{TipSet: tok}) // we re-did precommit case *ErrSectorNumberAllocated: @@ -226,11 +233,18 @@ func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInf return ctx.Send(SectorChainPreCommitFailed{err}) } - if mw.Receipt.ExitCode != 0 { + switch mw.Receipt.ExitCode { + case exitcode.Ok: + // this is what we expect + case exitcode.SysErrOutOfGas: + // gas estimator guessed a wrong number + return ctx.Send(SectorRetryPreCommit{}) + default: log.Error("sector precommit failed: ", mw.Receipt.ExitCode) err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode) return ctx.Send(SectorChainPreCommitFailed{err}) } + log.Info("precommit message landed on chain: ", sector.SectorNumber) return ctx.Send(SectorPreCommitLanded{TipSet: mw.TipSetTok}) @@ -326,21 +340,25 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)}) } + return ctx.Send(SectorCommitted{ + Proof: proof, + }) +} + +func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo) error { tok, _, err := m.api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleCommitting: api error, not proceeding: %+v", err) return nil } - if err := m.checkCommit(ctx.Context(), sector, proof, tok); err != nil { + if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil { return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)}) } - // TODO: Consider splitting states and persist proof for faster recovery - params := &miner.ProveCommitSectorParams{ SectorNumber: sector.SectorNumber, - Proof: proof, + Proof: sector.Proof, } enc := new(bytes.Buffer) @@ -372,14 +390,13 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) collateral = big.Zero() } - // TODO: check seed / ticket are up to date + // TODO: check seed / ticket / deals are up to date mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, m.feeCfg.MaxCommitGasFee, enc.Bytes()) if err != nil { return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) } - return ctx.Send(SectorCommitted{ - Proof: proof, + return ctx.Send(SectorCommitSubmitted{ Message: mcid, }) } @@ -395,13 +412,22 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to wait for porep inclusion: %w", err)}) } - if mw.Receipt.ExitCode != 0 { + switch mw.Receipt.ExitCode { + case exitcode.Ok: + // this is what we expect + case 
exitcode.SysErrOutOfGas: + // gas estimator guessed a wrong number + return ctx.Send(SectorRetrySubmitCommit{}) + default: return ctx.Send(SectorCommitFailed{xerrors.Errorf("submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.TicketValue, sector.SeedValue, sector.SeedEpoch, sector.Proof)}) } - _, err = m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, mw.TipSetTok) + si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, mw.TipSetTok) if err != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("proof validation failed, sector not found in sector set after cron: %w", err)}) + return ctx.Send(SectorCommitFailed{xerrors.Errorf("proof validation failed, calling StateSectorGetInfo: %w", err)}) + } + if si == nil { + return ctx.Send(SectorCommitFailed{xerrors.Errorf("proof validation failed, sector not found in sector set after cron")}) } return ctx.Send(SectorProving{}) diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index 9d82ea2c2f6..99cce77149e 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -30,6 +30,7 @@ type Piece struct { // DealInfo is a tuple of deal identity and its schedule type DealInfo struct { + PublishCid *cid.Cid DealID abi.DealID DealSchedule DealSchedule KeepUnsealed bool @@ -53,6 +54,15 @@ type Log struct { Kind string } +type ReturnState string + +const ( + RetPreCommit1 = ReturnState(PreCommit1) + RetPreCommitting = ReturnState(PreCommitting) + RetPreCommitFailed = ReturnState(PreCommitFailed) + RetCommitFailed = ReturnState(CommitFailed) +) + type SectorInfo struct { State SectorState SectorNumber abi.SectorNumber @@ -90,6 +100,9 @@ type SectorInfo struct { // Faults FaultReportMsg *cid.Cid + // Recovery + Return ReturnState + // Debug LastErr string diff --git a/extern/storage-sealing/upgrade_queue.go b/extern/storage-sealing/upgrade_queue.go index 12a94f042e5..870f60dbb4b 100644 --- a/extern/storage-sealing/upgrade_queue.go +++ b/extern/storage-sealing/upgrade_queue.go @@ -72,6 +72,10 @@ func (m *Sealing) tryUpgradeSector(ctx context.Context, params *miner.SectorPreC log.Errorf("error calling StateSectorGetInfo for replaced sector: %+v", err) return big.Zero() } + if ri == nil { + log.Errorf("couldn't find sector info for sector to replace: %+v", replace) + return big.Zero() + } if params.Expiration < ri.Expiration { // TODO: Some limit on this diff --git a/go.mod b/go.mod index e42adc64fdc..69ebe73ed95 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v0.6.2 github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f - github.com/filecoin-project/go-fil-markets v0.5.7 + github.com/filecoin-project/go-fil-markets v0.5.8-0.20200827230805-30e8f6d42465 github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200822201400-474f4fdccc52 github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 diff --git a/go.sum b/go.sum index 8f7f4ca9df8..1e6c49c2708 100644 --- a/go.sum +++ b/go.sum @@ -247,8 +247,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1 github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-markets 
v0.5.6-0.20200814234959-80b1788108ac/go.mod h1:umicPCaN99ysHTiYOmwhuLxTFbOwcsI+mdw/t96vvM4= github.com/filecoin-project/go-fil-markets v0.5.6/go.mod h1:SJApXAKr5jyGpbzDEOhvemui0pih7hhT8r2MXJxCP1E= -github.com/filecoin-project/go-fil-markets v0.5.7 h1:kzyMHqez8ssxchj5s9M1hkC3CTwRGh2MeglJGfUksQU= -github.com/filecoin-project/go-fil-markets v0.5.7/go.mod h1:KnvFG3kSQ77vKYSY/QdrXET81wVCBByHXjG7AyxnbUw= +github.com/filecoin-project/go-fil-markets v0.5.8-0.20200827230805-30e8f6d42465 h1:74yonPhkVakfqUHcgfJ+vQOfCJQNiUBKn8XN9Z6F0S0= +github.com/filecoin-project/go-fil-markets v0.5.8-0.20200827230805-30e8f6d42465/go.mod h1:KnvFG3kSQ77vKYSY/QdrXET81wVCBByHXjG7AyxnbUw= github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200817153016-2ea5cbaf5ec0/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4= github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200822201400-474f4fdccc52 h1:FXtCp0ybqdQL9knb3OGDpkNTaBbPxgkqPeWKotUwkH0= github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200822201400-474f4fdccc52/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4= diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index eaa2a3ae2fb..1a66275292b 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -87,8 +87,13 @@ func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemark } func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize abi.UnpaddedPieceSize, pieceData io.Reader) (*storagemarket.PackingResult, error) { + if deal.PublishCid == nil { + return nil, xerrors.Errorf("deal.PublishCid can't be nil") + } + p, offset, err := n.secb.AddPiece(ctx, pieceSize, pieceData, sealing.DealInfo{ - DealID: deal.DealID, + DealID: deal.DealID, + PublishCid: deal.PublishCid, DealSchedule: sealing.DealSchedule{ StartEpoch: deal.ClientDealProposal.Proposal.StartEpoch, EndEpoch: deal.ClientDealProposal.Proposal.EndEpoch, @@ -351,7 +356,7 @@ func (n *ProviderNodeAdapter) GetChainHead(ctx context.Context) (shared.TipSetTo } func (n *ProviderNodeAdapter) WaitForMessage(ctx context.Context, mcid cid.Cid, cb func(code exitcode.ExitCode, bytes []byte, err error) error) error { - receipt, err := n.StateWaitMsg(ctx, mcid, build.MessageConfidence) + receipt, err := n.StateWaitMsg(ctx, mcid, 2*build.MessageConfidence) if err != nil { return cb(0, nil, err) } diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 6c1b98db1e0..3507453dd71 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -153,8 +153,10 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumb Value: info.SeedValue, Epoch: info.SeedEpoch, }, - Retries: info.InvalidProofs, - ToUpgrade: sm.Miner.IsMarkedForUpgrade(sid), + PreCommitMsg: info.PreCommitMessage, + CommitMsg: info.CommitMessage, + Retries: info.InvalidProofs, + ToUpgrade: sm.Miner.IsMarkedForUpgrade(sid), LastErr: info.LastErr, Log: log, @@ -175,6 +177,9 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumb onChainInfo, err := sm.Full.StateSectorGetInfo(ctx, sm.Miner.Address(), sid, types.EmptyTSK) if err != nil { + return sInfo, err + } + if onChainInfo == nil { return sInfo, nil } sInfo.SealProof = onChainInfo.SealProof diff --git a/storage/sectorblocks/blocks.go b/storage/sectorblocks/blocks.go index 5bfafe26396..b88ebcbae90 100644 --- a/storage/sectorblocks/blocks.go +++ b/storage/sectorblocks/blocks.go @@ -102,6 +102,7 @@ func (st *SectorBlocks) AddPiece(ctx context.Context, size 
abi.UnpaddedPieceSize return 0, 0, err } + // TODO: DealID has very low finality here err = st.writeRef(d.DealID, sn, offset, size) if err != nil { return 0, 0, xerrors.Errorf("writeRef: %w", err)
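The hunks above only show handleRecoverDealIDs emitting SectorUpdateDealIDs{Updates: updates}; the event that consumes those updates is not part of this excerpt. As a minimal, self-contained sketch (hypothetical, simplified types; not the actual fsm_events.go change), the recovered IDs would be folded back into the sector's piece metadata roughly like this:

package main

import "fmt"

// Simplified stand-ins for the sealing types referenced in the diff.
type DealID uint64

type DealInfo struct {
	DealID DealID
}

type Piece struct {
	DealInfo *DealInfo // nil for filler pieces
}

type SectorInfo struct {
	Pieces []Piece
}

// SectorUpdateDealIDs carries the piece-index -> fresh-deal-ID fixes that
// handleRecoverDealIDs collects from the republished deal messages.
type SectorUpdateDealIDs struct {
	Updates map[int]DealID
}

// apply overwrites only the stale deal IDs; pieces not listed in Updates
// are left untouched.
func (evt SectorUpdateDealIDs) apply(state *SectorInfo) {
	for i, id := range evt.Updates {
		state.Pieces[i].DealInfo.DealID = id
	}
}

func main() {
	sector := SectorInfo{Pieces: []Piece{
		{DealInfo: &DealInfo{DealID: 10}},
		{DealInfo: &DealInfo{DealID: 11}}, // assume this deal ID went stale
	}}

	SectorUpdateDealIDs{Updates: map[int]DealID{1: 42}}.apply(&sector)

	fmt.Println(sector.Pieces[0].DealInfo.DealID, sector.Pieces[1].DealInfo.DealID) // 10 42
}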