Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storageadapter: Look at precommits on-chain since deal publish msg #5398

Merged
merged 3 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions markets/loggers/loggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@ var log = logging.Logger("markets")

// StorageClientLogger logs events from the storage client
func StorageClientLogger(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
log.Infow("storage event", "name", storagemarket.ClientEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message)
log.Infow("storage client event", "name", storagemarket.ClientEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message)
}

// StorageProviderLogger logs events from the storage provider
func StorageProviderLogger(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
log.Infow("storage event", "name", storagemarket.ProviderEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message)
log.Infow("storage provider event", "name", storagemarket.ProviderEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message)
}

// RetrievalClientLogger logs events from the retrieval client
func RetrievalClientLogger(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) {
log.Infow("retrieval event", "name", retrievalmarket.ClientEvents[event], "deal ID", deal.ID, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message)
log.Infow("retrieval client event", "name", retrievalmarket.ClientEvents[event], "deal ID", deal.ID, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message)
}

// RetrievalProviderLogger logs events from the retrieval provider
func RetrievalProviderLogger(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) {
log.Infow("retrieval event", "name", retrievalmarket.ProviderEvents[event], "deal ID", deal.ID, "receiver", deal.Receiver, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message)
log.Infow("retrieval provider event", "name", retrievalmarket.ProviderEvents[event], "deal ID", deal.ID, "receiver", deal.Receiver, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message)
}

// DataTransferLogger logs events from the data transfer module
Expand Down
44 changes: 37 additions & 7 deletions markets/storageadapter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-fil-markets/shared"
Expand All @@ -22,8 +23,11 @@ import (
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apibstore"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/adt"
marketactor "github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/lotus/chain/market"
Expand All @@ -34,9 +38,7 @@ import (
)

type ClientNodeAdapter struct {
full.StateAPI
full.ChainAPI
full.MpoolAPI
*clientApi

fundmgr *market.FundManager
ev *events.Events
Expand All @@ -46,14 +48,42 @@ type ClientNodeAdapter struct {
type clientApi struct {
full.ChainAPI
full.StateAPI
full.MpoolAPI
}

func (ca *clientApi) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) {
store := adt.WrapStore(ctx, cbor.NewCborStore(apibstore.NewAPIBlockstore(ca)))

preAct, err := ca.StateGetActor(ctx, actor, pre)
if err != nil {
return nil, xerrors.Errorf("getting pre actor: %w", err)
}
curAct, err := ca.StateGetActor(ctx, actor, cur)
if err != nil {
return nil, xerrors.Errorf("getting cur actor: %w", err)
}

preSt, err := miner.Load(store, preAct)
if err != nil {
return nil, xerrors.Errorf("loading miner actor: %w", err)
}
curSt, err := miner.Load(store, curAct)
if err != nil {
return nil, xerrors.Errorf("loading miner actor: %w", err)
}

diff, err := miner.DiffPreCommits(preSt, curSt)
if err != nil {
return nil, xerrors.Errorf("diff precommits: %w", err)
}

return diff, err
}

func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) storagemarket.StorageClientNode {
capi := &clientApi{chain, stateapi}
capi := &clientApi{chain, stateapi, mpool}
return &ClientNodeAdapter{
StateAPI: stateapi,
ChainAPI: chain,
MpoolAPI: mpool,
clientApi: capi,

fundmgr: fundmgr,
ev: events.NewEvents(context.TODO(), capi),
Expand Down
27 changes: 15 additions & 12 deletions markets/storageadapter/getcurrentdealinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
Expand All @@ -18,47 +19,49 @@ type getCurrentDealInfoAPI interface {
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error)
StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error)

diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error)
}

// GetCurrentDealInfo gets current information on a deal, and corrects the deal ID as needed
func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, error) {
func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, types.TipSetKey, error) {
marketDeal, dealErr := api.StateMarketStorageDeal(ctx, dealID, ts.Key())
if dealErr == nil {
equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal)
if err != nil {
return dealID, nil, err
return dealID, nil, types.EmptyTSK, err
}
if equal {
return dealID, marketDeal, nil
return dealID, marketDeal, types.EmptyTSK, nil
}
dealErr = xerrors.Errorf("Deal proposals did not match")
}
if publishCid == nil {
return dealID, nil, dealErr
return dealID, nil, types.EmptyTSK, dealErr
}
// attempt deal id correction
lookup, err := api.StateSearchMsg(ctx, *publishCid)
if err != nil {
return dealID, nil, err
return dealID, nil, types.EmptyTSK, err
}

if lookup.Receipt.ExitCode != exitcode.Ok {
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", *publishCid, lookup.Receipt.ExitCode)
return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", *publishCid, lookup.Receipt.ExitCode)
}

var retval market.PublishStorageDealsReturn
if err := retval.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil {
return dealID, nil, xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err)
return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err)
}

if len(retval.IDs) != 1 {
// market currently only ever sends messages with 1 deal
return dealID, nil, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal")
return dealID, nil, types.EmptyTSK, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal")
}

if retval.IDs[0] == dealID {
// DealID did not change, so we are stuck with the original lookup error
return dealID, nil, dealErr
return dealID, nil, lookup.TipSet, dealErr
}

dealID = retval.IDs[0]
Expand All @@ -67,13 +70,13 @@ func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDea
if err == nil {
equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal)
if err != nil {
return dealID, nil, err
return dealID, nil, types.EmptyTSK, err
}
if !equal {
return dealID, nil, xerrors.Errorf("Deal proposals did not match")
return dealID, nil, types.EmptyTSK, xerrors.Errorf("Deal proposals did not match")
}
}
return dealID, marketDeal, err
return dealID, marketDeal, lookup.TipSet, err
}

func checkDealEquality(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, p1, p2 market.DealProposal) (bool, error) {
Expand Down
7 changes: 6 additions & 1 deletion markets/storageadapter/getcurrentdealinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
test "github.com/filecoin-project/lotus/chain/events/state/mock"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -209,7 +210,7 @@ func TestGetCurrentDealInfo(t *testing.T) {
MarketDeals: marketDeals,
}

dealID, marketDeal, err := GetCurrentDealInfo(ctx, ts, api, startDealID, proposal, data.publishCid)
dealID, marketDeal, _, err := GetCurrentDealInfo(ctx, ts, api, startDealID, proposal, data.publishCid)
require.Equal(t, data.expectedDealID, dealID)
require.Equal(t, data.expectedMarketDeal, marketDeal)
if data.expectedError == nil {
Expand All @@ -236,6 +237,10 @@ type mockGetCurrentDealInfoAPI struct {
MarketDeals map[marketDealKey]*api.MarketDeal
}

func (mapi *mockGetCurrentDealInfoAPI) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) {
return &miner.PreCommitChanges{}, nil
}

func (mapi *mockGetCurrentDealInfoAPI) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, ts types.TipSetKey) (*api.MarketDeal, error) {
deal, ok := mapi.MarketDeals[marketDealKey{dealID, ts}]
if !ok {
Expand Down
50 changes: 38 additions & 12 deletions markets/storageadapter/ondealsectorcommitted.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ import (
"context"
"sync"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
)

type sectorCommittedEventsAPI interface {
Expand All @@ -32,7 +34,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev

// First check if the deal is already active, and if so, bail out
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
di, isActive, publishTs, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
if err != nil {
// Note: the error returned from here will end up being returned
// from OnDealSectorPreCommitted so no need to call the callback
Expand All @@ -46,6 +48,30 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev
return true, false, nil
}

if publishTs == types.EmptyTSK {
lookup, err := api.StateSearchMsg(ctx, *publishCid)
if err != nil {
return false, false, err
}
if lookup != nil { // can be nil in tests
publishTs = lookup.TipSet
}
}

diff, err := api.diffPreCommits(ctx, provider, publishTs, ts.Key())
if err != nil {
return false, false, err
}

for _, info := range diff.Added {
for _, d := range info.Info.DealIDs {
if d == di {
cb(info.Info.SectorNumber, false, nil)
return true, false, nil
}
}
}

magik6k marked this conversation as resolved.
Show resolved Hide resolved
// Not yet active, start matching against incoming messages
return false, true, nil
}
Expand Down Expand Up @@ -88,7 +114,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev

// When the deal is published, the deal ID may change, so get the
// current deal ID from the publish message CID
dealID, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
dealID, _, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -130,7 +156,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event

// First check if the deal is already active, and if so, bail out
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
_, isActive, _, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid)
if err != nil {
// Note: the error returned from here will end up being returned
// from OnDealSectorCommitted so no need to call the callback
Expand Down Expand Up @@ -186,7 +212,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
}

// Get the deal info
_, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
_, sd, _, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
if err != nil {
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
Expand Down Expand Up @@ -216,22 +242,22 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event
return nil
}

func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (bool, error) {
_, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, bool, types.TipSetKey, error) {
di, sd, publishTs, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid)
if err != nil {
// TODO: This may be fine for some errors
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
return 0, false, types.EmptyTSK, xerrors.Errorf("failed to look up deal on chain: %w", err)
}

// Sector with deal is already active
if sd.State.SectorStartEpoch > 0 {
return true, nil
return 0, true, publishTs, nil
}

// Sector was slashed
if sd.State.SlashEpoch > 0 {
return false, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch)
return 0, false, types.EmptyTSK, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch)
}

return false, nil
return di, false, publishTs, nil
}
3 changes: 1 addition & 2 deletions markets/storageadapter/ondealsectorcommitted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ func TestOnDealSectorPreCommitted(t *testing.T) {
deals: map[abi.DealID]*api.MarketDeal{},
},
},
expectedCBCallCount: 1,
expectedCBError: errors.New("handling applied event: something went wrong"),
expectedCBCallCount: 0,
expectedError: errors.New("failed to set up called handler: something went wrong"),
},
"proposed deal epoch timeout": {
Expand Down
Loading