Skip to content

Commit

Permalink
Use do not send blocks for pause/resume & prevent processing of block…
Browse files Browse the repository at this point in the history
…s on cancelled requests (#333)

* feat(executor): run block hooks ahead of advancement

* feat(requestmanager): shutdown online lines once cancelled

* refactor(executor): switch to sending doNotSendFirstBlocks for pause/resume
  • Loading branch information
hannahhoward authored Jan 14, 2022
1 parent 795beb9 commit 98dd895
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 104 deletions.
35 changes: 17 additions & 18 deletions requestmanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/hannahhoward/go-pubsub"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipld/go-ipld-prime"
Expand Down Expand Up @@ -46,23 +45,23 @@ const (
)

type inProgressRequestStatus struct {
ctx context.Context
span trace.Span
startTime time.Time
cancelFn func()
p peer.ID
terminalError error
pauseMessages chan struct{}
state graphsync.RequestState
lastResponse atomic.Value
onTerminated []chan<- error
request gsmsg.GraphSyncRequest
doNotSendCids *cid.Set
nodeStyleChooser traversal.LinkTargetNodePrototypeChooser
inProgressChan chan graphsync.ResponseProgress
inProgressErr chan error
traverser ipldutil.Traverser
traverserCancel context.CancelFunc
ctx context.Context
span trace.Span
startTime time.Time
cancelFn func()
p peer.ID
terminalError error
pauseMessages chan struct{}
state graphsync.RequestState
lastResponse atomic.Value
onTerminated []chan<- error
request gsmsg.GraphSyncRequest
doNotSendFirstBlocks int64
nodeStyleChooser traversal.LinkTargetNodePrototypeChooser
inProgressChan chan graphsync.ResponseProgress
inProgressErr chan error
traverser ipldutil.Traverser
traverserCancel context.CancelFunc
}

// PeerHandler is an interface that can send requests to peers
Expand Down
37 changes: 19 additions & 18 deletions requestmanager/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,17 @@ import (
"context"
"sync/atomic"

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/donotsendfirstblocks"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/requestmanager/hooks"
Expand Down Expand Up @@ -102,17 +100,17 @@ func (e *Executor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask.

// RequestTask are parameters for a single request execution
type RequestTask struct {
Ctx context.Context
Span trace.Span
Request gsmsg.GraphSyncRequest
LastResponse *atomic.Value
DoNotSendCids *cid.Set
PauseMessages <-chan struct{}
Traverser ipldutil.Traverser
P peer.ID
InProgressErr chan error
Empty bool
InitialRequest bool
Ctx context.Context
Span trace.Span
Request gsmsg.GraphSyncRequest
LastResponse *atomic.Value
DoNotSendFirstBlocks int64
PauseMessages <-chan struct{}
Traverser ipldutil.Traverser
P peer.ID
InProgressErr chan error
Empty bool
InitialRequest bool
}

func (e *Executor) traverse(rt RequestTask) error {
Expand Down Expand Up @@ -177,7 +175,6 @@ func (e *Executor) processBlockHooks(p peer.ID, response graphsync.ResponseData,
}

func (e *Executor) onNewBlock(rt RequestTask, block graphsync.BlockData) error {
rt.DoNotSendCids.Add(block.Link().(cidlink.Link).Cid)
response := rt.LastResponse.Load().(gsmsg.GraphSyncResponse)
return e.processBlockHooks(rt.P, response, block)
}
Expand Down Expand Up @@ -218,12 +215,16 @@ func (e *Executor) processResult(rt RequestTask, link ipld.Link, result types.As

func (e *Executor) startRemoteRequest(rt RequestTask) error {
request := rt.Request
if rt.DoNotSendCids.Len() > 0 {
cidsData, err := cidset.EncodeCidSet(rt.DoNotSendCids)
doNotSendFirstBlocks := rt.DoNotSendFirstBlocks
if doNotSendFirstBlocks < int64(rt.Traverser.NBlocksTraversed()) {
doNotSendFirstBlocks = int64(rt.Traverser.NBlocksTraversed())
}
if doNotSendFirstBlocks > 0 {
doNotSendFirstBlocksData, err := donotsendfirstblocks.EncodeDoNotSendFirstBlocks(doNotSendFirstBlocks)
if err != nil {
return err
}
request = rt.Request.ReplaceExtensions([]graphsync.ExtensionData{{Name: graphsync.ExtensionDoNotSendCIDs, Data: cidsData}})
request = rt.Request.ReplaceExtensions([]graphsync.ExtensionData{{Name: graphsync.ExtensionsDoNotSendFirstBlocks, Data: doNotSendFirstBlocksData}})
}
log.Debugw("starting remote request", "id", rt.Request.ID(), "peer", rt.P.String(), "root_cid", rt.Request.Root().String())
e.manager.SendRequest(rt.P, request)
Expand Down
61 changes: 30 additions & 31 deletions requestmanager/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"testing"
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
Expand All @@ -17,7 +16,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/donotsendfirstblocks"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/requestmanager/executor"
Expand Down Expand Up @@ -102,21 +101,21 @@ func TestRequestExecutionBlockChain(t *testing.T) {
require.EqualError(t, ree.terminalError, hooks.ErrPaused{}.Error())
},
},
"preexisting do not send cids": {
"preexisting do not send firstBlocks": {
configureRequestExecution: func(p peer.ID, requestID graphsync.RequestID, tbc *testutil.TestBlockChain, ree *requestExecutionEnv) {
ree.doNotSendCids.Add(tbc.GenisisLink.(cidlink.Link).Cid)
ree.doNotSendFirstBlocks = 1
},
verifyResults: func(t *testing.T, tbc *testutil.TestBlockChain, ree *requestExecutionEnv, responses []graphsync.ResponseProgress, receivedErrors []error) {
tbc.VerifyWholeChainSync(responses)
require.Empty(t, receivedErrors)
require.Equal(t, ree.request.ID(), ree.requestsSent[0].request.ID())
require.Equal(t, ree.request.Root(), ree.requestsSent[0].request.Root())
require.Equal(t, ree.request.Selector(), ree.requestsSent[0].request.Selector())
doNotSendCidsExt, has := ree.requestsSent[0].request.Extension(graphsync.ExtensionDoNotSendCIDs)
doNotSendFirstBlocksData, has := ree.requestsSent[0].request.Extension(graphsync.ExtensionsDoNotSendFirstBlocks)
require.True(t, has)
cidSet, err := cidset.DecodeCidSet(doNotSendCidsExt)
doNotSendFirstBlocks, err := donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData)
require.NoError(t, err)
require.Equal(t, 1, cidSet.Len())
require.Equal(t, int64(1), doNotSendFirstBlocks)
require.Len(t, ree.blookHooksCalled, 10)
require.NoError(t, ree.terminalError)
},
Expand Down Expand Up @@ -145,11 +144,11 @@ func TestRequestExecutionBlockChain(t *testing.T) {
require.Equal(t, ree.request.ID(), ree.requestsSent[0].request.ID())
require.Equal(t, ree.request.Root(), ree.requestsSent[0].request.Root())
require.Equal(t, ree.request.Selector(), ree.requestsSent[0].request.Selector())
doNotSendCidsExt, has := ree.requestsSent[0].request.Extension(graphsync.ExtensionDoNotSendCIDs)
doNotSendFirstBlocksData, has := ree.requestsSent[0].request.Extension(graphsync.ExtensionsDoNotSendFirstBlocks)
require.True(t, has)
cidSet, err := cidset.DecodeCidSet(doNotSendCidsExt)
doNotSendFirstBlocks, err := donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData)
require.NoError(t, err)
require.Equal(t, 6, cidSet.Len())
require.Equal(t, int64(6), doNotSendFirstBlocks)
require.Len(t, ree.blookHooksCalled, 10)
require.NoError(t, ree.terminalError)
},
Expand Down Expand Up @@ -202,16 +201,16 @@ func TestRequestExecutionBlockChain(t *testing.T) {
defer requestCancel()
var responsesReceived []graphsync.ResponseProgress
ree := &requestExecutionEnv{
ctx: requestCtx,
p: p,
pauseMessages: make(chan struct{}, 1),
blockHookResults: make(map[blockHookKey]hooks.UpdateResult),
doNotSendCids: cid.NewSet(),
request: gsmsg.NewRequest(requestID, tbc.TipLink.(cidlink.Link).Cid, tbc.Selector(), graphsync.Priority(rand.Int31())),
fal: fal,
tbc: tbc,
initialRequest: true,
inProgressErr: make(chan error, 1),
ctx: requestCtx,
p: p,
pauseMessages: make(chan struct{}, 1),
blockHookResults: make(map[blockHookKey]hooks.UpdateResult),
doNotSendFirstBlocks: 0,
request: gsmsg.NewRequest(requestID, tbc.TipLink.(cidlink.Link).Cid, tbc.Selector(), graphsync.Priority(rand.Int31())),
fal: fal,
tbc: tbc,
initialRequest: true,
inProgressErr: make(chan error, 1),
traverser: ipldutil.TraversalBuilder{
Root: tbc.TipLink,
Selector: tbc.Selector(),
Expand Down Expand Up @@ -276,7 +275,7 @@ type requestExecutionEnv struct {
request gsmsg.GraphSyncRequest
p peer.ID
blockHookResults map[blockHookKey]hooks.UpdateResult
doNotSendCids *cid.Set
doNotSendFirstBlocks int64
pauseMessages chan struct{}
externalPause pauseKey
loadLocallyUntil int
Expand Down Expand Up @@ -304,16 +303,16 @@ func (ree *requestExecutionEnv) GetRequestTask(_ peer.ID, _ *peertask.Task, requ
lastResponse.Store(gsmsg.NewResponse(ree.request.ID(), graphsync.RequestAcknowledged))

requestExecution := executor.RequestTask{
Ctx: ree.ctx,
Request: ree.request,
LastResponse: &lastResponse,
DoNotSendCids: ree.doNotSendCids,
PauseMessages: ree.pauseMessages,
Traverser: ree.traverser,
P: ree.p,
InProgressErr: ree.inProgressErr,
Empty: false,
InitialRequest: ree.initialRequest,
Ctx: ree.ctx,
Request: ree.request,
LastResponse: &lastResponse,
DoNotSendFirstBlocks: ree.doNotSendFirstBlocks,
PauseMessages: ree.pauseMessages,
Traverser: ree.traverser,
P: ree.p,
InProgressErr: ree.inProgressErr,
Empty: false,
InitialRequest: ree.initialRequest,
}
go func() {
select {
Expand Down
14 changes: 7 additions & 7 deletions requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"github.com/stretchr/testify/require"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/dedupkey"
"github.com/ipfs/go-graphsync/donotsendfirstblocks"
"github.com/ipfs/go-graphsync/listeners"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/messagequeue"
Expand Down Expand Up @@ -877,10 +877,10 @@ func TestPauseResume(t *testing.T) {

// verify the correct new request with Do-no-send-cids & other extensions
resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
doNotSendFirstBlocksData, has := resumedRequest.gsr.Extension(graphsync.ExtensionsDoNotSendFirstBlocks)
doNotSendFirstBlocks, err := donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData)
require.NoError(t, err)
require.Equal(t, pauseAt, doNotSendCids.Len())
require.Equal(t, pauseAt, int(doNotSendFirstBlocks))
require.True(t, has)
ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
require.True(t, has)
Expand Down Expand Up @@ -957,10 +957,10 @@ func TestPauseResumeExternal(t *testing.T) {

// verify the correct new request with Do-no-send-cids & other extensions
resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
doNotSendFirstBlocksData, has := resumedRequest.gsr.Extension(graphsync.ExtensionsDoNotSendFirstBlocks)
doNotSendFirstBlocks, err := donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData)
require.NoError(t, err)
require.Equal(t, pauseAt, doNotSendCids.Len())
require.Equal(t, pauseAt, int(doNotSendFirstBlocks))
require.True(t, has)
ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1)
require.True(t, has)
Expand Down
58 changes: 28 additions & 30 deletions requestmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipfs/go-peertaskqueue/peertracker"
"github.com/ipld/go-ipld-prime"
Expand All @@ -22,8 +21,8 @@ import (
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/dedupkey"
"github.com/ipfs/go-graphsync/donotsendfirstblocks"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/peerstate"
Expand Down Expand Up @@ -73,34 +72,32 @@ func (rm *RequestManager) newRequest(parentSpan trace.Span, p peer.ID, root ipld
rp, err := rm.singleErrorResponse(err)
return request, rp, err
}
doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs)
var doNotSendCids *cid.Set
doNotSendFirstBlocksData, has := request.Extension(graphsync.ExtensionsDoNotSendFirstBlocks)
var doNotSendFirstBlocks int64
if has {
doNotSendCids, err = cidset.DecodeCidSet(doNotSendCidsData)
doNotSendFirstBlocks, err = donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
defer parentSpan.End()
rp, err := rm.singleErrorResponse(err)
return request, rp, err
}
} else {
doNotSendCids = cid.NewSet()
}
ctx, cancel := context.WithCancel(ctx)
requestStatus := &inProgressRequestStatus{
ctx: ctx,
span: parentSpan,
startTime: time.Now(),
cancelFn: cancel,
p: p,
pauseMessages: make(chan struct{}, 1),
doNotSendCids: doNotSendCids,
request: request,
state: graphsync.Queued,
nodeStyleChooser: hooksResult.CustomChooser,
inProgressChan: make(chan graphsync.ResponseProgress),
inProgressErr: make(chan error),
ctx: ctx,
span: parentSpan,
startTime: time.Now(),
cancelFn: cancel,
p: p,
pauseMessages: make(chan struct{}, 1),
doNotSendFirstBlocks: doNotSendFirstBlocks,
request: request,
state: graphsync.Queued,
nodeStyleChooser: hooksResult.CustomChooser,
inProgressChan: make(chan graphsync.ResponseProgress),
inProgressErr: make(chan error),
}
requestStatus.lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged))
rm.inProgressRequestStatuses[request.ID()] = requestStatus
Expand Down Expand Up @@ -157,17 +154,17 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re

ipr.state = graphsync.Running
return executor.RequestTask{
Ctx: ipr.ctx,
Span: ipr.span,
Request: ipr.request,
LastResponse: &ipr.lastResponse,
DoNotSendCids: ipr.doNotSendCids,
PauseMessages: ipr.pauseMessages,
Traverser: ipr.traverser,
P: ipr.p,
InProgressErr: ipr.inProgressErr,
InitialRequest: initialRequest,
Empty: false,
Ctx: ipr.ctx,
Span: ipr.span,
Request: ipr.request,
LastResponse: &ipr.lastResponse,
DoNotSendFirstBlocks: ipr.doNotSendFirstBlocks,
PauseMessages: ipr.pauseMessages,
Traverser: ipr.traverser,
P: ipr.p,
InProgressErr: ipr.inProgressErr,
InitialRequest: initialRequest,
Empty: false,
}
}

Expand Down Expand Up @@ -259,6 +256,7 @@ func (rm *RequestManager) cancelOnError(requestID graphsync.RequestID, ipr *inPr
rm.terminateRequest(requestID, ipr)
} else {
ipr.cancelFn()
rm.asyncLoader.CompleteResponsesFor(requestID)
}
}

Expand Down

0 comments on commit 98dd895

Please sign in to comment.