Skip to content

Commit

Permalink
feat(responsemanager): clarify response completion
Browse files Browse the repository at this point in the history
only delete requests when they finish going over the network. put requests that are not processing
but still going over the network in a state of CompletingSend
  • Loading branch information
hannahhoward committed Dec 9, 2021
1 parent f08c2ed commit 4315027
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 92 deletions.
5 changes: 5 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ const (
Running
// Paused means a request is paused
Paused
// CompletingSend means we have processed a query and are waiting for data to
// go over the network
CompletingSend
)

func (rs RequestState) String() string {
Expand All @@ -355,6 +358,8 @@ func (rs RequestState) String() string {
return "running"
case Paused:
return "paused"
case CompletingSend:
return "completing send"
default:
return "unrecognized request state"
}
Expand Down
2 changes: 0 additions & 2 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
responseManager,
outgoingBlockHooks,
requestUpdatedHooks,
requestorCancelledListeners,
responseAssembler,
network.ConnectionManager(),
)
graphSync := &GraphSync{
network: network,
Expand Down
10 changes: 10 additions & 0 deletions responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,16 @@ func (rm *ResponseManager) CloseWithNetworkError(p peer.ID, requestID graphsync.
rm.send(&errorRequestMessage{p, requestID, queryexecutor.ErrNetworkError, make(chan error, 1)}, nil)
}

// TerminateRequest indicates a request has finished sending data and should no longer be tracked
func (rm *ResponseManager) TerminateRequest(p peer.ID, requestID graphsync.RequestID) {
done := make(chan struct{}, 1)
rm.send(&terminateRequestMessage{p, requestID, done}, nil)
select {
case <-rm.ctx.Done():
case <-done:
}
}

// PeerState gets current state of the outgoing responses for a given peer
func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState {
response := make(chan peerstate.PeerState)
Expand Down
14 changes: 14 additions & 0 deletions responsemanager/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,17 @@ func (psm *peerStateMessage) handle(rm *ResponseManager) {
case <-rm.ctx.Done():
}
}

type terminateRequestMessage struct {
p peer.ID
requestID graphsync.RequestID
done chan<- struct{}
}

func (trm *terminateRequestMessage) handle(rm *ResponseManager) {
rm.terminateRequest(responseKey{trm.p, trm.requestID})
select {
case <-rm.ctx.Done():
case trm.done <- struct{}{}:
}
}
37 changes: 11 additions & 26 deletions responsemanager/queryexecutor/queryexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,32 +54,27 @@ type ResponseSignals struct {

// QueryExecutor is responsible for performing individual requests by executing their traversals
type QueryExecutor struct {
ctx context.Context
manager Manager
blockHooks BlockHooks
updateHooks UpdateHooks
cancelledListeners CancelledListeners
responseAssembler ResponseAssembler
connManager network.ConnManager
ctx context.Context
manager Manager
blockHooks BlockHooks
updateHooks UpdateHooks
responseAssembler ResponseAssembler
connManager network.ConnManager
}

// New creates a new QueryExecutor
func New(ctx context.Context,
manager Manager,
blockHooks BlockHooks,
updateHooks UpdateHooks,
cancelledListeners CancelledListeners,
responseAssembler ResponseAssembler,
connManager network.ConnManager,
) *QueryExecutor {
qm := &QueryExecutor{
blockHooks: blockHooks,
updateHooks: updateHooks,
cancelledListeners: cancelledListeners,
responseAssembler: responseAssembler,
manager: manager,
ctx: ctx,
connManager: connManager,
blockHooks: blockHooks,
updateHooks: updateHooks,
responseAssembler: responseAssembler,
manager: manager,
ctx: ctx,
}
return qm
}
Expand All @@ -106,11 +101,6 @@ func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *pee

log.Debugw("beginning response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String())
err := qe.executeQuery(pid, rt)
isCancelled := err != nil && ipldutil.IsContextCancelErr(err)
if isCancelled {
qe.connManager.Unprotect(pid, rt.Request.ID().Tag())
qe.cancelledListeners.NotifyCancelledListeners(pid, rt.Request)
}
qe.manager.FinishTask(task, err)
log.Debugw("finishing response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String())
return false
Expand Down Expand Up @@ -286,11 +276,6 @@ type UpdateHooks interface {
ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) hooks.UpdateResult
}

// CancelledListeners is an interface for notifying listeners that requestor cancelled
type CancelledListeners interface {
NotifyCancelledListeners(p peer.ID, request graphsync.RequestData)
}

// ResponseAssembler is an interface that returns sender interfaces for peer responses.
type ResponseAssembler interface {
Transaction(p peer.ID, requestID graphsync.RequestID, transaction responseassembler.Transaction) error
Expand Down
72 changes: 25 additions & 47 deletions responsemanager/queryexecutor/queryexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
"github.com/ipfs/go-graphsync/listeners"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/responsemanager/hooks"
Expand All @@ -44,7 +43,6 @@ func TestOneBlockTask(t *testing.T) {
notifeeExpect(t, td, 1, td.responseCode)
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
}

func TestSmallGraphTask(t *testing.T) {
Expand Down Expand Up @@ -83,7 +81,6 @@ func TestSmallGraphTask(t *testing.T) {
notifeeExpect(t, td, 10, td.responseCode) // AddNotifee called on all blocks
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("paused by hook", func(t *testing.T) {
Expand All @@ -98,7 +95,6 @@ func TestSmallGraphTask(t *testing.T) {
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 1, td.pauseCalls)
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("paused by signal", func(t *testing.T) {
Expand All @@ -117,7 +113,6 @@ func TestSmallGraphTask(t *testing.T) {
require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 1, td.pauseCalls)
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("partial cancelled by hook", func(t *testing.T) {
Expand All @@ -130,7 +125,6 @@ func TestSmallGraphTask(t *testing.T) {
transactionExpect(t, td, []int{6, 7}, ipldutil.ContextCancelError{}.Error()) // last 2 transactions are ContextCancelled

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 1, td.cancelledCalls)
require.Equal(t, 1, td.clearRequestCalls)
})

Expand All @@ -153,7 +147,6 @@ func TestSmallGraphTask(t *testing.T) {
require.Equal(t, 0, td.clearRequestCalls)
// cancelled by signal doesn't mean we get a cancelled call here
// ErrCancelledByCommand is treated differently to a context cancellation error
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("unknown error by hook", func(t *testing.T) {
Expand All @@ -168,7 +161,6 @@ func TestSmallGraphTask(t *testing.T) {

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("unknown error by signal", func(t *testing.T) {
Expand All @@ -189,7 +181,6 @@ func TestSmallGraphTask(t *testing.T) {

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("network error by hook", func(t *testing.T) {
Expand All @@ -204,7 +195,6 @@ func TestSmallGraphTask(t *testing.T) {

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 1, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("network error by signal", func(t *testing.T) {
Expand All @@ -225,7 +215,6 @@ func TestSmallGraphTask(t *testing.T) {

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 1, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})

t.Run("first block wont load", func(t *testing.T) {
Expand All @@ -238,7 +227,6 @@ func TestSmallGraphTask(t *testing.T) {

require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task))
require.Equal(t, 0, td.clearRequestCalls)
require.Equal(t, 0, td.cancelledCalls)
})
}

Expand Down Expand Up @@ -277,34 +265,31 @@ func newRandomBlock(index int64) *blockData {
}

type testData struct {
ctx context.Context
t *testing.T
cancel func()
task *peertask.Task
blockStore map[ipld.Link][]byte
persistence ipld.LinkSystem
manager *fauxManager
responseAssembler *fauxResponseAssembler
responseBuilder *fauxResponseBuilder
connManager *testutil.TestConnManager
blockHooks *hooks.OutgoingBlockHooks
updateHooks *hooks.RequestUpdatedHooks
cancelledListeners *listeners.RequestorCancelledListeners
extensionData []byte
extensionName graphsync.ExtensionName
extension graphsync.ExtensionData
requestID graphsync.RequestID
requestCid cid.Cid
requestSelector datamodel.Node
requests []gsmsg.GraphSyncRequest
signals *ResponseSignals
pauseCalls int
clearRequestCalls int
cancelledCalls int
expectedBlocks []*blockData
responseCode graphsync.ResponseStatusCode
peer peer.ID
subscriber *notifications.TopicDataSubscriber
ctx context.Context
t *testing.T
cancel func()
task *peertask.Task
blockStore map[ipld.Link][]byte
persistence ipld.LinkSystem
manager *fauxManager
responseAssembler *fauxResponseAssembler
responseBuilder *fauxResponseBuilder
blockHooks *hooks.OutgoingBlockHooks
updateHooks *hooks.RequestUpdatedHooks
extensionData []byte
extensionName graphsync.ExtensionName
extension graphsync.ExtensionData
requestID graphsync.RequestID
requestCid cid.Cid
requestSelector datamodel.Node
requests []gsmsg.GraphSyncRequest
signals *ResponseSignals
pauseCalls int
clearRequestCalls int
expectedBlocks []*blockData
responseCode graphsync.ResponseStatusCode
peer peer.ID
subscriber *notifications.TopicDataSubscriber
}

func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData, *QueryExecutor) {
Expand All @@ -318,10 +303,8 @@ func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData,
td.task = &peertask.Task{}
td.manager = &fauxManager{ctx: ctx, t: t, expectedStartTask: td.task}
td.responseAssembler = &fauxResponseAssembler{}
td.connManager = testutil.NewTestConnManager()
td.blockHooks = hooks.NewBlockHooks()
td.updateHooks = hooks.NewUpdateHooks()
td.cancelledListeners = listeners.NewRequestorCancelledListeners()
td.requestID = graphsync.RequestID(rand.Int31())
td.requestCid, _ = cid.Decode("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi")
td.requestSelector = basicnode.NewInt(rand.Int63())
Expand Down Expand Up @@ -401,18 +384,13 @@ func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData,
td.responseAssembler.responseBuilder.pauseCb = func() {
td.pauseCalls++
}
td.cancelledListeners.Register(func(p peer.ID, request graphsync.RequestData) {
td.cancelledCalls++
})

qe := New(
td.ctx,
td.manager,
td.blockHooks,
td.updateHooks,
td.cancelledListeners,
td.responseAssembler,
td.connManager,
)
return td, qe
}
Expand Down
4 changes: 2 additions & 2 deletions responsemanager/responsemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func TestCancellationQueryInProgress(t *testing.T) {
})
cancelledListenerCalled := make(chan struct{}, 1)
td.cancelledListeners.Register(func(p peer.ID, request graphsync.RequestData) {
td.connManager.RefuteProtected(t, td.p)
cancelledListenerCalled <- struct{}{}
})
responseManager.Startup()
Expand All @@ -105,6 +104,7 @@ func TestCancellationQueryInProgress(t *testing.T) {
close(waitForCancel)

testutil.AssertDoesReceive(td.ctx, t, cancelledListenerCalled, "should call cancelled listener")
td.connManager.RefuteProtected(t, td.p)

td.assertRequestCleared()
}
Expand Down Expand Up @@ -1138,7 +1138,7 @@ func (td *testData) alternateLoaderResponseManager() *ResponseManager {
}

func (td *testData) newQueryExecutor(manager queryexecutor.Manager) *queryexecutor.QueryExecutor {
return queryexecutor.New(td.ctx, manager, td.blockHooks, td.updateHooks, td.cancelledListeners, td.responseAssembler, td.connManager)
return queryexecutor.New(td.ctx, manager, td.blockHooks, td.updateHooks, td.responseAssembler)
}

func (td *testData) assertPausedRequest() {
Expand Down
Loading

0 comments on commit 4315027

Please sign in to comment.