chore: curio: merge master (#11956)
* Fixing dead links (#11907)

* ci: create gh workflow that updates sorted pr checks (#11861)

* ci: create gh workflow that updates sorted pr checks

* ci: use grouped_by_result template for pr checks sticky comment

* chore: apply pr review suggestion

* Avoid cfg lookup on chain remove since delete on an unenabled splitstore is a no-op anyway (#11916)

Co-authored-by: zenground0 <[email protected]>

* Fix mismatched method names in comments (#11913)

Signed-off-by: forcedebug <[email protected]>

* release: v1.26.3 (#11908) (#11915) (#11922)

* deps: update dependencies to address migration memory bloat

to address memory concerns during a heavy migration

Ref: filecoin-project/go-state-types#260
Ref: whyrusleeping/cbor-gen#96
Ref: filecoin-project/go-amt-ipld#90

* release: prep v1.26.3 patch

Prep v1.26.3 patch release:
- Update changelog, version and make gen + make docsgen-cli

* deps: update cbor-gen to tagged version

* deps: update go-state-types to tagged version

deps: update go-state-types to tagged version v0.13.2

* chore: deps: update go-state-types to v0.13.3

Fixes a panic when we have fewer than 1k proposals.

---------

Co-authored-by: Phi-rjan <[email protected]>
Co-authored-by: Rod Vagg <[email protected]>
Co-authored-by: Steven Allen <[email protected]>

* Refactor `LookupID*` APIs in `StateManager` and `StateTree`

The naming of `LookupID` can cause confusion when resolving actor IDs vs
ID addresses. To avoid this:

* Rename `StateTree.LookupID` to `LookupIDAddress`, because it returns an
ID address.
* Rename `StateManager.LookupID` to `LookupIDAddress`, because it also
returns an ID address via a chained call to `StateTree`.
* Introduce a new `StateManager` API dedicated to resolving an address to
an actor ID, called `LookupID`, which returns `abi.ActorID` (see the
sketch after this list).

For context, see:
* filecoin-project/lotus#11723 (comment)
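
As a rough sketch of the resulting surface (a hypothetical interface for illustration only — in Lotus the methods live on StateTree and StateManager, and the exact signatures may differ):

package main

import (
	"context"
	"fmt"

	"github.com/filecoin-project/go-address"
	"github.com/filecoin-project/go-state-types/abi"
)

// idLookup captures the renamed lookup surface described above.
type idLookup interface {
	// LookupIDAddress resolves any address to its ID address (an f0/t0 address).
	LookupIDAddress(ctx context.Context, addr address.Address) (address.Address, error)
	// LookupID resolves an address directly to the numeric actor ID.
	LookupID(ctx context.Context, addr address.Address) (abi.ActorID, error)
}

// describe shows the intended difference between the two lookups.
func describe(ctx context.Context, l idLookup, addr address.Address) {
	idAddr, err := l.LookupIDAddress(ctx, addr)
	fmt.Println("ID address:", idAddr, err) // still an address.Address
	actorID, err := l.LookupID(ctx, addr)
	fmt.Println("actor ID:", actorID, err) // an abi.ActorID (a uint64)
}

func main() {}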

* Add v13 support to invariants-checker (#11931)

* chore: docs: nv-skeleton documentation (#11065)

* nv-skeleton documentation

Add a tutorial for how one can create a nv-skeleton in Lotus

* Add footnote for `Add migration` step

* Indent migration-code

Indent migration-code to make it show properly as a footnote.

* Add ref-fvm and filecoin-ffi checklist

* Add Filecoin-FFI steps

* Add step to params_butterfly.go

* Fix typo

* Add links to reference PRs

* Update ref-fvm list

* feat: curio: add StorageInit api (#11918)

* feat: add StorageInit api

* remove unused variables

* fix gen check

* feat: curio: simpler reservation release logic (#11900)

* simpler release logic

* oops, plus simpler

* simpler

* fix NewLine (#11893)

* fix(events): check for sync-in-progress (#11932)

* feat(events): adjust indexes in event index db to match query patterns

Introduces a v4 migration that just adjusts indexes.

Copies some improvements from filecoin-project/lotus#11723

Closes: filecoin-project/lotus#11909
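
As a sketch of what an "indexes only" schema migration like this looks like (the table, column, and index names here are hypothetical stand-ins — the real schema and version tracking live in Lotus's event index code):

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/mattn/go-sqlite3" // sketch assumes this SQLite driver
)

// migrateToV4 drops an index that doesn't match the query patterns and
// creates ones that do, then records the new schema version.
func migrateToV4(db *sql.DB) error {
	stmts := []string{
		`DROP INDEX IF EXISTS event_height_tipset_key`,
		`CREATE INDEX IF NOT EXISTS event_height ON event (height)`,
		`CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid)`,
		`PRAGMA user_version = 4`, // the real migration tracks versions its own way
	}
	for _, s := range stmts {
		if _, err := db.Exec(s); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	db, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		panic(err)
	}
	defer db.Close()
	// A stand-in for the real events table.
	if _, err := db.Exec(`CREATE TABLE event (height INTEGER, tipset_key_cid BLOB)`); err != nil {
		panic(err)
	}
	fmt.Println("migrate:", migrateToV4(db))
}

The v5 migration referenced later in this merge ("Event index should be unique for tipsets") follows the same pattern, recreating the relevant index as a UNIQUE index.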

* fix(pipeline): return if an error occurs when getting the network version (#11902)

* fix(events): correct log msg for v4 events index db migration

* chore: remove duplicate words in strings and comments

* fix(events): register events index db migration v4

* fix: curio seal: Failed commit retry strategy (#11870)

* ffi: improved-error-handling

* curio seal: Failed commit retry strategy

* use master ffi

* mod tidy

* fix: curio: Update pgx imports, fix db_storage alloc

* feat: curioweb: Improve task_history indexes (#11911)

* mod tidy

* Event index should be unique for tipsets (#11952)

* event index should be unique for tipsets

* fix formatting

* migrate to version 5

* chore: bump build version in master (#11946)

* Bump version

Bump version in master branch in preparation for cutting v1.27.0-rc1

* chore: bump build-version

* feat: curioweb: Show piece info on the sector page (#11955)

* curio: feat: break trees task into TreeD(prefetch) and TreeRC (#11895)

* break trees task

* fix TreeD reservation

* fix nil pointer err

* apply suggestions

* fix allocate file types

* fix dbIndex inserts

* set resource, move release func

* refactor func(), update memory

* remove extra release

---------

Signed-off-by: forcedebug <[email protected]>
Co-authored-by: parthshah1 <[email protected]>
Co-authored-by: Piotr Galar <[email protected]>
Co-authored-by: ZenGround0 <[email protected]>
Co-authored-by: zenground0 <[email protected]>
Co-authored-by: forcedebug <[email protected]>
Co-authored-by: Jiaying Wang <[email protected]>
Co-authored-by: Phi-rjan <[email protected]>
Co-authored-by: Rod Vagg <[email protected]>
Co-authored-by: Steven Allen <[email protected]>
Co-authored-by: Masih H. Derkani <[email protected]>
Co-authored-by: Lee <[email protected]>
Co-authored-by: Andrew Jackson (Ajax) <[email protected]>
Co-authored-by: beck <[email protected]>
Co-authored-by: 0x5459 <[email protected]>
Co-authored-by: Łukasz Magiera <[email protected]>
Co-authored-by: Łukasz Magiera <[email protected]>
Co-authored-by: Aarsh Shah <[email protected]>
18 people authored May 2, 2024
1 parent dcf3577 commit 3f0fb74
Showing 19 changed files with 650 additions and 180 deletions.
38 changes: 38 additions & 0 deletions cmd/curio/rpc/rpc.go
@@ -9,9 +9,11 @@ import (
"net/http"
"net/url"
"os"
"path/filepath"
"time"

"github.com/gbrlsnchs/jwt/v3"
"github.com/google/uuid"
"github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2"
"github.com/mitchellh/go-homedir"
@@ -39,6 +41,8 @@ import (
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

const metaFile = "sectorstore.json"

var log = logging.Logger("curio/rpc")
var permissioned = os.Getenv("LOTUS_DISABLE_AUTH_PERMISSIONED") != "1"

@@ -162,6 +166,40 @@ func (p *CurioAPI) Shutdown(context.Context) error {
return nil
}

func (p *CurioAPI) StorageInit(ctx context.Context, path string, opts storiface.LocalStorageMeta) error {
	path, err := homedir.Expand(path)
	if err != nil {
		return xerrors.Errorf("expanding local path: %w", err)
	}

	if err := os.MkdirAll(path, 0755); err != nil {
		if !os.IsExist(err) {
			return err
		}
	}
	_, err = os.Stat(filepath.Join(path, metaFile))
	if !os.IsNotExist(err) {
		if err == nil {
			return xerrors.Errorf("path is already initialized")
		}
		return err
	}
	if opts.ID == "" {
		opts.ID = storiface.ID(uuid.New().String())
	}
	if !(opts.CanStore || opts.CanSeal) {
		return xerrors.Errorf("must specify at least one of --store or --seal")
	}
	b, err := json.MarshalIndent(opts, "", " ")
	if err != nil {
		return xerrors.Errorf("marshaling storage config: %w", err)
	}
	if err := os.WriteFile(filepath.Join(path, metaFile), b, 0644); err != nil {
		return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(path, metaFile), err)
	}
	return nil
}

func (p *CurioAPI) StorageAddLocal(ctx context.Context, path string) error {
path, err := homedir.Expand(path)
if err != nil {
…
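
The logic above — create the directory, refuse to double-initialize, then persist sectorstore.json — previously lived in the CLI (see cmd/curio/storage.go below). As a minimal standalone sketch of the same pattern, with LocalStorageMeta standing in for the storiface type:

package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

const metaFile = "sectorstore.json"

// LocalStorageMeta is a stand-in for storiface.LocalStorageMeta.
type LocalStorageMeta struct {
	ID       string `json:"ID"`
	CanSeal  bool   `json:"CanSeal"`
	CanStore bool   `json:"CanStore"`
}

// storageInit mirrors the checks in CurioAPI.StorageInit above.
func storageInit(path string, meta LocalStorageMeta) error {
	if err := os.MkdirAll(path, 0755); err != nil && !os.IsExist(err) {
		return err
	}
	// Refuse to initialize a path that already holds metadata.
	if _, err := os.Stat(filepath.Join(path, metaFile)); err == nil {
		return errors.New("path is already initialized")
	} else if !os.IsNotExist(err) {
		return err
	}
	if !(meta.CanSeal || meta.CanStore) {
		return errors.New("must enable at least one of seal or store")
	}
	b, err := json.MarshalIndent(meta, "", "  ")
	if err != nil {
		return err
	}
	return os.WriteFile(filepath.Join(path, metaFile), b, 0644)
}

func main() {
	err := storageInit("/tmp/curio-storage", LocalStorageMeta{ID: "example-id", CanStore: true})
	fmt.Println("init:", err)
}
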
30 changes: 3 additions & 27 deletions cmd/curio/storage.go
@@ -1,11 +1,8 @@
package main

import (
"encoding/json"
"fmt"
"math/bits"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
@@ -28,8 +25,6 @@ import (
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

const metaFile = "sectorstore.json"

var storageCmd = &cli.Command{
Name: "storage",
Usage: "manage sector storage",
@@ -122,20 +117,6 @@ over time
}

if cctx.Bool("init") {
if err := os.MkdirAll(p, 0755); err != nil {
if !os.IsExist(err) {
return err
}
}

_, err := os.Stat(filepath.Join(p, metaFile))
if !os.IsNotExist(err) {
if err == nil {
return xerrors.Errorf("path is already initialized")
}
return err
}

var maxStor int64
if cctx.IsSet("max-storage") {
maxStor, err = units.RAMInBytes(cctx.String("max-storage"))
@@ -144,7 +125,7 @@ over time
}
}

cfg := &storiface.LocalStorageMeta{
cfg := storiface.LocalStorageMeta{
ID: storiface.ID(uuid.New().String()),
Weight: cctx.Uint64("weight"),
CanSeal: cctx.Bool("seal"),
@@ -158,13 +139,8 @@ over time
return xerrors.Errorf("must specify at least one of --store or --seal")
}

b, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
return xerrors.Errorf("marshaling storage config: %w", err)
}

if err := os.WriteFile(filepath.Join(p, metaFile), b, 0644); err != nil {
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(p, metaFile), err)
if err := minerApi.StorageInit(ctx, p, cfg); err != nil {
return xerrors.Errorf("init storage: %w", err)
}
}

…
5 changes: 3 additions & 2 deletions cmd/curio/tasks/tasks.go
@@ -114,9 +114,10 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
activeTasks = append(activeTasks, sdrTask)
}
if cfg.Subsystems.EnableSealSDRTrees {
treesTask := seal.NewTreesTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks)
treeDTask := seal.NewTreeDTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks)
treeRCTask := seal.NewTreeRCTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks)
finalizeTask := seal.NewFinalizeTask(cfg.Subsystems.FinalizeMaxTasks, sp, slr, db)
activeTasks = append(activeTasks, treesTask, finalizeTask)
activeTasks = append(activeTasks, treeDTask, treeRCTask, finalizeTask)
}
if cfg.Subsystems.EnableSendPrecommitMsg {
precommitTask := seal.NewSubmitPrecommitTask(sp, db, full, sender, as, cfg.Fees.MaxPreCommitGasFee)
…
146 changes: 89 additions & 57 deletions curiosrc/ffi/sdr_funcs.go
@@ -61,7 +61,7 @@ type storageProvider struct {
}

func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask.TaskID, sector storiface.SectorRef, existing, allocate storiface.SectorFileType, sealing storiface.PathType) (fspaths, ids storiface.SectorPaths, release func(), err error) {
var paths, storageIDs storiface.SectorPaths
var sectorPaths, storageIDs storiface.SectorPaths
var releaseStorage func()

var ok bool
@@ -77,7 +77,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask

log.Debugw("using existing storage reservation", "task", taskID, "sector", sector, "existing", existing, "allocate", allocate)

paths = resv.Paths
sectorPaths = resv.Paths
storageIDs = resv.PathIDs
releaseStorage = resv.Release

@@ -87,7 +87,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
// present locally. Note that we do not care about 'allocate' requests, those files don't exist, and are just
// proposed paths with a reservation of space.

_, checkPathIDs, err := l.storage.AcquireSector(ctx, sector, existing, storiface.FTNone, sealing, storiface.AcquireMove, storiface.AcquireInto(storiface.PathsWithIDs{Paths: paths, IDs: storageIDs}))
_, checkPathIDs, err := l.storage.AcquireSector(ctx, sector, existing, storiface.FTNone, sealing, storiface.AcquireMove, storiface.AcquireInto(storiface.PathsWithIDs{Paths: sectorPaths, IDs: storageIDs}))
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("acquire reserved existing files: %w", err)
}
@@ -101,20 +101,20 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
// No related reservation, acquire storage as usual

var err error
paths, storageIDs, err = l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove)
sectorPaths, storageIDs, err = l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, err
}

releaseStorage, err = l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal)
releaseStorage, err = l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal, paths.MinFreeStoragePercentage)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
}
}

log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)
log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, sectorPaths)

return paths, storageIDs, func() {
return sectorPaths, storageIDs, func() {
releaseStorage()

for _, fileType := range storiface.PathTypes {
@@ -194,80 +194,69 @@ func (sb *SealCalls) ensureOneCopy(ctx context.Context, sid abi.SectorID, pathID
return nil
}

func (sb *SealCalls) TreeDRC(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, unsealed cid.Cid, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool) (scid cid.Cid, ucid cid.Cid, err error) {
func (sb *SealCalls) TreeRC(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, unsealed cid.Cid) (scid cid.Cid, ucid cid.Cid, err error) {
p1o, err := sb.makePhase1Out(unsealed, sector.ProofType)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err)
}

paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
fspaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
}
defer releaseSector()

defer func() {
if err != nil {
clerr := removeDRCTrees(paths.Cache)
clerr := removeDRCTrees(fspaths.Cache, false)
if clerr != nil {
log.Errorw("removing tree files after TreeDRC error", "error", clerr, "exec-error", err, "sector", sector, "cache", paths.Cache)
log.Errorw("removing tree files after TreeDRC error", "error", clerr, "exec-error", err, "sector", sector, "cache", fspaths.Cache)
}
}
}()

treeDUnsealed, err := proof.BuildTreeD(data, unpaddedData, filepath.Join(paths.Cache, proofpaths.TreeDName), size)
// create sector-sized file at paths.Sealed; PC2 transforms it into a sealed sector in-place
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("building tree-d: %w", err)
}

if treeDUnsealed != unsealed {
return cid.Undef, cid.Undef, xerrors.Errorf("tree-d cid mismatch with supplied unsealed cid")
return cid.Undef, cid.Undef, xerrors.Errorf("getting sector size: %w", err)
}

{
// create sector-sized file at paths.Sealed; PC2 transforms it into a sealed sector in-place
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("getting sector size: %w", err)
}

{
// copy TreeD prefix to sealed sector, SealPreCommitPhase2 will mutate it in place into the sealed sector
// copy TreeD prefix to sealed sector, SealPreCommitPhase2 will mutate it in place into the sealed sector

// first try reflink + truncate, that should be way faster
err := reflink.Always(filepath.Join(paths.Cache, proofpaths.TreeDName), paths.Sealed)
if err == nil {
err = os.Truncate(paths.Sealed, int64(ssize))
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("truncating reflinked sealed file: %w", err)
}
} else {
log.Errorw("reflink treed -> sealed failed, falling back to slow copy, use single scratch btrfs or xfs filesystem", "error", err, "sector", sector, "cache", paths.Cache, "sealed", paths.Sealed)
// first try reflink + truncate, that should be way faster
err := reflink.Always(filepath.Join(fspaths.Cache, proofpaths.TreeDName), fspaths.Sealed)
if err == nil {
err = os.Truncate(fspaths.Sealed, int64(ssize))
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("truncating reflinked sealed file: %w", err)
}
} else {
log.Errorw("reflink treed -> sealed failed, falling back to slow copy, use single scratch btrfs or xfs filesystem", "error", err, "sector", sector, "cache", fspaths.Cache, "sealed", fspaths.Sealed)

// fallback to slow copy, copy ssize bytes from treed to sealed
dst, err := os.OpenFile(paths.Sealed, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("opening sealed sector file: %w", err)
}
src, err := os.Open(filepath.Join(paths.Cache, proofpaths.TreeDName))
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("opening treed sector file: %w", err)
}
// fallback to slow copy, copy ssize bytes from treed to sealed
dst, err := os.OpenFile(fspaths.Sealed, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("opening sealed sector file: %w", err)
}
src, err := os.Open(filepath.Join(fspaths.Cache, proofpaths.TreeDName))
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("opening treed sector file: %w", err)
}

_, err = io.CopyN(dst, src, int64(ssize))
derr := dst.Close()
_ = src.Close()
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("copying treed -> sealed: %w", err)
}
if derr != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("closing sealed file: %w", derr)
}
_, err = io.CopyN(dst, src, int64(ssize))
derr := dst.Close()
_ = src.Close()
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("copying treed -> sealed: %w", err)
}
if derr != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("closing sealed file: %w", derr)
}
}
}

sl, uns, err := ffi.SealPreCommitPhase2(p1o, paths.Cache, paths.Sealed)
sl, uns, err := ffi.SealPreCommitPhase2(p1o, fspaths.Cache, fspaths.Sealed)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("computing seal proof: %w", err)
}
@@ -283,22 +272,28 @@ func (sb *SealCalls) TreeDRC(ctx context.Context, task *harmonytask.TaskID, sect
return sl, uns, nil
}

func removeDRCTrees(cache string) error {
// list files in cache
func removeDRCTrees(cache string, isDTree bool) error {
files, err := os.ReadDir(cache)
if err != nil {
return xerrors.Errorf("listing cache: %w", err)
}

var testFunc func(string) bool

if isDTree {
testFunc = proofpaths.IsTreeDFile
} else {
testFunc = proofpaths.IsTreeRCFile
}

for _, file := range files {
if proofpaths.IsTreeFile(file.Name()) {
if testFunc(file.Name()) {
err := os.Remove(filepath.Join(cache, file.Name()))
if err != nil {
return xerrors.Errorf("removing tree file: %w", err)
}
}
}

return nil
}

@@ -625,3 +620,40 @@ func (sb *SealCalls) sectorStorageType(ctx context.Context, sector storiface.Sec

return true, storiface.PathStorage, nil
}

// PreFetch fetches the sector file to local storage before SDR and TreeRC Tasks
func (sb *SealCalls) PreFetch(ctx context.Context, sector storiface.SectorRef, task *harmonytask.TaskID) (fsPath, pathID storiface.SectorPaths, releaseSector func(), err error) {
	fsPath, pathID, releaseSector, err = sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
	if err != nil {
		return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("acquiring sector paths: %w", err)
	}
	// Don't release the storage locks. They will be released in TreeD func()
	return
}

func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, unsealed cid.Cid, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool, fspaths, pathIDs storiface.SectorPaths) error {
	var err error
	defer func() {
		if err != nil {
			clerr := removeDRCTrees(fspaths.Cache, true)
			if clerr != nil {
				log.Errorw("removing tree files after TreeDRC error", "error", clerr, "exec-error", err, "sector", sector, "cache", fspaths.Cache)
			}
		}
	}()

	treeDUnsealed, err := proof.BuildTreeD(data, unpaddedData, filepath.Join(fspaths.Cache, proofpaths.TreeDName), size)
	if err != nil {
		return xerrors.Errorf("building tree-d: %w", err)
	}

	if treeDUnsealed != unsealed {
		return xerrors.Errorf("tree-d cid mismatch with supplied unsealed cid")
	}

	if err := sb.ensureOneCopy(ctx, sector.ID, pathIDs, storiface.FTCache); err != nil {
		return xerrors.Errorf("ensure one copy: %w", err)
	}

	return nil
}
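
One detail of the TreeRC change above worth spelling out: the tree-d file is turned into the sealed-sector file by first attempting a reflink (an O(1) copy-on-write clone on btrfs/XFS) plus a truncate to sector size, and only falling back to a byte-for-byte copy when the filesystem can't reflink. A standalone sketch of that pattern (tryReflink is a stub standing in for the reflink library used above):

package main

import (
	"fmt"
	"io"
	"os"
)

// tryReflink stands in for reflink.Always; here it always fails so the
// fallback path is exercised.
func tryReflink(src, dst string) error {
	return fmt.Errorf("reflink not supported in this sketch")
}

// cloneAndTruncate produces dst from src at exactly size bytes, preferring
// a copy-on-write reflink and falling back to a plain copy.
func cloneAndTruncate(src, dst string, size int64) error {
	if err := tryReflink(src, dst); err == nil {
		// Reflink shares extents instantly; truncate to the target size.
		return os.Truncate(dst, size)
	}
	// Slow path: copy exactly size bytes.
	out, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE, 0644)
	if err != nil {
		return err
	}
	in, err := os.Open(src)
	if err != nil {
		out.Close()
		return err
	}
	defer in.Close()
	_, cerr := io.CopyN(out, in, size)
	if derr := out.Close(); cerr == nil {
		cerr = derr // surface close errors, as the real code does
	}
	return cerr
}

func main() {
	fmt.Println("clone:", cloneAndTruncate("/tmp/tree-d.dat", "/tmp/sealed.dat", 1<<20))
}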
