diff --git a/ipldutil/traverser.go b/ipldutil/traverser.go index ff569309..96cbc577 100644 --- a/ipldutil/traverser.go +++ b/ipldutil/traverser.go @@ -166,7 +166,7 @@ func (t *traverser) checkState() { func (t *traverser) writeDone(err error) { select { case <-t.ctx.Done(): - case t.stateChan <- state{true, err, nil, ipld.LinkContext{}}: + case t.stateChan <- state{true, err, nil, ipld.LinkContext{Ctx: t.ctx}}: } } @@ -179,7 +179,7 @@ func (t *traverser) start() { } go func() { defer close(t.stopped) - ns, err := t.chooser(t.root, ipld.LinkContext{}) + ns, err := t.chooser(t.root, ipld.LinkContext{Ctx: t.ctx}) if err != nil { t.writeDone(err) return diff --git a/requestmanager/asyncloader/asyncloader.go b/requestmanager/asyncloader/asyncloader.go index 5869d7ac..4d0d8931 100644 --- a/requestmanager/asyncloader/asyncloader.go +++ b/requestmanager/asyncloader/asyncloader.go @@ -131,10 +131,10 @@ func (al *AsyncLoader) ProcessResponse(p peer.ID, responses map[graphsync.Reques // AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel for data and a channel // for errors -- only one message will be sent over either. -func (al *AsyncLoader) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult { +func (al *AsyncLoader) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult { resultChan := make(chan types.AsyncLoadResult, 1) response := make(chan error, 1) - lr := loadattemptqueue.NewLoadRequest(p, requestID, link, resultChan) + lr := loadattemptqueue.NewLoadRequest(p, requestID, link, linkContext, resultChan) _ = al.sendSyncMessage(&loadRequestMessage{response, requestID, lr}, response) return resultChan } @@ -368,9 +368,9 @@ func setupAttemptQueue(lsys ipld.LinkSystem, allocator Allocator) (*responsecach unverifiedBlockStore := unverifiedblockstore.New(lsys.StorageWriteOpener) responseCache := responsecache.New(unverifiedBlockStore) - loadAttemptQueue := loadattemptqueue.New(func(p peer.ID, requestID graphsync.RequestID, link ipld.Link) types.AsyncLoadResult { + loadAttemptQueue := loadattemptqueue.New(func(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) types.AsyncLoadResult { // load from response cache - data, err := responseCache.AttemptLoad(requestID, link) + data, err := responseCache.AttemptLoad(requestID, link, linkContext) if err != nil { return types.AsyncLoadResult{Err: err, Local: false} } @@ -382,7 +382,7 @@ func setupAttemptQueue(lsys ipld.LinkSystem, allocator Allocator) (*responsecach return types.AsyncLoadResult{Data: data, Local: false} } // fall back to local store - if stream, err := lsys.StorageReadOpener(ipld.LinkContext{}, link); stream != nil && err == nil { + if stream, err := lsys.StorageReadOpener(linkContext, link); stream != nil && err == nil { if localData, err := ioutil.ReadAll(stream); err == nil && localData != nil { return types.AsyncLoadResult{Data: localData, Local: true} } diff --git a/requestmanager/asyncloader/asyncloader_test.go b/requestmanager/asyncloader/asyncloader_test.go index 57a7db41..b02eb23c 100644 --- a/requestmanager/asyncloader/asyncloader_test.go +++ b/requestmanager/asyncloader/asyncloader_test.go @@ -26,7 +26,7 @@ func TestAsyncLoadInitialLoadSucceedsLocallyPresent(t *testing.T) { withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { requestID := graphsync.RequestID(rand.Int31()) p := testutil.GeneratePeers(1)[0] - resultChan := asyncLoader.AsyncLoad(p, requestID, link) + resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{}) assertSuccessResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 1) }) @@ -41,7 +41,7 @@ func TestAsyncLoadInitialLoadSucceedsResponsePresent(t *testing.T) { withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { requestID := graphsync.RequestID(rand.Int31()) responses := map[graphsync.RequestID]metadata.Metadata{ - requestID: metadata.Metadata{ + requestID: { metadata.Item{ Link: link.Cid, BlockPresent: true, @@ -50,7 +50,7 @@ func TestAsyncLoadInitialLoadSucceedsResponsePresent(t *testing.T) { } p := testutil.GeneratePeers(1)[0] asyncLoader.ProcessResponse(p, responses, blocks) - resultChan := asyncLoader.AsyncLoad(p, requestID, link) + resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{}) assertSuccessResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 0) @@ -65,7 +65,7 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) { requestID := graphsync.RequestID(rand.Int31()) responses := map[graphsync.RequestID]metadata.Metadata{ - requestID: metadata.Metadata{ + requestID: { metadata.Item{ Link: link.(cidlink.Link).Cid, BlockPresent: false, @@ -75,7 +75,7 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) { p := testutil.GeneratePeers(1)[0] asyncLoader.ProcessResponse(p, responses, nil) - resultChan := asyncLoader.AsyncLoad(p, requestID, link) + resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{}) assertFailResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 0) }) @@ -87,7 +87,7 @@ func TestAsyncLoadInitialLoadIndeterminateWhenRequestNotInProgress(t *testing.T) link := testutil.NewTestLink() requestID := graphsync.RequestID(rand.Int31()) p := testutil.GeneratePeers(1)[0] - resultChan := asyncLoader.AsyncLoad(p, requestID, link) + resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{}) assertFailResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 1) }) @@ -105,12 +105,12 @@ func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) { err := asyncLoader.StartRequest(requestID, "") require.NoError(t, err) p := testutil.GeneratePeers(1)[0] - resultChan := asyncLoader.AsyncLoad(p, requestID, link) + resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{}) st.AssertAttemptLoadWithoutResult(ctx, t, resultChan) responses := map[graphsync.RequestID]metadata.Metadata{ - requestID: metadata.Metadata{ + requestID: { metadata.Item{ Link: link.Cid, BlockPresent: true, @@ -133,12 +133,12 @@ func TestAsyncLoadInitialLoadIndeterminateThenFails(t *testing.T) { err := asyncLoader.StartRequest(requestID, "") require.NoError(t, err) p := testutil.GeneratePeers(1)[0] - resultChan := asyncLoader.AsyncLoad(p, requestID, link) + resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{}) st.AssertAttemptLoadWithoutResult(ctx, t, resultChan) responses := map[graphsync.RequestID]metadata.Metadata{ - requestID: metadata.Metadata{ + requestID: { metadata.Item{ Link: link.(cidlink.Link).Cid, BlockPresent: false, @@ -159,7 +159,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) { err := asyncLoader.StartRequest(requestID, "") require.NoError(t, err) p := testutil.GeneratePeers(1)[0] - resultChan := asyncLoader.AsyncLoad(p, requestID, link) + resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{}) st.AssertAttemptLoadWithoutResult(ctx, t, resultChan) asyncLoader.CompleteResponsesFor(requestID) assertFailResponse(ctx, t, resultChan) @@ -175,7 +175,7 @@ func TestAsyncLoadTwiceLoadsLocallySecondTime(t *testing.T) { withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { requestID := graphsync.RequestID(rand.Int31()) responses := map[graphsync.RequestID]metadata.Metadata{ - requestID: metadata.Metadata{ + requestID: { metadata.Item{ Link: link.Cid, BlockPresent: true, @@ -184,12 +184,12 @@ func TestAsyncLoadTwiceLoadsLocallySecondTime(t *testing.T) { } p := testutil.GeneratePeers(1)[0] asyncLoader.ProcessResponse(p, responses, blocks) - resultChan := asyncLoader.AsyncLoad(p, requestID, link) + resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{}) assertSuccessResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 0) - resultChan = asyncLoader.AsyncLoad(p, requestID, link) + resultChan = asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{}) assertSuccessResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 1) @@ -214,7 +214,7 @@ func TestRegisterUnregister(t *testing.T) { err = asyncLoader.StartRequest(requestID2, "other") require.NoError(t, err) p := testutil.GeneratePeers(1)[0] - resultChan1 := asyncLoader.AsyncLoad(p, requestID2, link1) + resultChan1 := asyncLoader.AsyncLoad(p, requestID2, link1, ipld.LinkContext{}) assertSuccessResponse(ctx, t, resultChan1) err = asyncLoader.UnregisterPersistenceOption("other") require.EqualError(t, err, "cannot unregister while requests are in progress") @@ -239,11 +239,11 @@ func TestRequestSplittingLoadLocallyFromBlockstore(t *testing.T) { requestID1 := graphsync.RequestID(rand.Int31()) p := testutil.GeneratePeers(1)[0] - resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link) + resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link, ipld.LinkContext{}) requestID2 := graphsync.RequestID(rand.Int31()) err = asyncLoader.StartRequest(requestID2, "other") require.NoError(t, err) - resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link) + resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link, ipld.LinkContext{}) assertFailResponse(ctx, t, resultChan1) assertSuccessResponse(ctx, t, resultChan2) @@ -267,16 +267,16 @@ func TestRequestSplittingSameBlockTwoStores(t *testing.T) { err = asyncLoader.StartRequest(requestID2, "other") require.NoError(t, err) p := testutil.GeneratePeers(1)[0] - resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link) - resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link) + resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link, ipld.LinkContext{}) + resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link, ipld.LinkContext{}) responses := map[graphsync.RequestID]metadata.Metadata{ - requestID1: metadata.Metadata{ + requestID1: { metadata.Item{ Link: link.Cid, BlockPresent: true, }, }, - requestID2: metadata.Metadata{ + requestID2: { metadata.Item{ Link: link.Cid, BlockPresent: true, @@ -308,10 +308,10 @@ func TestRequestSplittingSameBlockOnlyOneResponse(t *testing.T) { err = asyncLoader.StartRequest(requestID2, "other") require.NoError(t, err) p := testutil.GeneratePeers(1)[0] - resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link) - resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link) + resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link, ipld.LinkContext{}) + resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link, ipld.LinkContext{}) responses := map[graphsync.RequestID]metadata.Metadata{ - requestID2: metadata.Metadata{ + requestID2: { metadata.Item{ Link: link.Cid, BlockPresent: true, diff --git a/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue.go b/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue.go index c6c318b5..618c889d 100644 --- a/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue.go +++ b/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue.go @@ -13,10 +13,11 @@ import ( // LoadRequest is a request to load the given link for the given request id, // with results returned to the given channel type LoadRequest struct { - p peer.ID - requestID graphsync.RequestID - link ipld.Link - resultChan chan types.AsyncLoadResult + p peer.ID + requestID graphsync.RequestID + link ipld.Link + linkContext ipld.LinkContext + resultChan chan types.AsyncLoadResult } // NewLoadRequest returns a new LoadRequest for the given request id, link, @@ -25,13 +26,14 @@ func NewLoadRequest( p peer.ID, requestID graphsync.RequestID, link ipld.Link, + linkContext ipld.LinkContext, resultChan chan types.AsyncLoadResult) LoadRequest { - return LoadRequest{p, requestID, link, resultChan} + return LoadRequest{p, requestID, link, linkContext, resultChan} } // LoadAttempter attempts to load a link to an array of bytes // and returns an async load result -type LoadAttempter func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult +type LoadAttempter func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult // LoadAttemptQueue attempts to load using the load attempter, and then can // place requests on a retry queue @@ -50,7 +52,7 @@ func New(loadAttempter LoadAttempter) *LoadAttemptQueue { // AttemptLoad attempts to loads the given load request, and if retry is true // it saves the loadrequest for retrying later func (laq *LoadAttemptQueue) AttemptLoad(lr LoadRequest, retry bool) { - response := laq.loadAttempter(lr.p, lr.requestID, lr.link) + response := laq.loadAttempter(lr.p, lr.requestID, lr.link, lr.linkContext) if response.Err != nil || response.Data != nil { lr.resultChan <- response close(lr.resultChan) diff --git a/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue_test.go b/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue_test.go index ccf076ea..ae992711 100644 --- a/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue_test.go +++ b/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue_test.go @@ -21,7 +21,7 @@ func TestAsyncLoadInitialLoadSucceeds(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() callCount := 0 - loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult { + loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult { callCount++ return types.AsyncLoadResult{ Data: testutil.RandomBytes(100), @@ -30,11 +30,12 @@ func TestAsyncLoadInitialLoadSucceeds(t *testing.T) { loadAttemptQueue := New(loadAttempter) link := testutil.NewTestLink() + linkContext := ipld.LinkContext{} requestID := graphsync.RequestID(rand.Int31()) p := testutil.GeneratePeers(1)[0] resultChan := make(chan types.AsyncLoadResult, 1) - lr := NewLoadRequest(p, requestID, link, resultChan) + lr := NewLoadRequest(p, requestID, link, linkContext, resultChan) loadAttemptQueue.AttemptLoad(lr, false) var result types.AsyncLoadResult @@ -50,7 +51,7 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() callCount := 0 - loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult { + loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult { callCount++ return types.AsyncLoadResult{ Err: fmt.Errorf("something went wrong"), @@ -59,11 +60,12 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) { loadAttemptQueue := New(loadAttempter) link := testutil.NewTestLink() + linkContext := ipld.LinkContext{} requestID := graphsync.RequestID(rand.Int31()) resultChan := make(chan types.AsyncLoadResult, 1) p := testutil.GeneratePeers(1)[0] - lr := NewLoadRequest(p, requestID, link, resultChan) + lr := NewLoadRequest(p, requestID, link, linkContext, resultChan) loadAttemptQueue.AttemptLoad(lr, false) var result types.AsyncLoadResult @@ -78,7 +80,7 @@ func TestAsyncLoadInitialLoadIndeterminateRetryFalse(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() callCount := 0 - loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult { + loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult { var result []byte if callCount > 0 { result = testutil.RandomBytes(100) @@ -92,11 +94,12 @@ func TestAsyncLoadInitialLoadIndeterminateRetryFalse(t *testing.T) { loadAttemptQueue := New(loadAttempter) link := testutil.NewTestLink() + linkContext := ipld.LinkContext{} requestID := graphsync.RequestID(rand.Int31()) p := testutil.GeneratePeers(1)[0] resultChan := make(chan types.AsyncLoadResult, 1) - lr := NewLoadRequest(p, requestID, link, resultChan) + lr := NewLoadRequest(p, requestID, link, linkContext, resultChan) loadAttemptQueue.AttemptLoad(lr, false) var result types.AsyncLoadResult @@ -112,7 +115,7 @@ func TestAsyncLoadInitialLoadIndeterminateRetryTrueThenRetriedSuccess(t *testing defer cancel() callCount := 0 called := make(chan struct{}, 2) - loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult { + loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult { var result []byte called <- struct{}{} if callCount > 0 { @@ -126,10 +129,11 @@ func TestAsyncLoadInitialLoadIndeterminateRetryTrueThenRetriedSuccess(t *testing loadAttemptQueue := New(loadAttempter) link := testutil.NewTestLink() + linkContext := ipld.LinkContext{} requestID := graphsync.RequestID(rand.Int31()) resultChan := make(chan types.AsyncLoadResult, 1) p := testutil.GeneratePeers(1)[0] - lr := NewLoadRequest(p, requestID, link, resultChan) + lr := NewLoadRequest(p, requestID, link, linkContext, resultChan) loadAttemptQueue.AttemptLoad(lr, true) testutil.AssertDoesReceiveFirst(t, called, "should attempt load with no result", resultChan, ctx.Done()) @@ -148,7 +152,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) { defer cancel() callCount := 0 called := make(chan struct{}, 2) - loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult { + loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult { var result []byte called <- struct{}{} if callCount > 0 { @@ -162,10 +166,11 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) { loadAttemptQueue := New(loadAttempter) link := testutil.NewTestLink() + linkContext := ipld.LinkContext{} requestID := graphsync.RequestID(rand.Int31()) resultChan := make(chan types.AsyncLoadResult, 1) p := testutil.GeneratePeers(1)[0] - lr := NewLoadRequest(p, requestID, link, resultChan) + lr := NewLoadRequest(p, requestID, link, linkContext, resultChan) loadAttemptQueue.AttemptLoad(lr, true) testutil.AssertDoesReceiveFirst(t, called, "should attempt load with no result", resultChan, ctx.Done()) diff --git a/requestmanager/asyncloader/responsecache/responsecache.go b/requestmanager/asyncloader/responsecache/responsecache.go index cbbba358..94ecfd31 100644 --- a/requestmanager/asyncloader/responsecache/responsecache.go +++ b/requestmanager/asyncloader/responsecache/responsecache.go @@ -21,7 +21,7 @@ var log = logging.Logger("graphsync") type UnverifiedBlockStore interface { PruneBlocks(func(ipld.Link) bool) PruneBlock(ipld.Link) - VerifyBlock(ipld.Link) ([]byte, error) + VerifyBlock(ipld.Link, ipld.LinkContext) ([]byte, error) AddUnverifiedBlock(ipld.Link, []byte) } @@ -55,13 +55,13 @@ func (rc *ResponseCache) FinishRequest(requestID graphsync.RequestID) { } // AttemptLoad attempts to laod the given block from the cache -func (rc *ResponseCache) AttemptLoad(requestID graphsync.RequestID, link ipld.Link) ([]byte, error) { +func (rc *ResponseCache) AttemptLoad(requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) ([]byte, error) { rc.responseCacheLk.Lock() defer rc.responseCacheLk.Unlock() if rc.linkTracker.IsKnownMissingLink(requestID, link) { return nil, fmt.Errorf("remote peer is missing block: %s", link.String()) } - data, _ := rc.unverifiedBlockStore.VerifyBlock(link) + data, _ := rc.unverifiedBlockStore.VerifyBlock(link, linkContext) return data, nil } diff --git a/requestmanager/asyncloader/responsecache/responsecache_test.go b/requestmanager/asyncloader/responsecache/responsecache_test.go index 7702502c..dad3fc31 100644 --- a/requestmanager/asyncloader/responsecache/responsecache_test.go +++ b/requestmanager/asyncloader/responsecache/responsecache_test.go @@ -35,7 +35,7 @@ func (ubs *fakeUnverifiedBlockStore) PruneBlock(link ipld.Link) { delete(ubs.inMemoryBlocks, link) } -func (ubs *fakeUnverifiedBlockStore) VerifyBlock(lnk ipld.Link) ([]byte, error) { +func (ubs *fakeUnverifiedBlockStore) VerifyBlock(lnk ipld.Link, linkCtx ipld.LinkContext) ([]byte, error) { data, ok := ubs.inMemoryBlocks[lnk] if !ok { return nil, fmt.Errorf("Block not found") @@ -105,8 +105,9 @@ func TestResponseCacheManagingLinks(t *testing.T) { require.Len(t, fubs.blocks(), len(blks)-1, "should prune block with no references") testutil.RefuteContainsBlock(t, fubs.blocks(), blks[2]) + lnkCtx := ipld.LinkContext{} // should load block from unverified block store - data, err := responseCache.AttemptLoad(requestID2, cidlink.Link{Cid: blks[4].Cid()}) + data, err := responseCache.AttemptLoad(requestID2, cidlink.Link{Cid: blks[4].Cid()}, lnkCtx) require.NoError(t, err) require.Equal(t, blks[4].RawData(), data, "did not load correct block") @@ -115,12 +116,12 @@ func TestResponseCacheManagingLinks(t *testing.T) { testutil.RefuteContainsBlock(t, fubs.blocks(), blks[4]) // fails as it is a known missing block - data, err = responseCache.AttemptLoad(requestID1, cidlink.Link{Cid: blks[1].Cid()}) + data, err = responseCache.AttemptLoad(requestID1, cidlink.Link{Cid: blks[1].Cid()}, lnkCtx) require.Error(t, err) require.Nil(t, data, "no data should be returned for missing block") // should succeed for request 2 where it's not a missing block - data, err = responseCache.AttemptLoad(requestID2, cidlink.Link{Cid: blks[1].Cid()}) + data, err = responseCache.AttemptLoad(requestID2, cidlink.Link{Cid: blks[1].Cid()}, lnkCtx) require.NoError(t, err) require.Equal(t, blks[1].RawData(), data) @@ -129,7 +130,7 @@ func TestResponseCacheManagingLinks(t *testing.T) { testutil.RefuteContainsBlock(t, fubs.blocks(), blks[1]) // should be unknown result as block is not known missing or present in block store - data, err = responseCache.AttemptLoad(requestID1, cidlink.Link{Cid: blks[2].Cid()}) + data, err = responseCache.AttemptLoad(requestID1, cidlink.Link{Cid: blks[2].Cid()}, lnkCtx) require.NoError(t, err) require.Nil(t, data, "no data should be returned for unknown block") diff --git a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go index 2b39fed0..3e600be0 100644 --- a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go +++ b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go @@ -59,7 +59,7 @@ func (ubs *UnverifiedBlockStore) PruneBlock(link ipld.Link) { // VerifyBlock verifies the data for the given link as being part of a traversal, // removes it from the unverified store, and writes it to permaneant storage. -func (ubs *UnverifiedBlockStore) VerifyBlock(lnk ipld.Link) ([]byte, error) { +func (ubs *UnverifiedBlockStore) VerifyBlock(lnk ipld.Link, linkContext ipld.LinkContext) ([]byte, error) { data, ok := ubs.inMemoryBlocks[lnk] if !ok { return nil, fmt.Errorf("block not found") @@ -68,7 +68,7 @@ func (ubs *UnverifiedBlockStore) VerifyBlock(lnk ipld.Link) ([]byte, error) { ubs.dataSize = ubs.dataSize - uint64(len(data)) log.Debugw("verified block", "total_queued_bytes", ubs.dataSize) - buffer, committer, err := ubs.storer(ipld.LinkContext{}) + buffer, committer, err := ubs.storer(linkContext) if err != nil { return nil, err } diff --git a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore_test.go b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore_test.go index 5afb21fd..35dc4d4d 100644 --- a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore_test.go +++ b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore_test.go @@ -21,7 +21,7 @@ func TestVerifyBlockPresent(t *testing.T) { require.Nil(t, reader) require.Error(t, err, "block should not be loadable till it's verified and stored") - data, err := unverifiedBlockStore.VerifyBlock(cidlink.Link{Cid: block.Cid()}) + data, err := unverifiedBlockStore.VerifyBlock(cidlink.Link{Cid: block.Cid()}, ipld.LinkContext{}) require.Nil(t, data) require.Error(t, err, "block should not be verifiable till it's added as an unverifiable block") @@ -30,7 +30,7 @@ func TestVerifyBlockPresent(t *testing.T) { require.Nil(t, reader) require.Error(t, err, "block should not be loadable till it's verified") - data, err = unverifiedBlockStore.VerifyBlock(cidlink.Link{Cid: block.Cid()}) + data, err = unverifiedBlockStore.VerifyBlock(cidlink.Link{Cid: block.Cid()}, ipld.LinkContext{}) require.NoError(t, err) require.Equal(t, block.RawData(), data, "block should be returned on verification if added") @@ -40,7 +40,7 @@ func TestVerifyBlockPresent(t *testing.T) { _, err = io.Copy(&buffer, reader) require.NoError(t, err) require.Equal(t, block.RawData(), buffer.Bytes(), "block should be stored and loadable after verification") - data, err = unverifiedBlockStore.VerifyBlock(cidlink.Link{Cid: block.Cid()}) + data, err = unverifiedBlockStore.VerifyBlock(cidlink.Link{Cid: block.Cid()}, ipld.LinkContext{}) require.Nil(t, data) require.Error(t, err, "block cannot be verified twice") } diff --git a/requestmanager/executor/executor.go b/requestmanager/executor/executor.go index 99851f6d..d79291e0 100644 --- a/requestmanager/executor/executor.go +++ b/requestmanager/executor/executor.go @@ -22,7 +22,7 @@ import ( // AsyncLoadFn is a function which given a request id and an ipld.Link, returns // a channel which will eventually return data for the link or an err -type AsyncLoadFn func(peer.ID, graphsync.RequestID, ipld.Link) <-chan types.AsyncLoadResult +type AsyncLoadFn func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) <-chan types.AsyncLoadResult // ExecutionEnv are request parameters that last between requests type ExecutionEnv struct { @@ -112,8 +112,8 @@ func (re *requestExecutor) traverse() error { if isComplete { return err } - lnk, _ := traverser.CurrentRequest() - resultChan := re.env.Loader(re.p, re.request.ID(), lnk) + lnk, linkContext := traverser.CurrentRequest() + resultChan := re.env.Loader(re.p, re.request.ID(), lnk, linkContext) var result types.AsyncLoadResult select { case result = <-resultChan: diff --git a/requestmanager/requestmanager.go b/requestmanager/requestmanager.go index bf387cd1..ec17a4a1 100644 --- a/requestmanager/requestmanager.go +++ b/requestmanager/requestmanager.go @@ -61,7 +61,7 @@ type AsyncLoader interface { StartRequest(graphsync.RequestID, string) error ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata, blks []blocks.Block) - AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult + AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult CompleteResponsesFor(requestID graphsync.RequestID) CleanupRequest(requestID graphsync.RequestID) } diff --git a/requestmanager/testloader/asyncloader.go b/requestmanager/testloader/asyncloader.go index d2ff4a65..7f07fa3e 100644 --- a/requestmanager/testloader/asyncloader.go +++ b/requestmanager/testloader/asyncloader.go @@ -102,7 +102,7 @@ func (fal *FakeAsyncLoader) VerifyStoreUsed(t *testing.T, requestID graphsync.Re fal.storesRequestedLk.RUnlock() } -func (fal *FakeAsyncLoader) asyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link) chan types.AsyncLoadResult { +func (fal *FakeAsyncLoader) asyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) chan types.AsyncLoadResult { fal.responseChannelsLk.Lock() responseChannel, ok := fal.responseChannels[requestKey{p, requestID, link}] if !ok { @@ -119,8 +119,8 @@ func (fal *FakeAsyncLoader) OnAsyncLoad(cb func(graphsync.RequestID, ipld.Link, } // AsyncLoad simulates an asynchronous load with responses stubbed by ResponseOn & SuccessResponseOn -func (fal *FakeAsyncLoader) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult { - res := fal.asyncLoad(p, requestID, link) +func (fal *FakeAsyncLoader) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult { + res := fal.asyncLoad(p, requestID, link, linkContext) if fal.cb != nil { fal.cb(requestID, link, res) } @@ -146,7 +146,7 @@ func (fal *FakeAsyncLoader) CleanupRequest(requestID graphsync.RequestID) { // "asynchronous" load, this can be called AFTER the attempt to load this link -- and the client will only get // the response at that point func (fal *FakeAsyncLoader) ResponseOn(p peer.ID, requestID graphsync.RequestID, link ipld.Link, result types.AsyncLoadResult) { - responseChannel := fal.asyncLoad(p, requestID, link) + responseChannel := fal.asyncLoad(p, requestID, link, ipld.LinkContext{}) responseChannel <- result close(responseChannel) }