diff --git a/api/api_full.go b/api/api_full.go index 956231eb90f..6a56f04737e 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -884,7 +884,7 @@ func (o *QueryOffer) Order(client address.Address) RetrievalOrder { Client: client, Miner: o.Miner, - MinerPeer: o.MinerPeer, + MinerPeer: &o.MinerPeer, } } @@ -903,6 +903,8 @@ type RetrievalOrder struct { Root cid.Cid Piece *cid.Cid Size uint64 + + LocalStore *multistore.StoreID // if specified, get data from local store // TODO: support offset Total types.BigInt UnsealPrice types.BigInt @@ -910,7 +912,7 @@ type RetrievalOrder struct { PaymentIntervalIncrease uint64 Client address.Address Miner address.Address - MinerPeer retrievalmarket.RetrievalPeer + MinerPeer *retrievalmarket.RetrievalPeer } type InvocResult struct { diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index f1b8a36b758..297a6411e87 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/cli/client.go b/cli/client.go index 14f4fb552dc..347a09b5a43 100644 --- a/cli/client.go +++ b/cli/client.go @@ -1001,7 +1001,7 @@ var clientFindCmd = &cli.Command{ }, } -const DefaultMaxRetrievePrice = 1 +const DefaultMaxRetrievePrice = "0.01" var clientRetrieveCmd = &cli.Command{ Name: "retrieve", @@ -1022,12 +1022,15 @@ var clientRetrieveCmd = &cli.Command{ }, &cli.StringFlag{ Name: "maxPrice", - Usage: fmt.Sprintf("maximum price the client is willing to consider (default: %d FIL)", DefaultMaxRetrievePrice), + Usage: fmt.Sprintf("maximum price the client is willing to consider (default: %s FIL)", DefaultMaxRetrievePrice), }, &cli.StringFlag{ Name: "pieceCid", Usage: "require data to be retrieved from a specific Piece CID", }, + &cli.BoolFlag{ + Name: "allow-local", + }, }, Action: func(cctx *cli.Context) error { if cctx.NArg() != 2 { @@ -1057,18 +1060,6 @@ var clientRetrieveCmd = &cli.Command{ return err } - // Check if we already have this data locally - - /*has, err := api.ClientHasLocal(ctx, file) - if err != nil { - return err - } - - if has { - fmt.Println("Success: Already in local storage") - return nil - }*/ // TODO: fix - var pieceCid *cid.Cid if cctx.String("pieceCid") != "" { parsed, err := cid.Parse(cctx.String("pieceCid")) @@ -1078,69 +1069,93 @@ var clientRetrieveCmd = &cli.Command{ pieceCid = &parsed } - var offer api.QueryOffer - minerStrAddr := cctx.String("miner") - if minerStrAddr == "" { // Local discovery - offers, err := fapi.ClientFindData(ctx, file, pieceCid) + var order *lapi.RetrievalOrder + if cctx.Bool("allow-local") { + imports, err := fapi.ClientListImports(ctx) + if err != nil { + return err + } - var cleaned []api.QueryOffer - // filter out offers that errored - for _, o := range offers { - if o.Err == "" { - cleaned = append(cleaned, o) + for _, i := range imports { + if i.Root != nil && i.Root.Equals(file) { + order = &lapi.RetrievalOrder{ + Root: file, + LocalStore: &i.Key, + + Total: big.Zero(), + UnsealPrice: big.Zero(), + } + break } } + } - offers = cleaned + if order == nil { + var offer api.QueryOffer + minerStrAddr := cctx.String("miner") + if minerStrAddr == "" { // Local discovery + offers, err := fapi.ClientFindData(ctx, file, pieceCid) - // sort by price low to high - sort.Slice(offers, func(i, j int) bool { - return offers[i].MinPrice.LessThan(offers[j].MinPrice) - }) - if err != nil { - return err - } + var cleaned []api.QueryOffer + // filter out offers that errored + for _, o := range offers { + if o.Err == "" { + cleaned = append(cleaned, o) + } + } - // TODO: parse offer strings from `client find`, make this smarter - if len(offers) < 1 { - fmt.Println("Failed to find file") - return nil - } - offer = offers[0] - } else { // Directed retrieval - minerAddr, err := address.NewFromString(minerStrAddr) - if err != nil { - return err + offers = cleaned + + // sort by price low to high + sort.Slice(offers, func(i, j int) bool { + return offers[i].MinPrice.LessThan(offers[j].MinPrice) + }) + if err != nil { + return err + } + + // TODO: parse offer strings from `client find`, make this smarter + if len(offers) < 1 { + fmt.Println("Failed to find file") + return nil + } + offer = offers[0] + } else { // Directed retrieval + minerAddr, err := address.NewFromString(minerStrAddr) + if err != nil { + return err + } + offer, err = fapi.ClientMinerQueryOffer(ctx, minerAddr, file, pieceCid) + if err != nil { + return err + } } - offer, err = fapi.ClientMinerQueryOffer(ctx, minerAddr, file, pieceCid) - if err != nil { - return err + if offer.Err != "" { + return fmt.Errorf("The received offer errored: %s", offer.Err) } - } - if offer.Err != "" { - return fmt.Errorf("The received offer errored: %s", offer.Err) - } - maxPrice := types.FromFil(DefaultMaxRetrievePrice) + maxPrice := types.MustParseFIL(DefaultMaxRetrievePrice) - if cctx.String("maxPrice") != "" { - maxPriceFil, err := types.ParseFIL(cctx.String("maxPrice")) - if err != nil { - return xerrors.Errorf("parsing maxPrice: %w", err) + if cctx.String("maxPrice") != "" { + maxPrice, err = types.ParseFIL(cctx.String("maxPrice")) + if err != nil { + return xerrors.Errorf("parsing maxPrice: %w", err) + } } - maxPrice = types.BigInt(maxPriceFil) - } + if offer.MinPrice.GreaterThan(big.Int(maxPrice)) { + return xerrors.Errorf("failed to find offer satisfying maxPrice: %s", maxPrice) + } - if offer.MinPrice.GreaterThan(maxPrice) { - return xerrors.Errorf("failed to find offer satisfying maxPrice: %s", maxPrice) + o := offer.Order(payer) + order = &o } - ref := &lapi.FileRef{ Path: cctx.Args().Get(1), IsCAR: cctx.Bool("car"), } - updates, err := fapi.ClientRetrieveWithEvents(ctx, offer.Order(payer), ref) + + updates, err := fapi.ClientRetrieveWithEvents(ctx, *order, ref) if err != nil { return xerrors.Errorf("error setting up retrieval: %w", err) } diff --git a/documentation/en/api-methods.md b/documentation/en/api-methods.md index 8367ef2df31..2830be0bdb0 100644 --- a/documentation/en/api-methods.md +++ b/documentation/en/api-methods.md @@ -1383,6 +1383,7 @@ Inputs: }, "Piece": null, "Size": 42, + "LocalStore": 12, "Total": "0", "UnsealPrice": "0", "PaymentInterval": 42, @@ -1436,6 +1437,7 @@ Inputs: }, "Piece": null, "Size": 42, + "LocalStore": 12, "Total": "0", "UnsealPrice": "0", "PaymentInterval": 42, diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 387057e46ca..cdef4d02b3b 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -57,6 +57,7 @@ import ( "github.com/filecoin-project/lotus/node/impl/paych" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo/importmgr" + "github.com/filecoin-project/lotus/node/repo/retrievalstoremgr" ) var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31) @@ -77,6 +78,7 @@ type API struct { Chain *store.ChainStore Imports dtypes.ClientImportMgr + Mds dtypes.ClientMultiDstore CombinedBstore dtypes.ClientBlockstore // TODO: try to remove RetrievalStoreMgr dtypes.ClientRetrievalStoreManager @@ -582,86 +584,102 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref } } - if order.MinerPeer.ID == "" { - mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK) - if err != nil { - finish(err) - return - } + var store retrievalstoremgr.RetrievalStore - order.MinerPeer = retrievalmarket.RetrievalPeer{ - ID: *mi.PeerId, - Address: order.Miner, - } - } + if order.LocalStore == nil { + if order.MinerPeer == nil || order.MinerPeer.ID == "" { + mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK) + if err != nil { + finish(err) + return + } - if order.Size == 0 { - finish(xerrors.Errorf("cannot make retrieval deal for zero bytes")) - return - } + order.MinerPeer = &retrievalmarket.RetrievalPeer{ + ID: *mi.PeerId, + Address: order.Miner, + } + } - /*id, st, err := a.imgr().NewStore() - if err != nil { - return err - } - if err := a.imgr().AddLabel(id, "source", "retrieval"); err != nil { - return err - }*/ + if order.Size == 0 { + finish(xerrors.Errorf("cannot make retrieval deal for zero bytes")) + return + } - ppb := types.BigDiv(order.Total, types.NewInt(order.Size)) + /*id, st, err := a.imgr().NewStore() + if err != nil { + return err + } + if err := a.imgr().AddLabel(id, "source", "retrieval"); err != nil { + return err + }*/ - params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice) - if err != nil { - finish(xerrors.Errorf("Error in retrieval params: %s", err)) - return - } + ppb := types.BigDiv(order.Total, types.NewInt(order.Size)) - store, err := a.RetrievalStoreMgr.NewStore() - if err != nil { - finish(xerrors.Errorf("Error setting up new store: %w", err)) - return - } + params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice) + if err != nil { + finish(xerrors.Errorf("Error in retrieval params: %s", err)) + return + } - defer func() { - _ = a.RetrievalStoreMgr.ReleaseStore(store) - }() + store, err = a.RetrievalStoreMgr.NewStore() + if err != nil { + finish(xerrors.Errorf("Error setting up new store: %w", err)) + return + } - // Subscribe to events before retrieving to avoid losing events. - subscribeEvents := make(chan retrievalSubscribeEvent, 1) - subscribeCtx, cancel := context.WithCancel(ctx) - defer cancel() - unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) { - // We'll check the deal IDs inside readSubscribeEvents. - if state.PayloadCID.Equals(order.Root) { - select { - case <-subscribeCtx.Done(): - case subscribeEvents <- retrievalSubscribeEvent{event, state}: + defer func() { + _ = a.RetrievalStoreMgr.ReleaseStore(store) + }() + + // Subscribe to events before retrieving to avoid losing events. + subscribeEvents := make(chan retrievalSubscribeEvent, 1) + subscribeCtx, cancel := context.WithCancel(ctx) + defer cancel() + unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) { + // We'll check the deal IDs inside readSubscribeEvents. + if state.PayloadCID.Equals(order.Root) { + select { + case <-subscribeCtx.Done(): + case subscribeEvents <- retrievalSubscribeEvent{event, state}: + } } + }) + + dealID, err := a.Retrieval.Retrieve( + ctx, + order.Root, + params, + order.Total, + *order.MinerPeer, + order.Client, + order.Miner, + store.StoreID()) + + if err != nil { + unsubscribe() + finish(xerrors.Errorf("Retrieve failed: %w", err)) + return } - }) - dealID, err := a.Retrieval.Retrieve( - ctx, - order.Root, - params, - order.Total, - order.MinerPeer, - order.Client, - order.Miner, - store.StoreID()) + err = readSubscribeEvents(ctx, dealID, subscribeEvents, events) - if err != nil { unsubscribe() - finish(xerrors.Errorf("Retrieve failed: %w", err)) - return - } - - err = readSubscribeEvents(ctx, dealID, subscribeEvents, events) + if err != nil { + finish(xerrors.Errorf("Retrieve: %w", err)) + return + } + } else { + // local retrieval + st, err := ((*multistore.MultiStore)(a.Mds)).Get(*order.LocalStore) + if err != nil { + finish(xerrors.Errorf("Retrieve: %w", err)) + return + } - unsubscribe() - if err != nil { - finish(xerrors.Errorf("Retrieve: %w", err)) - return + store = &multiStoreRetrievalStore{ + storeID: *order.LocalStore, + store: st, + } } // If ref is nil, it only fetches the data into the configured blockstore. @@ -701,6 +719,19 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return } +type multiStoreRetrievalStore struct { + storeID multistore.StoreID + store *multistore.Store +} + +func (mrs *multiStoreRetrievalStore) StoreID() *multistore.StoreID { + return &mrs.storeID +} + +func (mrs *multiStoreRetrievalStore) DAGService() ipld.DAGService { + return mrs.store.DAG +} + func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) { mi, err := a.StateMinerInfo(ctx, miner, types.EmptyTSK) if err != nil {