Skip to content

Commit

Permalink
Minimal alternate metadata type support (#365)
Browse files Browse the repository at this point in the history
* feat(message): minimal alternate metadata support

* Update requestmanager/reconciledloader/reconciledloader_test.go

Co-authored-by: Rod Vagg <[email protected]>

* Update graphsync.go

Co-authored-by: Rod Vagg <[email protected]>

Co-authored-by: Rod Vagg <[email protected]>
  • Loading branch information
hannahhoward and rvagg authored Mar 9, 2022
1 parent f4afe52 commit c5ec428
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 20 deletions.
14 changes: 14 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,25 @@ const (
// is included a this message
LinkActionPresent = LinkAction("Present")

// LinkActionDuplicateNotSent means the linked block was present on this machine,
// but I am not sending it (most likely duplicate)
LinkActionDuplicateNotSent = LinkAction("DuplicateNotSent")

// LinkActionMissing means I did not have the linked block, so I skipped over
// this part of the traversal
LinkActionMissing = LinkAction("Missing")

// LinkActionDuplicateDAGSkipped means the DAG with this link points toward has already
// been traversed entirely in the course of this request so I am skipping over it
LinkActionDuplicateDAGSkipped = LinkAction("DuplicateDAGSkipped")
)

// DidFollowLink indicates whether the remote actually loaded the block and
// followed it in its selector traversal
func (l LinkAction) DidFollowLink() bool {
return l == LinkActionPresent || l == LinkActionDuplicateNotSent
}

// LinkMetadataIterator is used to access individual link metadata through a
// LinkMetadata object
type LinkMetadataIterator func(cid.Cid, LinkAction)
Expand Down
4 changes: 2 additions & 2 deletions message/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestMessageBuilding(t *testing.T) {
rb.AddResponseCode(requestID2, graphsync.RequestCompletedFull)

rb.AddLink(requestID3, links[0], graphsync.LinkActionPresent)
rb.AddLink(requestID3, links[1], graphsync.LinkActionPresent)
rb.AddLink(requestID3, links[1], graphsync.LinkActionDuplicateNotSent)

rb.AddResponseCode(requestID4, graphsync.RequestCompletedFull)
rb.AddExtensionData(requestID1, extension1)
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestMessageBuilding(t *testing.T) {
require.Equal(t, graphsync.PartialResponse, response3.Status(), "did not generate partial response")
assertMetadata(t, response3, []GraphSyncLinkMetadatum{
{Link: links[0].(cidlink.Link).Cid, Action: graphsync.LinkActionPresent},
{Link: links[1].(cidlink.Link).Cid, Action: graphsync.LinkActionPresent},
{Link: links[1].(cidlink.Link).Cid, Action: graphsync.LinkActionDuplicateNotSent},
})
assertExtension(t, response3, extension2)

Expand Down
4 changes: 2 additions & 2 deletions message/ipldbind/schema.ipldsch
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ type GraphSyncLinkAction enum {
| Present ("p")
# DuplicateNotSent means the linked block was present on this machine, but I
# am not sending it (most likely duplicate)
# TODO: | DuplicateNotSent ("d")
| DuplicateNotSent ("d")
# Missing means I did not have the linked block, so I skipped over this part
# of the traversal
| Missing ("m")
# DuplicateDAGSkipped means the DAG with this link points toward has already
# been traversed entirely in the course of this request
# so I am skipping over it entirely
# TODO: | DuplicateDAGSkipped ("s")
| DuplicateDAGSkipped ("s")
} representation string

# Metadata for each "link" in the DAG being communicated, each block gets one of
Expand Down
15 changes: 9 additions & 6 deletions message/v2/ipld_roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ func TestIPLDRoundTrip(t *testing.T) {
id2: message.NewRequest(id2, root2, selectorparse.CommonSelector_ExploreAllRecursively, graphsync.Priority(202)),
}

metadata := message.GraphSyncLinkMetadatum{Link: root2, Action: graphsync.LinkActionMissing}
metadata := []message.GraphSyncLinkMetadatum{{Link: root2, Action: graphsync.LinkActionMissing}, {Link: root1, Action: graphsync.LinkActionDuplicateNotSent}}

metadata2 := []message.GraphSyncLinkMetadatum{{Link: root2, Action: graphsync.LinkActionDuplicateDAGSkipped}, {Link: root1, Action: graphsync.LinkActionPresent}}

responses := map[graphsync.RequestID]message.GraphSyncResponse{
id1: message.NewResponse(id1, graphsync.RequestFailedContentNotFound, []message.GraphSyncLinkMetadatum{metadata}),
id2: message.NewResponse(id2, graphsync.PartialResponse, nil, extension2),
id1: message.NewResponse(id1, graphsync.RequestFailedContentNotFound, metadata),
id2: message.NewResponse(id2, graphsync.PartialResponse, metadata2, extension2),
}

blks := testutil.GenerateBlocksOfSize(2, 100)
Expand Down Expand Up @@ -115,11 +117,12 @@ func TestIPLDRoundTrip(t *testing.T) {
require.Equal(t, rtresmap[id2].Status(), graphsync.PartialResponse)
gslm1, ok := rtresmap[id1].Metadata().(message.GraphSyncLinkMetadata)
require.True(t, ok)
require.Len(t, gslm1.RawMetadata(), 1)
require.Equal(t, gslm1.RawMetadata()[0], metadata)
require.Len(t, gslm1.RawMetadata(), 2)
require.Equal(t, gslm1.RawMetadata(), metadata)
gslm2, ok := rtresmap[id2].Metadata().(message.GraphSyncLinkMetadata)
require.True(t, ok)
require.Empty(t, gslm2.RawMetadata())
require.Len(t, gslm2.RawMetadata(), 2)
require.Equal(t, gslm2.RawMetadata(), metadata2)
require.Empty(t, rtresmap[id1].ExtensionNames())
require.Len(t, rtresmap[id2].ExtensionNames(), 1)
rtext2, exists := rtresmap[id2].Extension(extension2Name)
Expand Down
4 changes: 2 additions & 2 deletions requestmanager/reconciledloader/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (rl *ReconciledLoader) blockReadOpener(lctx linking.LinkContext, link datam
}

// only attempt remote load if after reconciliation we're not on a missing path
if !rl.pathTracker.stillOnMissingRemotePath(lctx.LinkPath) {
if !rl.pathTracker.stillOnUnfollowedRemotePath(lctx.LinkPath) {
data, err := rl.loadRemote(lctx, link)
if data != nil {
return true, types.AsyncLoadResult{Data: data, Local: false}
Expand Down Expand Up @@ -164,7 +164,7 @@ func (rl *ReconciledLoader) waitRemote() (bool, error) {
path := rl.verifier.CurrentPath()
head := rl.remoteQueue.first()
rl.remoteQueue.consume()
err := rl.verifier.VerifyNext(head.link, head.action != graphsync.LinkActionMissing)
err := rl.verifier.VerifyNext(head.link, head.action.DidFollowLink())
if err != nil {
return true, err
}
Expand Down
16 changes: 8 additions & 8 deletions requestmanager/reconciledloader/pathtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@ import (
// pathTracker is just a simple utility to track whether we're on a missing
// path for the remote
type pathTracker struct {
lastMissingRemotePath datamodel.Path
lastUnfollowedRemotePath datamodel.Path
}

// stillOnMissingRemotePath determines whether the next link load will be from
// stillOnUnfollowedRemotePath determines whether the next link load will be from
// a path missing from the remote
// if it won't be, based on the linear nature of selector traversals, it wipes
// the last missing state
func (pt *pathTracker) stillOnMissingRemotePath(newPath datamodel.Path) bool {
func (pt *pathTracker) stillOnUnfollowedRemotePath(newPath datamodel.Path) bool {
// is there a known missing path?
if pt.lastMissingRemotePath.Len() == 0 {
if pt.lastUnfollowedRemotePath.Len() == 0 {
return false
}
// are we still on it?
if newPath.Len() <= pt.lastMissingRemotePath.Len() {
if newPath.Len() <= pt.lastUnfollowedRemotePath.Len() {
// if not, reset to no known missing remote path
pt.lastMissingRemotePath = datamodel.NewPath(nil)
pt.lastUnfollowedRemotePath = datamodel.NewPath(nil)
return false
}
// otherwise we're on a missing path
Expand All @@ -34,8 +34,8 @@ func (pt *pathTracker) stillOnMissingRemotePath(newPath datamodel.Path) bool {
// at the given path
func (pt *pathTracker) recordRemoteLoadAttempt(currentPath datamodel.Path, action graphsync.LinkAction) {
// if the last remote link was missing
if action == graphsync.LinkActionMissing {
if !action.DidFollowLink() {
// record the last known missing path
pt.lastMissingRemotePath = currentPath
pt.lastUnfollowedRemotePath = currentPath
}
}
138 changes: 138 additions & 0 deletions requestmanager/reconciledloader/reconciledloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,29 @@ func TestReconciledLoader(t *testing.T) {
},
),
},
"remote skips DAG": {
root: testChain.TipLink.(cidlink.Link).Cid,
baseStore: testBCStorage,
presentRemoteBlocks: testChain.AllBlocks(),
remoteSeq: append(metadataRange(testChain, 0, 30, false),
message.GraphSyncLinkMetadatum{
Link: testChain.LinkTipIndex(30).(cidlink.Link).Cid,
Action: graphsync.LinkActionDuplicateDAGSkipped,
}),
steps: append(append([]step{
goOnline{},
injest{metadataStart: 0, metadataEnd: 31},
}, syncLoadRange(testChain, 0, 30, false)...),
// we should get an error that we're missing a block for our response
syncLoad{
loadSeq: 30,
expectedResult: types.AsyncLoadResult{Local: true, Err: graphsync.RemoteMissingBlockErr{
Link: testChain.LinkTipIndex(30),
Path: testChain.PathTipIndex(30),
}},
},
),
},
"remote missing chain that local has": {
root: testChain.TipLink.(cidlink.Link).Cid,
baseStore: testBCStorage,
Expand Down Expand Up @@ -251,6 +274,39 @@ func TestReconciledLoader(t *testing.T) {
syncLoadRange(testChain, 30, 100, true)...,
),
},
"remote skips chain that local has": {
root: testChain.TipLink.(cidlink.Link).Cid,
baseStore: testBCStorage,
presentRemoteBlocks: testChain.AllBlocks(),
presentLocalBlocks: testChain.Blocks(30, 100),
remoteSeq: append(metadataRange(testChain, 0, 30, false),
message.GraphSyncLinkMetadatum{
Link: testChain.LinkTipIndex(30).(cidlink.Link).Cid,
Action: graphsync.LinkActionDuplicateDAGSkipped,
}),
steps: append(append(append(
[]step{
goOnline{},
injest{metadataStart: 0, metadataEnd: 31},
},
// load the blocks the remote has
syncLoadRange(testChain, 0, 30, false)...),
[]step{
// load the block the remote missing says it's missing locally
syncLoadRange(testChain, 30, 31, true)[0],
asyncLoad{loadSeq: 31},
// at this point we have no more remote responses, since it's a linear chain
verifyNoAsyncResult{},
// we'd expect the remote would terminate here, since we've sent the last missing block
goOffline{},
// this will cause us to start loading locally only again
verifyAsyncResult{
expectedResult: types.AsyncLoadResult{Local: true, Data: testChain.Blocks(31, 32)[0].RawData()},
},
}...),
syncLoadRange(testChain, 30, 100, true)...,
),
},
"remote missing chain that local has partial": {
root: testChain.TipLink.(cidlink.Link).Cid,
baseStore: testBCStorage,
Expand Down Expand Up @@ -326,6 +382,62 @@ func TestReconciledLoader(t *testing.T) {
syncLoad{loadSeq: 7, expectedResult: types.AsyncLoadResult{Local: true, Data: testTree.LeafAlphaBlock.RawData()}},
},
},
"remote duplicate not sent, blocks can load from local": {
root: testTree.RootBlock.Cid(),
baseStore: testTree.Storage,
presentRemoteBlocks: []blocks.Block{
testTree.RootBlock,
testTree.MiddleListBlock,
testTree.MiddleMapBlock,
testTree.LeafAlphaBlock,
testTree.LeafBetaBlock,
},
presentLocalBlocks: nil,
remoteSeq: []message.GraphSyncLinkMetadatum{
{Link: testTree.RootBlock.Cid(), Action: graphsync.LinkActionPresent},
{Link: testTree.MiddleListBlock.Cid(), Action: graphsync.LinkActionPresent},
{Link: testTree.LeafAlphaBlock.Cid(), Action: graphsync.LinkActionPresent},
{Link: testTree.LeafAlphaBlock.Cid(), Action: graphsync.LinkActionDuplicateNotSent},
{Link: testTree.LeafBetaBlock.Cid(), Action: graphsync.LinkActionPresent},
{Link: testTree.LeafAlphaBlock.Cid(), Action: graphsync.LinkActionDuplicateNotSent},
{Link: testTree.MiddleMapBlock.Cid(), Action: graphsync.LinkActionPresent},
{Link: testTree.LeafAlphaBlock.Cid(), Action: graphsync.LinkActionDuplicateNotSent},
},
steps: []step{
goOnline{},
injest{metadataStart: 0, metadataEnd: 8},
syncLoad{loadSeq: 0, expectedResult: types.AsyncLoadResult{Local: false, Data: testTree.RootBlock.RawData()}},
syncLoad{loadSeq: 1, expectedResult: types.AsyncLoadResult{Local: false, Data: testTree.MiddleListBlock.RawData()}},
syncLoad{loadSeq: 2, expectedResult: types.AsyncLoadResult{Local: false, Data: testTree.LeafAlphaBlock.RawData()}},
syncLoad{loadSeq: 3, expectedResult: types.AsyncLoadResult{Local: true, Data: testTree.LeafAlphaBlock.RawData()}},
syncLoad{loadSeq: 4, expectedResult: types.AsyncLoadResult{Local: false, Data: testTree.LeafBetaBlock.RawData()}},
syncLoad{loadSeq: 5, expectedResult: types.AsyncLoadResult{Local: true, Data: testTree.LeafAlphaBlock.RawData()}},
syncLoad{loadSeq: 6, expectedResult: types.AsyncLoadResult{Local: false, Data: testTree.MiddleMapBlock.RawData()}},
syncLoad{loadSeq: 7, expectedResult: types.AsyncLoadResult{Local: true, Data: testTree.LeafAlphaBlock.RawData()}},
},
},
"remote duplicate not sent load from local even when present in message": {
root: testTree.RootBlock.Cid(),
baseStore: testTree.Storage,
presentRemoteBlocks: []blocks.Block{
testTree.RootBlock,
testTree.MiddleListBlock,
},
presentLocalBlocks: []blocks.Block{
testTree.MiddleListBlock,
},
remoteSeq: []message.GraphSyncLinkMetadatum{
{Link: testTree.RootBlock.Cid(), Action: graphsync.LinkActionPresent},
{Link: testTree.MiddleListBlock.Cid(), Action: graphsync.LinkActionDuplicateNotSent},
},
steps: []step{
goOnline{},
injest{metadataStart: 0, metadataEnd: 2},
syncLoad{loadSeq: 0, expectedResult: types.AsyncLoadResult{Local: false, Data: testTree.RootBlock.RawData()}},
syncLoad{loadSeq: 1, expectedResult: types.AsyncLoadResult{Local: true, Data: testTree.MiddleListBlock.RawData()}},
},
},

"remote missing branch finishes to end": {
root: testTree.RootBlock.Cid(),
baseStore: testTree.Storage,
Expand All @@ -351,6 +463,32 @@ func TestReconciledLoader(t *testing.T) {
syncLoad{loadSeq: 7, expectedResult: types.AsyncLoadResult{Local: false, Data: testTree.LeafAlphaBlock.RawData()}},
},
},

"remote skipping branch finishes to end": {
root: testTree.RootBlock.Cid(),
baseStore: testTree.Storage,
presentRemoteBlocks: []blocks.Block{
testTree.RootBlock,
testTree.MiddleMapBlock,
testTree.LeafAlphaBlock,
},
presentLocalBlocks: nil,
remoteSeq: []message.GraphSyncLinkMetadatum{
{Link: testTree.RootBlock.Cid(), Action: graphsync.LinkActionPresent},
// missing the whole list tree
{Link: testTree.MiddleListBlock.Cid(), Action: graphsync.LinkActionDuplicateDAGSkipped},
{Link: testTree.MiddleMapBlock.Cid(), Action: graphsync.LinkActionPresent},
{Link: testTree.LeafAlphaBlock.Cid(), Action: graphsync.LinkActionPresent},
},
steps: []step{
goOnline{},
injest{metadataStart: 0, metadataEnd: 4},
syncLoad{loadSeq: 0, expectedResult: types.AsyncLoadResult{Local: false, Data: testTree.RootBlock.RawData()}},
syncLoad{loadSeq: 1, expectedResult: types.AsyncLoadResult{Local: true, Err: graphsync.RemoteMissingBlockErr{Link: testTree.MiddleListNodeLnk, Path: datamodel.ParsePath("linkedList")}}},
syncLoad{loadSeq: 6, expectedResult: types.AsyncLoadResult{Local: false, Data: testTree.MiddleMapBlock.RawData()}},
syncLoad{loadSeq: 7, expectedResult: types.AsyncLoadResult{Local: false, Data: testTree.LeafAlphaBlock.RawData()}},
},
},
"remote missing branch with partial local": {
root: testTree.RootBlock.Cid(),
baseStore: testTree.Storage,
Expand Down

0 comments on commit c5ec428

Please sign in to comment.