-
Notifications
You must be signed in to change notification settings - Fork 59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Index publishing work #673
Changes from all commits
566b389
5a1a1b5
209b432
7fba642
09e27d4
a85e8b3
c6619ed
7f70337
17c3a6c
8713155
31e067c
383dfa8
e251d80
7165da0
8547a2d
7f49c96
d1271cd
068e7f3
e0faa0e
9f56617
9160420
e8772da
eb2fe1f
3d2e3c8
06ec150
7a2e429
c1c9ba8
93e4c64
ea0c8f1
8790cca
d26bd86
44fb837
ddc832f
a0db7a7
0f62c29
13b70d3
59b99bf
a6e18af
2460e15
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -354,7 +354,7 @@ func (p *Provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) { | |
answer.Size = uint64(pieceInfo.Deals[0].Length.Unpadded()) // TODO: verify on intermediate | ||
answer.PieceCIDFound = retrievalmarket.QueryItemAvailable | ||
|
||
storageDeals, err := storageDealsForPiece(query.PieceCID != nil, query.PayloadCID, pieceInfo, p.pieceStore) | ||
storageDeals, err := p.storageDealsForPiece(query.PieceCID != nil, query.PayloadCID, pieceInfo) | ||
if err != nil { | ||
log.Errorf("Retrieval query: storageDealsForPiece: %s", err) | ||
answer.Status = retrievalmarket.QueryResponseError | ||
|
@@ -388,48 +388,61 @@ func (p *Provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) { | |
sendResp(answer) | ||
} | ||
|
||
func (p *Provider) getPieceInfoFromCid(ctx context.Context, payloadCID, pieceCID cid.Cid) (piecestore.PieceInfo, bool, error) { | ||
cidInfo, err := p.pieceStore.GetCIDInfo(payloadCID) | ||
// Given the CID of a block, find a piece that contains that block. | ||
// If the client has specified which piece they want, return that piece. | ||
// Otherwise prefer pieces that are already unsealed. | ||
func (p *Provider) getPieceInfoFromCid(ctx context.Context, payloadCID, clientPieceCID cid.Cid) (piecestore.PieceInfo, bool, error) { | ||
// Get all pieces that contain the target block | ||
piecesWithTargetBlock, err := p.dagStore.GetPiecesContainingBlock(payloadCID) | ||
if err != nil { | ||
return piecestore.PieceInfoUndefined, false, xerrors.Errorf("get cid info: %w", err) | ||
return piecestore.PieceInfoUndefined, false, xerrors.Errorf("getting pieces for cid %s: %w", payloadCID, err) | ||
} | ||
|
||
// For each piece that contains the target block | ||
var lastErr error | ||
var sealedPieceInfo *piecestore.PieceInfo | ||
|
||
for _, pieceBlockLocation := range cidInfo.PieceBlockLocations { | ||
pieceInfo, err := p.pieceStore.GetPieceInfo(pieceBlockLocation.PieceCID) | ||
for _, pieceWithTargetBlock := range piecesWithTargetBlock { | ||
// Get the deals for the piece | ||
pieceInfo, err := p.pieceStore.GetPieceInfo(pieceWithTargetBlock) | ||
if err != nil { | ||
lastErr = err | ||
continue | ||
} | ||
|
||
// if client wants to retrieve the payload from a specific piece, just return that piece. | ||
if pieceCID.Defined() && pieceInfo.PieceCID.Equals(pieceCID) { | ||
if clientPieceCID.Defined() && pieceInfo.PieceCID.Equals(clientPieceCID) { | ||
return pieceInfo, p.pieceInUnsealedSector(ctx, pieceInfo), nil | ||
} | ||
|
||
// if client dosen't have a preference for a particular piece, prefer a piece | ||
// if client doesn't have a preference for a particular piece, prefer a piece | ||
// for which an unsealed sector exists. | ||
if pieceCID.Equals(cid.Undef) { | ||
if clientPieceCID.Equals(cid.Undef) { | ||
if p.pieceInUnsealedSector(ctx, pieceInfo) { | ||
// The piece is in an unsealed sector, so just return it | ||
return pieceInfo, true, nil | ||
} | ||
|
||
if sealedPieceInfo == nil { | ||
// The piece is not in an unsealed sector, so save it but keep | ||
// checking other pieces to see if there is one that is in an | ||
// unsealed sector | ||
sealedPieceInfo = &pieceInfo | ||
} | ||
} | ||
|
||
} | ||
|
||
// Found a piece containing the target block, piece is in a sealed sector | ||
if sealedPieceInfo != nil { | ||
return *sealedPieceInfo, false, nil | ||
} | ||
|
||
// Couldn't find a piece containing the target block | ||
if lastErr == nil { | ||
lastErr = xerrors.Errorf("unknown pieceCID %s", pieceCID.String()) | ||
lastErr = xerrors.Errorf("unknown pieceCID %s", clientPieceCID.String()) | ||
} | ||
|
||
// Error finding a piece containing the target block | ||
return piecestore.PieceInfoUndefined, false, xerrors.Errorf("could not locate piece: %w", lastErr) | ||
} | ||
|
||
|
@@ -448,6 +461,65 @@ func (p *Provider) pieceInUnsealedSector(ctx context.Context, pieceInfo piecesto | |
return false | ||
} | ||
|
||
func (p *Provider) storageDealsForPiece(clientSpecificPiece bool, payloadCID cid.Cid, pieceInfo piecestore.PieceInfo) ([]abi.DealID, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an existing problem (not introduced by this PR) but this method has a weird logic to it -
To me this says: make it two methods -- and also, consider the interrelated logic from the the HandleQueryStream/getPieceInfoFromCid and see if there's a way to simply. Definitely non-blocking, just noting. |
||
var storageDeals []abi.DealID | ||
var err error | ||
if clientSpecificPiece { | ||
// If the user wants to retrieve the payload from a specific piece, | ||
// we only need to inspect storage deals made for that piece to quote a price. | ||
for _, d := range pieceInfo.Deals { | ||
storageDeals = append(storageDeals, d.DealID) | ||
} | ||
} else { | ||
// If the user does NOT want to retrieve from a specific piece, we'll have to inspect all storage deals | ||
// made for that piece to quote a price. | ||
storageDeals, err = p.getAllDealsContainingPayload(payloadCID) | ||
if err != nil { | ||
return nil, xerrors.Errorf("failed to fetch deals for payload: %w", err) | ||
} | ||
} | ||
|
||
if len(storageDeals) == 0 { | ||
return nil, xerrors.New("no storage deals found") | ||
} | ||
|
||
return storageDeals, nil | ||
} | ||
|
||
func (p *Provider) getAllDealsContainingPayload(payloadCID cid.Cid) ([]abi.DealID, error) { | ||
// Get all pieces that contain the target block | ||
piecesWithTargetBlock, err := p.dagStore.GetPiecesContainingBlock(payloadCID) | ||
if err != nil { | ||
return nil, xerrors.Errorf("getting pieces for cid %s: %w", payloadCID, err) | ||
} | ||
|
||
// For each piece that contains the target block | ||
var lastErr error | ||
var dealsIds []abi.DealID | ||
for _, pieceWithTargetBlock := range piecesWithTargetBlock { | ||
// Get the deals for the piece | ||
pieceInfo, err := p.pieceStore.GetPieceInfo(pieceWithTargetBlock) | ||
if err != nil { | ||
lastErr = err | ||
continue | ||
} | ||
|
||
for _, d := range pieceInfo.Deals { | ||
dealsIds = append(dealsIds, d.DealID) | ||
} | ||
} | ||
|
||
if lastErr == nil && len(dealsIds) == 0 { | ||
return nil, xerrors.New("no deals found") | ||
} | ||
|
||
if lastErr != nil && len(dealsIds) == 0 { | ||
return nil, xerrors.Errorf("failed to fetch deals containing payload %s: %w", payloadCID, lastErr) | ||
} | ||
|
||
return dealsIds, nil | ||
} | ||
|
||
// GetDynamicAsk quotes a dynamic price for the retrieval deal by calling the user configured | ||
// dynamic pricing function. It passes the static price parameters set in the Ask Store to the pricing function. | ||
func (p *Provider) GetDynamicAsk(ctx context.Context, input retrievalmarket.PricingInput, storageDeals []abi.DealID) (retrievalmarket.Ask, error) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did it become more flaky in this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.