Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure linkcontext is passed #207

Merged
merged 1 commit into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ipldutil/traverser.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (t *traverser) checkState() {
func (t *traverser) writeDone(err error) {
select {
case <-t.ctx.Done():
case t.stateChan <- state{true, err, nil, ipld.LinkContext{}}:
case t.stateChan <- state{true, err, nil, ipld.LinkContext{Ctx: t.ctx}}:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to double check that I am not missing anything. Here we are instantiating a LinkContext exclusively from the process context because there is no parent LinkContext to propagate, right? I ask because building a LinkContext directly from the process context may not propagate certain values in the context. But as this is done at the beginning and the end of the traversal I think it should be fine.

}
}

Expand All @@ -179,7 +179,7 @@ func (t *traverser) start() {
}
go func() {
defer close(t.stopped)
ns, err := t.chooser(t.root, ipld.LinkContext{})
ns, err := t.chooser(t.root, ipld.LinkContext{Ctx: t.ctx})
if err != nil {
t.writeDone(err)
return
Expand Down
10 changes: 5 additions & 5 deletions requestmanager/asyncloader/asyncloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ func (al *AsyncLoader) ProcessResponse(p peer.ID, responses map[graphsync.Reques

// AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel for data and a channel
// for errors -- only one message will be sent over either.
func (al *AsyncLoader) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
func (al *AsyncLoader) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult {
resultChan := make(chan types.AsyncLoadResult, 1)
response := make(chan error, 1)
lr := loadattemptqueue.NewLoadRequest(p, requestID, link, resultChan)
lr := loadattemptqueue.NewLoadRequest(p, requestID, link, linkContext, resultChan)
_ = al.sendSyncMessage(&loadRequestMessage{response, requestID, lr}, response)
return resultChan
}
Expand Down Expand Up @@ -368,9 +368,9 @@ func setupAttemptQueue(lsys ipld.LinkSystem, allocator Allocator) (*responsecach

unverifiedBlockStore := unverifiedblockstore.New(lsys.StorageWriteOpener)
responseCache := responsecache.New(unverifiedBlockStore)
loadAttemptQueue := loadattemptqueue.New(func(p peer.ID, requestID graphsync.RequestID, link ipld.Link) types.AsyncLoadResult {
loadAttemptQueue := loadattemptqueue.New(func(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) types.AsyncLoadResult {
// load from response cache
data, err := responseCache.AttemptLoad(requestID, link)
data, err := responseCache.AttemptLoad(requestID, link, linkContext)
if err != nil {
return types.AsyncLoadResult{Err: err, Local: false}
}
Expand All @@ -382,7 +382,7 @@ func setupAttemptQueue(lsys ipld.LinkSystem, allocator Allocator) (*responsecach
return types.AsyncLoadResult{Data: data, Local: false}
}
// fall back to local store
if stream, err := lsys.StorageReadOpener(ipld.LinkContext{}, link); stream != nil && err == nil {
if stream, err := lsys.StorageReadOpener(linkContext, link); stream != nil && err == nil {
if localData, err := ioutil.ReadAll(stream); err == nil && localData != nil {
return types.AsyncLoadResult{Data: localData, Local: true}
}
Expand Down
48 changes: 24 additions & 24 deletions requestmanager/asyncloader/asyncloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestAsyncLoadInitialLoadSucceedsLocallyPresent(t *testing.T) {
withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) {
requestID := graphsync.RequestID(rand.Int31())
p := testutil.GeneratePeers(1)[0]
resultChan := asyncLoader.AsyncLoad(p, requestID, link)
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})
assertSuccessResponse(ctx, t, resultChan)
st.AssertLocalLoads(t, 1)
})
Expand All @@ -41,7 +41,7 @@ func TestAsyncLoadInitialLoadSucceedsResponsePresent(t *testing.T) {
withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) {
requestID := graphsync.RequestID(rand.Int31())
responses := map[graphsync.RequestID]metadata.Metadata{
requestID: metadata.Metadata{
requestID: {
metadata.Item{
Link: link.Cid,
BlockPresent: true,
Expand All @@ -50,7 +50,7 @@ func TestAsyncLoadInitialLoadSucceedsResponsePresent(t *testing.T) {
}
p := testutil.GeneratePeers(1)[0]
asyncLoader.ProcessResponse(p, responses, blocks)
resultChan := asyncLoader.AsyncLoad(p, requestID, link)
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})

assertSuccessResponse(ctx, t, resultChan)
st.AssertLocalLoads(t, 0)
Expand All @@ -65,7 +65,7 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) {
requestID := graphsync.RequestID(rand.Int31())

responses := map[graphsync.RequestID]metadata.Metadata{
requestID: metadata.Metadata{
requestID: {
metadata.Item{
Link: link.(cidlink.Link).Cid,
BlockPresent: false,
Expand All @@ -75,7 +75,7 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) {
p := testutil.GeneratePeers(1)[0]
asyncLoader.ProcessResponse(p, responses, nil)

resultChan := asyncLoader.AsyncLoad(p, requestID, link)
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})
assertFailResponse(ctx, t, resultChan)
st.AssertLocalLoads(t, 0)
})
Expand All @@ -87,7 +87,7 @@ func TestAsyncLoadInitialLoadIndeterminateWhenRequestNotInProgress(t *testing.T)
link := testutil.NewTestLink()
requestID := graphsync.RequestID(rand.Int31())
p := testutil.GeneratePeers(1)[0]
resultChan := asyncLoader.AsyncLoad(p, requestID, link)
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})
assertFailResponse(ctx, t, resultChan)
st.AssertLocalLoads(t, 1)
})
Expand All @@ -105,12 +105,12 @@ func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) {
err := asyncLoader.StartRequest(requestID, "")
require.NoError(t, err)
p := testutil.GeneratePeers(1)[0]
resultChan := asyncLoader.AsyncLoad(p, requestID, link)
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})

st.AssertAttemptLoadWithoutResult(ctx, t, resultChan)

responses := map[graphsync.RequestID]metadata.Metadata{
requestID: metadata.Metadata{
requestID: {
metadata.Item{
Link: link.Cid,
BlockPresent: true,
Expand All @@ -133,12 +133,12 @@ func TestAsyncLoadInitialLoadIndeterminateThenFails(t *testing.T) {
err := asyncLoader.StartRequest(requestID, "")
require.NoError(t, err)
p := testutil.GeneratePeers(1)[0]
resultChan := asyncLoader.AsyncLoad(p, requestID, link)
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})

st.AssertAttemptLoadWithoutResult(ctx, t, resultChan)

responses := map[graphsync.RequestID]metadata.Metadata{
requestID: metadata.Metadata{
requestID: {
metadata.Item{
Link: link.(cidlink.Link).Cid,
BlockPresent: false,
Expand All @@ -159,7 +159,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) {
err := asyncLoader.StartRequest(requestID, "")
require.NoError(t, err)
p := testutil.GeneratePeers(1)[0]
resultChan := asyncLoader.AsyncLoad(p, requestID, link)
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})
st.AssertAttemptLoadWithoutResult(ctx, t, resultChan)
asyncLoader.CompleteResponsesFor(requestID)
assertFailResponse(ctx, t, resultChan)
Expand All @@ -175,7 +175,7 @@ func TestAsyncLoadTwiceLoadsLocallySecondTime(t *testing.T) {
withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) {
requestID := graphsync.RequestID(rand.Int31())
responses := map[graphsync.RequestID]metadata.Metadata{
requestID: metadata.Metadata{
requestID: {
metadata.Item{
Link: link.Cid,
BlockPresent: true,
Expand All @@ -184,12 +184,12 @@ func TestAsyncLoadTwiceLoadsLocallySecondTime(t *testing.T) {
}
p := testutil.GeneratePeers(1)[0]
asyncLoader.ProcessResponse(p, responses, blocks)
resultChan := asyncLoader.AsyncLoad(p, requestID, link)
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})

assertSuccessResponse(ctx, t, resultChan)
st.AssertLocalLoads(t, 0)

resultChan = asyncLoader.AsyncLoad(p, requestID, link)
resultChan = asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})
assertSuccessResponse(ctx, t, resultChan)
st.AssertLocalLoads(t, 1)

Expand All @@ -214,7 +214,7 @@ func TestRegisterUnregister(t *testing.T) {
err = asyncLoader.StartRequest(requestID2, "other")
require.NoError(t, err)
p := testutil.GeneratePeers(1)[0]
resultChan1 := asyncLoader.AsyncLoad(p, requestID2, link1)
resultChan1 := asyncLoader.AsyncLoad(p, requestID2, link1, ipld.LinkContext{})
assertSuccessResponse(ctx, t, resultChan1)
err = asyncLoader.UnregisterPersistenceOption("other")
require.EqualError(t, err, "cannot unregister while requests are in progress")
Expand All @@ -239,11 +239,11 @@ func TestRequestSplittingLoadLocallyFromBlockstore(t *testing.T) {
requestID1 := graphsync.RequestID(rand.Int31())
p := testutil.GeneratePeers(1)[0]

resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link)
resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link, ipld.LinkContext{})
requestID2 := graphsync.RequestID(rand.Int31())
err = asyncLoader.StartRequest(requestID2, "other")
require.NoError(t, err)
resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link)
resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link, ipld.LinkContext{})

assertFailResponse(ctx, t, resultChan1)
assertSuccessResponse(ctx, t, resultChan2)
Expand All @@ -267,16 +267,16 @@ func TestRequestSplittingSameBlockTwoStores(t *testing.T) {
err = asyncLoader.StartRequest(requestID2, "other")
require.NoError(t, err)
p := testutil.GeneratePeers(1)[0]
resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link)
resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link)
resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link, ipld.LinkContext{})
resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link, ipld.LinkContext{})
responses := map[graphsync.RequestID]metadata.Metadata{
requestID1: metadata.Metadata{
requestID1: {
metadata.Item{
Link: link.Cid,
BlockPresent: true,
},
},
requestID2: metadata.Metadata{
requestID2: {
metadata.Item{
Link: link.Cid,
BlockPresent: true,
Expand Down Expand Up @@ -308,10 +308,10 @@ func TestRequestSplittingSameBlockOnlyOneResponse(t *testing.T) {
err = asyncLoader.StartRequest(requestID2, "other")
require.NoError(t, err)
p := testutil.GeneratePeers(1)[0]
resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link)
resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link)
resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link, ipld.LinkContext{})
resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link, ipld.LinkContext{})
responses := map[graphsync.RequestID]metadata.Metadata{
requestID2: metadata.Metadata{
requestID2: {
metadata.Item{
Link: link.Cid,
BlockPresent: true,
Expand Down
16 changes: 9 additions & 7 deletions requestmanager/asyncloader/loadattemptqueue/loadattemptqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import (
// LoadRequest is a request to load the given link for the given request id,
// with results returned to the given channel
type LoadRequest struct {
p peer.ID
requestID graphsync.RequestID
link ipld.Link
resultChan chan types.AsyncLoadResult
p peer.ID
requestID graphsync.RequestID
link ipld.Link
linkContext ipld.LinkContext
resultChan chan types.AsyncLoadResult
}

// NewLoadRequest returns a new LoadRequest for the given request id, link,
Expand All @@ -25,13 +26,14 @@ func NewLoadRequest(
p peer.ID,
requestID graphsync.RequestID,
link ipld.Link,
linkContext ipld.LinkContext,
resultChan chan types.AsyncLoadResult) LoadRequest {
return LoadRequest{p, requestID, link, resultChan}
return LoadRequest{p, requestID, link, linkContext, resultChan}
}

// LoadAttempter attempts to load a link to an array of bytes
// and returns an async load result
type LoadAttempter func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult
type LoadAttempter func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult

// LoadAttemptQueue attempts to load using the load attempter, and then can
// place requests on a retry queue
Expand All @@ -50,7 +52,7 @@ func New(loadAttempter LoadAttempter) *LoadAttemptQueue {
// AttemptLoad attempts to loads the given load request, and if retry is true
// it saves the loadrequest for retrying later
func (laq *LoadAttemptQueue) AttemptLoad(lr LoadRequest, retry bool) {
response := laq.loadAttempter(lr.p, lr.requestID, lr.link)
response := laq.loadAttempter(lr.p, lr.requestID, lr.link, lr.linkContext)
if response.Err != nil || response.Data != nil {
lr.resultChan <- response
close(lr.resultChan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestAsyncLoadInitialLoadSucceeds(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
callCount := 0
loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult {
loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult {
callCount++
return types.AsyncLoadResult{
Data: testutil.RandomBytes(100),
Expand All @@ -30,11 +30,12 @@ func TestAsyncLoadInitialLoadSucceeds(t *testing.T) {
loadAttemptQueue := New(loadAttempter)

link := testutil.NewTestLink()
linkContext := ipld.LinkContext{}
requestID := graphsync.RequestID(rand.Int31())
p := testutil.GeneratePeers(1)[0]

resultChan := make(chan types.AsyncLoadResult, 1)
lr := NewLoadRequest(p, requestID, link, resultChan)
lr := NewLoadRequest(p, requestID, link, linkContext, resultChan)
loadAttemptQueue.AttemptLoad(lr, false)

var result types.AsyncLoadResult
Expand All @@ -50,7 +51,7 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
callCount := 0
loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult {
loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult {
callCount++
return types.AsyncLoadResult{
Err: fmt.Errorf("something went wrong"),
Expand All @@ -59,11 +60,12 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) {
loadAttemptQueue := New(loadAttempter)

link := testutil.NewTestLink()
linkContext := ipld.LinkContext{}
requestID := graphsync.RequestID(rand.Int31())
resultChan := make(chan types.AsyncLoadResult, 1)
p := testutil.GeneratePeers(1)[0]

lr := NewLoadRequest(p, requestID, link, resultChan)
lr := NewLoadRequest(p, requestID, link, linkContext, resultChan)
loadAttemptQueue.AttemptLoad(lr, false)

var result types.AsyncLoadResult
Expand All @@ -78,7 +80,7 @@ func TestAsyncLoadInitialLoadIndeterminateRetryFalse(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
callCount := 0
loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult {
loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult {
var result []byte
if callCount > 0 {
result = testutil.RandomBytes(100)
Expand All @@ -92,11 +94,12 @@ func TestAsyncLoadInitialLoadIndeterminateRetryFalse(t *testing.T) {
loadAttemptQueue := New(loadAttempter)

link := testutil.NewTestLink()
linkContext := ipld.LinkContext{}
requestID := graphsync.RequestID(rand.Int31())
p := testutil.GeneratePeers(1)[0]

resultChan := make(chan types.AsyncLoadResult, 1)
lr := NewLoadRequest(p, requestID, link, resultChan)
lr := NewLoadRequest(p, requestID, link, linkContext, resultChan)
loadAttemptQueue.AttemptLoad(lr, false)

var result types.AsyncLoadResult
Expand All @@ -112,7 +115,7 @@ func TestAsyncLoadInitialLoadIndeterminateRetryTrueThenRetriedSuccess(t *testing
defer cancel()
callCount := 0
called := make(chan struct{}, 2)
loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult {
loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult {
var result []byte
called <- struct{}{}
if callCount > 0 {
Expand All @@ -126,10 +129,11 @@ func TestAsyncLoadInitialLoadIndeterminateRetryTrueThenRetriedSuccess(t *testing
loadAttemptQueue := New(loadAttempter)

link := testutil.NewTestLink()
linkContext := ipld.LinkContext{}
requestID := graphsync.RequestID(rand.Int31())
resultChan := make(chan types.AsyncLoadResult, 1)
p := testutil.GeneratePeers(1)[0]
lr := NewLoadRequest(p, requestID, link, resultChan)
lr := NewLoadRequest(p, requestID, link, linkContext, resultChan)
loadAttemptQueue.AttemptLoad(lr, true)

testutil.AssertDoesReceiveFirst(t, called, "should attempt load with no result", resultChan, ctx.Done())
Expand All @@ -148,7 +152,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) {
defer cancel()
callCount := 0
called := make(chan struct{}, 2)
loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult {
loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link, ipld.LinkContext) types.AsyncLoadResult {
var result []byte
called <- struct{}{}
if callCount > 0 {
Expand All @@ -162,10 +166,11 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) {
loadAttemptQueue := New(loadAttempter)

link := testutil.NewTestLink()
linkContext := ipld.LinkContext{}
requestID := graphsync.RequestID(rand.Int31())
resultChan := make(chan types.AsyncLoadResult, 1)
p := testutil.GeneratePeers(1)[0]
lr := NewLoadRequest(p, requestID, link, resultChan)
lr := NewLoadRequest(p, requestID, link, linkContext, resultChan)
loadAttemptQueue.AttemptLoad(lr, true)

testutil.AssertDoesReceiveFirst(t, called, "should attempt load with no result", resultChan, ctx.Done())
Expand Down
Loading