From 43150276c7dba59a4e07070ddd22aa6148fcf102 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 8 Dec 2021 18:10:26 -0800 Subject: [PATCH] feat(responsemanager): clarify response completion only delete requests when they finish going over the network. put requests that are not processing but still going over the network in a state of CompletingSend --- graphsync.go | 5 ++ impl/graphsync.go | 2 - responsemanager/client.go | 10 +++ responsemanager/messages.go | 14 ++++ .../queryexecutor/queryexecutor.go | 37 +++------- .../queryexecutor/queryexecutor_test.go | 72 +++++++------------ responsemanager/responsemanager_test.go | 4 +- responsemanager/server.go | 45 ++++++++---- responsemanager/subscriber.go | 6 +- 9 files changed, 103 insertions(+), 92 deletions(-) diff --git a/graphsync.go b/graphsync.go index a23d9b17..5f6f0e06 100644 --- a/graphsync.go +++ b/graphsync.go @@ -345,6 +345,9 @@ const ( Running // Paused means a request is paused Paused + // CompletingSend means we have processed a query and are waiting for data to + // go over the network + CompletingSend ) func (rs RequestState) String() string { @@ -355,6 +358,8 @@ func (rs RequestState) String() string { return "running" case Paused: return "paused" + case CompletingSend: + return "completing send" default: return "unrecognized request state" } diff --git a/impl/graphsync.go b/impl/graphsync.go index 49e5e3f8..ce7cf7ec 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -262,9 +262,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, responseManager, outgoingBlockHooks, requestUpdatedHooks, - requestorCancelledListeners, responseAssembler, - network.ConnectionManager(), ) graphSync := &GraphSync{ network: network, diff --git a/responsemanager/client.go b/responsemanager/client.go index de284e3a..001d943d 100644 --- a/responsemanager/client.go +++ b/responsemanager/client.go @@ -232,6 +232,16 @@ func (rm *ResponseManager) CloseWithNetworkError(p peer.ID, requestID graphsync. rm.send(&errorRequestMessage{p, requestID, queryexecutor.ErrNetworkError, make(chan error, 1)}, nil) } +// TerminateRequest indicates a request has finished sending data and should no longer be tracked +func (rm *ResponseManager) TerminateRequest(p peer.ID, requestID graphsync.RequestID) { + done := make(chan struct{}, 1) + rm.send(&terminateRequestMessage{p, requestID, done}, nil) + select { + case <-rm.ctx.Done(): + case <-done: + } +} + // PeerState gets current state of the outgoing responses for a given peer func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState { response := make(chan peerstate.PeerState) diff --git a/responsemanager/messages.go b/responsemanager/messages.go index 0a05f08e..1b0944a5 100644 --- a/responsemanager/messages.go +++ b/responsemanager/messages.go @@ -127,3 +127,17 @@ func (psm *peerStateMessage) handle(rm *ResponseManager) { case <-rm.ctx.Done(): } } + +type terminateRequestMessage struct { + p peer.ID + requestID graphsync.RequestID + done chan<- struct{} +} + +func (trm *terminateRequestMessage) handle(rm *ResponseManager) { + rm.terminateRequest(responseKey{trm.p, trm.requestID}) + select { + case <-rm.ctx.Done(): + case trm.done <- struct{}{}: + } +} diff --git a/responsemanager/queryexecutor/queryexecutor.go b/responsemanager/queryexecutor/queryexecutor.go index 09adb477..88fd6ff7 100644 --- a/responsemanager/queryexecutor/queryexecutor.go +++ b/responsemanager/queryexecutor/queryexecutor.go @@ -54,13 +54,12 @@ type ResponseSignals struct { // QueryExecutor is responsible for performing individual requests by executing their traversals type QueryExecutor struct { - ctx context.Context - manager Manager - blockHooks BlockHooks - updateHooks UpdateHooks - cancelledListeners CancelledListeners - responseAssembler ResponseAssembler - connManager network.ConnManager + ctx context.Context + manager Manager + blockHooks BlockHooks + updateHooks UpdateHooks + responseAssembler ResponseAssembler + connManager network.ConnManager } // New creates a new QueryExecutor @@ -68,18 +67,14 @@ func New(ctx context.Context, manager Manager, blockHooks BlockHooks, updateHooks UpdateHooks, - cancelledListeners CancelledListeners, responseAssembler ResponseAssembler, - connManager network.ConnManager, ) *QueryExecutor { qm := &QueryExecutor{ - blockHooks: blockHooks, - updateHooks: updateHooks, - cancelledListeners: cancelledListeners, - responseAssembler: responseAssembler, - manager: manager, - ctx: ctx, - connManager: connManager, + blockHooks: blockHooks, + updateHooks: updateHooks, + responseAssembler: responseAssembler, + manager: manager, + ctx: ctx, } return qm } @@ -106,11 +101,6 @@ func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *pee log.Debugw("beginning response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String()) err := qe.executeQuery(pid, rt) - isCancelled := err != nil && ipldutil.IsContextCancelErr(err) - if isCancelled { - qe.connManager.Unprotect(pid, rt.Request.ID().Tag()) - qe.cancelledListeners.NotifyCancelledListeners(pid, rt.Request) - } qe.manager.FinishTask(task, err) log.Debugw("finishing response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String()) return false @@ -286,11 +276,6 @@ type UpdateHooks interface { ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) hooks.UpdateResult } -// CancelledListeners is an interface for notifying listeners that requestor cancelled -type CancelledListeners interface { - NotifyCancelledListeners(p peer.ID, request graphsync.RequestData) -} - // ResponseAssembler is an interface that returns sender interfaces for peer responses. type ResponseAssembler interface { Transaction(p peer.ID, requestID graphsync.RequestID, transaction responseassembler.Transaction) error diff --git a/responsemanager/queryexecutor/queryexecutor_test.go b/responsemanager/queryexecutor/queryexecutor_test.go index 50a1b697..3e3f2922 100644 --- a/responsemanager/queryexecutor/queryexecutor_test.go +++ b/responsemanager/queryexecutor/queryexecutor_test.go @@ -23,7 +23,6 @@ import ( "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/ipldutil" - "github.com/ipfs/go-graphsync/listeners" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/notifications" "github.com/ipfs/go-graphsync/responsemanager/hooks" @@ -44,7 +43,6 @@ func TestOneBlockTask(t *testing.T) { notifeeExpect(t, td, 1, td.responseCode) require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task)) require.Equal(t, 0, td.clearRequestCalls) - require.Equal(t, 0, td.cancelledCalls) } func TestSmallGraphTask(t *testing.T) { @@ -83,7 +81,6 @@ func TestSmallGraphTask(t *testing.T) { notifeeExpect(t, td, 10, td.responseCode) // AddNotifee called on all blocks require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task)) require.Equal(t, 0, td.clearRequestCalls) - require.Equal(t, 0, td.cancelledCalls) }) t.Run("paused by hook", func(t *testing.T) { @@ -98,7 +95,6 @@ func TestSmallGraphTask(t *testing.T) { require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task)) require.Equal(t, 1, td.pauseCalls) require.Equal(t, 0, td.clearRequestCalls) - require.Equal(t, 0, td.cancelledCalls) }) t.Run("paused by signal", func(t *testing.T) { @@ -117,7 +113,6 @@ func TestSmallGraphTask(t *testing.T) { require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task)) require.Equal(t, 1, td.pauseCalls) require.Equal(t, 0, td.clearRequestCalls) - require.Equal(t, 0, td.cancelledCalls) }) t.Run("partial cancelled by hook", func(t *testing.T) { @@ -130,7 +125,6 @@ func TestSmallGraphTask(t *testing.T) { transactionExpect(t, td, []int{6, 7}, ipldutil.ContextCancelError{}.Error()) // last 2 transactions are ContextCancelled require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task)) - require.Equal(t, 1, td.cancelledCalls) require.Equal(t, 1, td.clearRequestCalls) }) @@ -153,7 +147,6 @@ func TestSmallGraphTask(t *testing.T) { require.Equal(t, 0, td.clearRequestCalls) // cancelled by signal doesn't mean we get a cancelled call here // ErrCancelledByCommand is treated differently to a context cancellation error - require.Equal(t, 0, td.cancelledCalls) }) t.Run("unknown error by hook", func(t *testing.T) { @@ -168,7 +161,6 @@ func TestSmallGraphTask(t *testing.T) { require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task)) require.Equal(t, 0, td.clearRequestCalls) - require.Equal(t, 0, td.cancelledCalls) }) t.Run("unknown error by signal", func(t *testing.T) { @@ -189,7 +181,6 @@ func TestSmallGraphTask(t *testing.T) { require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task)) require.Equal(t, 0, td.clearRequestCalls) - require.Equal(t, 0, td.cancelledCalls) }) t.Run("network error by hook", func(t *testing.T) { @@ -204,7 +195,6 @@ func TestSmallGraphTask(t *testing.T) { require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task)) require.Equal(t, 1, td.clearRequestCalls) - require.Equal(t, 0, td.cancelledCalls) }) t.Run("network error by signal", func(t *testing.T) { @@ -225,7 +215,6 @@ func TestSmallGraphTask(t *testing.T) { require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task)) require.Equal(t, 1, td.clearRequestCalls) - require.Equal(t, 0, td.cancelledCalls) }) t.Run("first block wont load", func(t *testing.T) { @@ -238,7 +227,6 @@ func TestSmallGraphTask(t *testing.T) { require.Equal(t, false, qe.ExecuteTask(td.ctx, td.peer, td.task)) require.Equal(t, 0, td.clearRequestCalls) - require.Equal(t, 0, td.cancelledCalls) }) } @@ -277,34 +265,31 @@ func newRandomBlock(index int64) *blockData { } type testData struct { - ctx context.Context - t *testing.T - cancel func() - task *peertask.Task - blockStore map[ipld.Link][]byte - persistence ipld.LinkSystem - manager *fauxManager - responseAssembler *fauxResponseAssembler - responseBuilder *fauxResponseBuilder - connManager *testutil.TestConnManager - blockHooks *hooks.OutgoingBlockHooks - updateHooks *hooks.RequestUpdatedHooks - cancelledListeners *listeners.RequestorCancelledListeners - extensionData []byte - extensionName graphsync.ExtensionName - extension graphsync.ExtensionData - requestID graphsync.RequestID - requestCid cid.Cid - requestSelector datamodel.Node - requests []gsmsg.GraphSyncRequest - signals *ResponseSignals - pauseCalls int - clearRequestCalls int - cancelledCalls int - expectedBlocks []*blockData - responseCode graphsync.ResponseStatusCode - peer peer.ID - subscriber *notifications.TopicDataSubscriber + ctx context.Context + t *testing.T + cancel func() + task *peertask.Task + blockStore map[ipld.Link][]byte + persistence ipld.LinkSystem + manager *fauxManager + responseAssembler *fauxResponseAssembler + responseBuilder *fauxResponseBuilder + blockHooks *hooks.OutgoingBlockHooks + updateHooks *hooks.RequestUpdatedHooks + extensionData []byte + extensionName graphsync.ExtensionName + extension graphsync.ExtensionData + requestID graphsync.RequestID + requestCid cid.Cid + requestSelector datamodel.Node + requests []gsmsg.GraphSyncRequest + signals *ResponseSignals + pauseCalls int + clearRequestCalls int + expectedBlocks []*blockData + responseCode graphsync.ResponseStatusCode + peer peer.ID + subscriber *notifications.TopicDataSubscriber } func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData, *QueryExecutor) { @@ -318,10 +303,8 @@ func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData, td.task = &peertask.Task{} td.manager = &fauxManager{ctx: ctx, t: t, expectedStartTask: td.task} td.responseAssembler = &fauxResponseAssembler{} - td.connManager = testutil.NewTestConnManager() td.blockHooks = hooks.NewBlockHooks() td.updateHooks = hooks.NewUpdateHooks() - td.cancelledListeners = listeners.NewRequestorCancelledListeners() td.requestID = graphsync.RequestID(rand.Int31()) td.requestCid, _ = cid.Decode("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi") td.requestSelector = basicnode.NewInt(rand.Int63()) @@ -401,18 +384,13 @@ func newTestData(t *testing.T, blockCount int, expectedTraverse int) (*testData, td.responseAssembler.responseBuilder.pauseCb = func() { td.pauseCalls++ } - td.cancelledListeners.Register(func(p peer.ID, request graphsync.RequestData) { - td.cancelledCalls++ - }) qe := New( td.ctx, td.manager, td.blockHooks, td.updateHooks, - td.cancelledListeners, td.responseAssembler, - td.connManager, ) return td, qe } diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index 0745df4f..993583c9 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -87,7 +87,6 @@ func TestCancellationQueryInProgress(t *testing.T) { }) cancelledListenerCalled := make(chan struct{}, 1) td.cancelledListeners.Register(func(p peer.ID, request graphsync.RequestData) { - td.connManager.RefuteProtected(t, td.p) cancelledListenerCalled <- struct{}{} }) responseManager.Startup() @@ -105,6 +104,7 @@ func TestCancellationQueryInProgress(t *testing.T) { close(waitForCancel) testutil.AssertDoesReceive(td.ctx, t, cancelledListenerCalled, "should call cancelled listener") + td.connManager.RefuteProtected(t, td.p) td.assertRequestCleared() } @@ -1138,7 +1138,7 @@ func (td *testData) alternateLoaderResponseManager() *ResponseManager { } func (td *testData) newQueryExecutor(manager queryexecutor.Manager) *queryexecutor.QueryExecutor { - return queryexecutor.New(td.ctx, manager, td.blockHooks, td.updateHooks, td.cancelledListeners, td.responseAssembler, td.connManager) + return queryexecutor.New(td.ctx, manager, td.blockHooks, td.updateHooks, td.responseAssembler) } func (td *testData) assertPausedRequest() { diff --git a/responsemanager/server.go b/responsemanager/server.go index d754b423..c0f109a9 100644 --- a/responsemanager/server.go +++ b/responsemanager/server.go @@ -42,9 +42,19 @@ func (rm *ResponseManager) run() { } } +func (rm *ResponseManager) terminateRequest(key responseKey) { + ipr, ok := rm.inProgressResponses[key] + if !ok { + return + } + rm.connManager.Unprotect(key.p, key.requestID.Tag()) + delete(rm.inProgressResponses, key) + ipr.cancelFn() +} + func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSyncRequest) { response, ok := rm.inProgressResponses[key] - if !ok { + if !ok || response.state == graphsync.CompletingSend { log.Warnf("received update for non existent request, peer %s, request ID %d", key.p.Pretty(), key.requestID) return } @@ -68,8 +78,7 @@ func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSync return nil }) if result.Err != nil { - delete(rm.inProgressResponses, key) - response.cancelFn() + response.state = graphsync.CompletingSend return } if result.Unpause { @@ -106,26 +115,26 @@ func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID key := responseKey{p, requestID} rm.responseQueue.Remove(key, key.p) response, ok := rm.inProgressResponses[key] - if !ok { + if !ok || response.state == graphsync.CompletingSend { return errors.New("could not find request") } if response.state != graphsync.Running { _ = rm.responseAssembler.Transaction(p, requestID, func(rb responseassembler.ResponseBuilder) error { if ipldutil.IsContextCancelErr(err) { - rm.connManager.Unprotect(p, requestID.Tag()) rm.cancelledListeners.NotifyCancelledListeners(p, response.request) rb.ClearRequest() + rm.terminateRequest(key) } else if err == queryexecutor.ErrNetworkError { rb.ClearRequest() + rm.terminateRequest(key) } else { rb.FinishWithError(graphsync.RequestCancelled) rb.AddNotifee(notifications.Notifee{Data: graphsync.RequestCancelled, Subscriber: response.subscriber}) + response.state = graphsync.CompletingSend } return nil }) - delete(rm.inProgressResponses, key) - response.cancelFn() return nil } select { @@ -186,7 +195,7 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync func (rm *ResponseManager) taskDataForKey(key responseKey) queryexecutor.ResponseTask { response, hasResponse := rm.inProgressResponses[key] - if !hasResponse { + if !hasResponse || response.state == graphsync.CompletingSend { return queryexecutor.ResponseTask{Empty: true} } log.Infow("graphsync response processing begins", "request id", key.requestID, "peer", key.p, "total time", time.Since(response.startTime)) @@ -194,8 +203,7 @@ func (rm *ResponseManager) taskDataForKey(key responseKey) queryexecutor.Respons if response.loader == nil || response.traverser == nil { loader, traverser, isPaused, err := (&queryPreparer{rm.requestHooks, rm.responseAssembler, rm.linkSystem, rm.maxLinksPerRequest}).prepareQuery(response.ctx, key.p, response.request, response.signals, response.subscriber) if err != nil { - response.cancelFn() - delete(rm.inProgressResponses, key) + response.state = graphsync.CompletingSend return queryexecutor.ResponseTask{Empty: true} } response.loader = loader @@ -243,8 +251,19 @@ func (rm *ResponseManager) finishTask(task *peertask.Task, err error) { if err != nil { log.Infof("response failed: %w", err) } - delete(rm.inProgressResponses, key) - response.cancelFn() + + if ipldutil.IsContextCancelErr(err) { + rm.cancelledListeners.NotifyCancelledListeners(key.p, response.request) + rm.terminateRequest(key) + return + } + + if err == queryexecutor.ErrNetworkError { + rm.terminateRequest(key) + return + } + + response.state = graphsync.CompletingSend } func (rm *ResponseManager) getUpdates(key responseKey) []gsmsg.GraphSyncRequest { @@ -260,7 +279,7 @@ func (rm *ResponseManager) getUpdates(key responseKey) []gsmsg.GraphSyncRequest func (rm *ResponseManager) pauseRequest(p peer.ID, requestID graphsync.RequestID) error { key := responseKey{p, requestID} inProgressResponse, ok := rm.inProgressResponses[key] - if !ok { + if !ok || inProgressResponse.state == graphsync.CompletingSend { return errors.New("could not find request") } if inProgressResponse.state == graphsync.Paused { diff --git a/responsemanager/subscriber.go b/responsemanager/subscriber.go index cfa61ab6..70e682a0 100644 --- a/responsemanager/subscriber.go +++ b/responsemanager/subscriber.go @@ -12,6 +12,7 @@ import ( // RequestCloser can cancel request on a network error type RequestCloser interface { + TerminateRequest(p peer.ID, requestID graphsync.RequestID) CloseWithNetworkError(p peer.ID, requestID graphsync.RequestID) } @@ -43,12 +44,13 @@ func (s *subscriber) OnNext(topic notifications.Topic, event notifications.Event } status, isStatus := topic.(graphsync.ResponseStatusCode) if isStatus { - s.connManager.Unprotect(s.p, s.request.ID().Tag()) switch responseEvent.Name { case messagequeue.Error: - s.networkErrorListeners.NotifyNetworkErrorListeners(s.p, s.request, responseEvent.Err) s.requestCloser.CloseWithNetworkError(s.p, s.request.ID()) + s.requestCloser.TerminateRequest(s.p, s.request.ID()) + s.networkErrorListeners.NotifyNetworkErrorListeners(s.p, s.request, responseEvent.Err) case messagequeue.Sent: + s.requestCloser.TerminateRequest(s.p, s.request.ID()) s.completedListeners.NotifyCompletedListeners(s.p, s.request, status) } }