diff --git a/CHANGELOG.md b/CHANGELOG.md index e0afbe4c..daa12a40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,113 @@ some significant breaking changes. | Hannah Howard | 2 | +3316/-3015 | 25 | | Steven Allen | 1 | +95/-227 | 5 | +# go-graphsync 0.6.8 + +### Changelog + +- github.com/ipfs/go-graphsync: + - refactor: replace particular request not found errors with public error (#188) ([ipfs/go-graphsync#188](https://github.com/ipfs/go-graphsync/pull/188)) + - fix(responsemanager): fix error codes (#182) ([ipfs/go-graphsync#182](https://github.com/ipfs/go-graphsync/pull/182)) + +### Contributors + +| Contributor | Commits | Lines ± | Files Changed | +|-------------|---------|---------|---------------| +| Hannah Howard | 1 | +100/-51 | 5 | +| dirkmc | 1 | +10/-3 | 2 | + +# go-graphsync 0.6.7 + +### Changelog + +- github.com/ipfs/go-graphsync: + - Add cancel request and wait function (#185) ([ipfs/go-graphsync#185](https://github.com/ipfs/go-graphsync/pull/185)) + +### Contributors + +| Contributor | Commits | Lines ± | Files Changed | +|-------------|---------|---------|---------------| +| Hannah Howard | 1 | +154/-32 | 9 | +# go-graphsync 0.6.6 + +### Changelog + +- github.com/ipfs/go-graphsync: + - feat(requestmanager): add request timing (#181) ([ipfs/go-graphsync#181](https://github.com/ipfs/go-graphsync/pull/181)) + +### Contributors + +| Contributor | Commits | Lines ± | Files Changed | +|-------------|---------|---------|---------------| +| Hannah Howard | 1 | +9/-1 | 1 | + +# go-graphsync 0.6.5 + +### Changelog + +- github.com/ipfs/go-graphsync: + - Resolve 175 race condition, no change to hook timing (#178) ([ipfs/go-graphsync#178](https://github.com/ipfs/go-graphsync/pull/178)) + +### Contributors + +| Contributor | Commits | Lines ± | Files Changed | +|-------------|---------|---------|---------------| +| Hannah Howard | 1 | +199/-171 | 10 | + +# go-graphsync 0.6.4 + +### Changelog + +- github.com/ipfs/go-graphsync: + - 
feat/request-queued-hook (#172) ([ipfs/go-graphsync#172](https://github.com/ipfs/go-graphsync/pull/172)) + +### Contributors + +| Contributor | Commits | Lines ± | Files Changed | +|-------------|---------|---------|---------------| +| aarshkshah1992 | 3 | +87/-3 | 7 | +| dirkmc | 1 | +11/-0 | 1 | + +# go-graphsync 0.6.3 + +### Changelog + +- github.com/ipfs/go-graphsync: + - Fix/log blockstore reads (#169) ([ipfs/go-graphsync#169](https://github.com/ipfs/go-graphsync/pull/169)) + +### Contributors + +| Contributor | Commits | Lines ± | Files Changed | +|-------------|---------|---------|---------------| +| Aarsh Shah | 2 | +40/-177 | 6 | + +# go-graphsync 0.6.2 + +### Changelog + +- github.com/ipfs/go-graphsync: + - Better logging for Graphsync traversal (#167) ([ipfs/go-graphsync#167](https://github.com/ipfs/go-graphsync/pull/167)) + +### Contributors + +| Contributor | Commits | Lines ± | Files Changed | +|-------------|---------|---------|---------------| +| Aarsh Shah | 1 | +18/-2 | 2 | + +# go-graphsync 0.6.1 + +### Changelog + +- github.com/ipfs/go-graphsync: + - feat: fire network error when network disconnects during request (#164) ([ipfs/go-graphsync#164](https://github.com/ipfs/go-graphsync/pull/164)) + +### Contributors + +| Contributor | Commits | Lines ± | Files Changed | +|-------------|---------|---------|---------------| +| dirkmc | 1 | +86/-8 | 4 | + + # go-graphsync 0.6.0 Major code refactor for simplicity, ease of understanding diff --git a/go.mod b/go.mod index e43664d3..89cdfb98 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/ipfs/go-ipfs-util v0.0.2 github.com/ipfs/go-ipld-cbor v0.0.5 // indirect github.com/ipfs/go-ipld-format v0.2.0 - github.com/ipfs/go-log v1.0.4 + github.com/ipfs/go-log/v2 v2.1.1 github.com/ipfs/go-merkledag v0.3.2 github.com/ipfs/go-peertaskqueue v0.2.0 github.com/ipfs/go-unixfs v0.2.4 diff --git a/graphsync.go b/graphsync.go index e2c59695..c46add92 100644 --- a/graphsync.go +++ b/graphsync.go @@ 
-44,11 +44,12 @@ const ( ExtensionDeDupByKey = ExtensionName("graphsync/dedup-by-key") ) -// RequestContextCancelledErr is an error message received on the error channel when the request context given by the user is cancelled/times out -type RequestContextCancelledErr struct{} +// RequestClientCancelledErr is an error message received on the error channel when the request is cancelled by the client code, +// either by closing the passed request context or calling CancelRequest +type RequestClientCancelledErr struct{} -func (e RequestContextCancelledErr) Error() string { - return "request context cancelled" +func (e RequestClientCancelledErr) Error() string { + return "request cancelled by client" } // RequestFailedBusyErr is an error message received on the error channel when the peer is busy @@ -86,6 +87,13 @@ func (e RequestCancelledErr) Error() string { return "request failed - responder cancelled" } +// RequestNotFoundErr indicates that a request with a particular request ID was not found +type RequestNotFoundErr struct{} + +func (e RequestNotFoundErr) Error() string { + return "request not found" +} + var ( // ErrExtensionAlreadyRegistered means a user extension can be registered only once ErrExtensionAlreadyRegistered = errors.New("extension already registered") @@ -197,6 +205,10 @@ type RequestUpdatedHookActions interface { UnpauseResponse() } +// OnIncomingRequestQueuedHook is a hook that runs each time a new incoming request is added to the responder's task queue. +// It receives the peer that sent the request and all data about the request. +type OnIncomingRequestQueuedHook func(p peer.ID, request RequestData) + // OnIncomingRequestHook is a hook that runs each time a new request is received. // It receives the peer that sent the request and all data about the request. 
// It receives an interface for customizing the response to this request @@ -262,6 +274,9 @@ type GraphExchange interface { // UnregisterPersistenceOption unregisters an alternate loader/storer combo UnregisterPersistenceOption(name string) error + // RegisterIncomingRequestQueuedHook adds a hook that runs when a new incoming request is added to the responder's task queue. + RegisterIncomingRequestQueuedHook(hook OnIncomingRequestQueuedHook) UnregisterHookFunc + // RegisterIncomingRequestHook adds a hook that runs when a request is received RegisterIncomingRequestHook(hook OnIncomingRequestHook) UnregisterHookFunc @@ -312,4 +327,7 @@ type GraphExchange interface { // CancelResponse cancels an in progress response CancelResponse(peer.ID, RequestID) error + + // CancelRequest cancels an in progress request + CancelRequest(context.Context, RequestID) error } diff --git a/impl/graphsync.go b/impl/graphsync.go index 81a2b41e..e4e877fe 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -3,7 +3,7 @@ package graphsync import ( "context" - logging "github.com/ipfs/go-log" + logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-peertaskqueue" ipld "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" @@ -43,6 +43,7 @@ type GraphSync struct { responseAssembler *responseassembler.ResponseAssembler peerTaskQueue *peertaskqueue.PeerTaskQueue peerManager *peermanager.PeerMessageManager + incomingRequestQueuedHooks *responderhooks.IncomingRequestQueuedHooks incomingRequestHooks *responderhooks.IncomingRequestHooks outgoingBlockHooks *responderhooks.OutgoingBlockHooks requestUpdatedHooks *responderhooks.RequestUpdatedHooks @@ -124,6 +125,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, networkErrorListeners := listeners.NewNetworkErrorListeners() receiverErrorListeners := listeners.NewReceiverNetworkErrorListeners() persistenceOptions := persistenceoptions.New() + requestQueuedHooks := responderhooks.NewRequestQueuedHooks() 
incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions) outgoingBlockHooks := responderhooks.NewBlockHooks() requestUpdatedHooks := responderhooks.NewUpdateHooks() @@ -142,7 +144,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners) responseAssembler := responseassembler.New(ctx, peerManager) peerTaskQueue := peertaskqueue.New() - responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests) + responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, requestQueuedHooks, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests) graphSync := &GraphSync{ network: network, linkSystem: linkSystem, @@ -152,6 +154,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, responseAssembler: responseAssembler, peerTaskQueue: peerTaskQueue, peerManager: peerManager, + incomingRequestQueuedHooks: requestQueuedHooks, incomingRequestHooks: incomingRequestHooks, outgoingBlockHooks: outgoingBlockHooks, requestUpdatedHooks: requestUpdatedHooks, @@ -190,6 +193,12 @@ func (gs *GraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingReques return gs.incomingRequestHooks.Register(hook) } +// RegisterIncomingRequestQueuedHook adds a hook that runs when a new incoming request is added +// to the responder's task queue. 
+func (gs *GraphSync) RegisterIncomingRequestQueuedHook(hook graphsync.OnIncomingRequestQueuedHook) graphsync.UnregisterHookFunc { + return gs.incomingRequestQueuedHooks.Register(hook) +} + // RegisterIncomingResponseHook adds a hook that runs when a response is received func (gs *GraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc { return gs.incomingResponseHooks.Register(hook) @@ -285,6 +294,11 @@ func (gs *GraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestID) er return gs.responseManager.CancelResponse(p, requestID) } +// CancelRequest cancels an in progress request +func (gs *GraphSync) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error { + return gs.requestManager.CancelRequest(ctx, requestID) +} + type graphSyncReceiver GraphSync func (gsr *graphSyncReceiver) graphSync() *GraphSync { diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index 2a7d1f08..c7177bf2 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -616,7 +616,7 @@ func TestNetworkDisconnect(t *testing.T) { testutil.AssertReceive(ctx, t, networkError, &err, "should receive network error") testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error") - require.EqualError(t, err, graphsync.RequestContextCancelledErr{}.Error()) + require.EqualError(t, err, graphsync.RequestClientCancelledErr{}.Error()) testutil.AssertReceive(ctx, t, receiverError, &err, "should receive an error on receiver side") } @@ -652,7 +652,7 @@ func TestConnectFail(t *testing.T) { var err error testutil.AssertReceive(ctx, t, reqNetworkError, &err, "should receive network error") testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error") - require.EqualError(t, err, graphsync.RequestContextCancelledErr{}.Error()) + require.EqualError(t, err, graphsync.RequestClientCancelledErr{}.Error()) } func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { diff --git 
a/ipldutil/traverser.go b/ipldutil/traverser.go index 8623c52a..ff569309 100644 --- a/ipldutil/traverser.go +++ b/ipldutil/traverser.go @@ -11,6 +11,12 @@ import ( "github.com/ipld/go-ipld-prime/traversal/selector" ) +/* TODO: This traverser creates an extra goroutine and is quite complicated, in order to give calling code control of +a selector traversal. If it were implemented inside of go-ipld-prime's traversal library, with access to private functions, +it could be done without an extra goroutine, avoiding the possibility of races and simplifying implementation. This has +been documented here: https://github.com/ipld/go-ipld-prime/issues/213 -- and when this issue is implemented, this traverser +can go away */ + var defaultLinkSystem = cidlink.DefaultLinkSystem() var defaultVisitor traversal.AdvVisitFn = func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil } @@ -45,6 +51,8 @@ type Traverser interface { Error(err error) // Shutdown cancels the traversal Shutdown(ctx context.Context) + // NBlocksTraversed returns the number of blocks successfully traversed + NBlocksTraversed() int } type state struct { @@ -64,6 +72,7 @@ type nextResponse struct { func (tb TraversalBuilder) Start(parentCtx context.Context) Traverser { ctx, cancel := context.WithCancel(parentCtx) t := &traverser{ + blocksCount: 0, parentCtx: parentCtx, ctx: ctx, cancel: cancel, @@ -100,6 +109,7 @@ func (tb TraversalBuilder) Start(parentCtx context.Context) Traverser { // traverser is a class to perform a selector traversal that stops every time a new block is loaded // and waits for manual input (in the form of advance or error) type traverser struct { + blocksCount int parentCtx context.Context ctx context.Context cancel func() @@ -118,6 +128,10 @@ type traverser struct { stopped chan struct{} } +func (t *traverser) NBlocksTraversed() int { + return t.blocksCount +} + func (t *traverser) loader(lnkCtx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { select { case 
<-t.ctx.Done(): @@ -159,6 +173,7 @@ func (t *traverser) writeDone(err error) { func (t *traverser) start() { select { case <-t.ctx.Done(): + close(t.stopped) return case t.awaitRequest <- struct{}{}: } @@ -218,16 +233,20 @@ func (t *traverser) Advance(reader io.Reader) error { if isComplete { return errors.New("cannot advance when done") } + select { case <-t.ctx.Done(): return ContextCancelError{} case t.awaitRequest <- struct{}{}: } + select { case <-t.ctx.Done(): return ContextCancelError{} case t.responses <- nextResponse{reader, nil}: } + + t.blocksCount++ return nil } diff --git a/ipldutil/traverser_test.go b/ipldutil/traverser_test.go index c646d947..c34c62d6 100644 --- a/ipldutil/traverser_test.go +++ b/ipldutil/traverser_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "testing" + "time" blocks "github.com/ipfs/go-block-format" ipld "github.com/ipld/go-ipld-prime" @@ -21,6 +22,22 @@ import ( func TestTraverser(t *testing.T) { ctx := context.Background() + t.Run("started with shutdown context, then shutdown", func(t *testing.T) { + cancelledCtx, cancel := context.WithCancel(ctx) + cancel() + testdata := testutil.NewTestIPLDTree() + ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) + sel := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() + traverser := TraversalBuilder{ + Root: testdata.RootNodeLnk, + Selector: sel, + }.Start(cancelledCtx) + timeoutCtx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + traverser.Shutdown(timeoutCtx) + require.NoError(t, timeoutCtx.Err()) + }) + t.Run("traverses correctly, simple struct", func(t *testing.T) { testdata := testutil.NewTestIPLDTree() ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) diff --git a/message/message.go b/message/message.go index 55435232..8d269ef1 100644 --- a/message/message.go +++ b/message/message.go @@ -33,7 +33,8 @@ func IsTerminalFailureCode(status graphsync.ResponseStatusCode) bool { status == 
graphsync.RequestFailedContentNotFound || status == graphsync.RequestFailedLegal || status == graphsync.RequestFailedUnknown || - status == graphsync.RequestCancelled + status == graphsync.RequestCancelled || + status == graphsync.RequestRejected } // IsTerminalResponseCode returns true if the response code signals diff --git a/messagequeue/messagequeue.go b/messagequeue/messagequeue.go index cb9f54d4..de7db2d1 100644 --- a/messagequeue/messagequeue.go +++ b/messagequeue/messagequeue.go @@ -7,7 +7,7 @@ import ( "sync" "time" - logging "github.com/ipfs/go-log" + logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/peer" gsmsg "github.com/ipfs/go-graphsync/message" diff --git a/network/libp2p_impl.go b/network/libp2p_impl.go index 727ab9f0..9a29bea9 100644 --- a/network/libp2p_impl.go +++ b/network/libp2p_impl.go @@ -6,7 +6,7 @@ import ( "io" "time" - logging "github.com/ipfs/go-log" + logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" diff --git a/requestmanager/asyncloader/responsecache/responsecache.go b/requestmanager/asyncloader/responsecache/responsecache.go index 132b5e0e..cbbba358 100644 --- a/requestmanager/asyncloader/responsecache/responsecache.go +++ b/requestmanager/asyncloader/responsecache/responsecache.go @@ -5,7 +5,7 @@ import ( "sync" blocks "github.com/ipfs/go-block-format" - logging "github.com/ipfs/go-log" + logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" diff --git a/requestmanager/executor/executor.go b/requestmanager/executor/executor.go index b2ee368b..86accb82 100644 --- a/requestmanager/executor/executor.go +++ b/requestmanager/executor/executor.go @@ -39,7 +39,7 @@ type ExecutionEnv struct { type RequestExecution struct { Ctx context.Context P peer.ID - NetworkError chan error + TerminalError chan error Request gsmsg.GraphSyncRequest LastResponse 
*atomic.Value DoNotSendCids *cid.Set @@ -55,7 +55,7 @@ func (ee ExecutionEnv) Start(re RequestExecution) (chan graphsync.ResponseProgre inProgressErr: make(chan error), ctx: re.Ctx, p: re.P, - networkError: re.NetworkError, + terminalError: re.TerminalError, request: re.Request, lastResponse: re.LastResponse, doNotSendCids: re.DoNotSendCids, @@ -74,7 +74,7 @@ type requestExecutor struct { inProgressErr chan error ctx context.Context p peer.ID - networkError chan error + terminalError chan error request gsmsg.GraphSyncRequest lastResponse *atomic.Value nodeStyleChooser traversal.LinkTargetNodePrototypeChooser @@ -155,9 +155,9 @@ func (re *requestExecutor) run() { } } select { - case networkError := <-re.networkError: + case terminalError := <-re.terminalError: select { - case re.inProgressErr <- networkError: + case re.inProgressErr <- terminalError: case <-re.env.Ctx.Done(): } default: diff --git a/requestmanager/requestmanager.go b/requestmanager/requestmanager.go index bc169f38..b139fd46 100644 --- a/requestmanager/requestmanager.go +++ b/requestmanager/requestmanager.go @@ -5,10 +5,12 @@ import ( "errors" "fmt" "sync/atomic" + "time" + "github.com/hannahhoward/go-pubsub" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log" + logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/traversal/selector" @@ -37,13 +39,15 @@ const ( type inProgressRequestStatus struct { ctx context.Context + startTime time.Time cancelFn func() p peer.ID - networkError chan error + terminalError chan error resumeMessages chan []graphsync.ExtensionData pauseMessages chan struct{} paused bool lastResponse atomic.Value + onTerminated []chan error } // PeerHandler is an interface that can send requests to peers @@ -65,13 +69,15 @@ type AsyncLoader interface { // RequestManager tracks outgoing requests and processes incoming reponses // to them. 
type RequestManager struct { - ctx context.Context - cancel func() - messages chan requestManagerMessage - peerHandler PeerHandler - rc *responseCollector - asyncLoader AsyncLoader - linkSystem ipld.LinkSystem + ctx context.Context + cancel func() + messages chan requestManagerMessage + peerHandler PeerHandler + rc *responseCollector + asyncLoader AsyncLoader + disconnectNotif *pubsub.PubSub + linkSystem ipld.LinkSystem + // dont touch out side of run loop nextRequestID graphsync.RequestID inProgressRequestStatuses map[graphsync.RequestID]*inProgressRequestStatus @@ -114,6 +120,7 @@ func New(ctx context.Context, ctx: ctx, cancel: cancel, asyncLoader: asyncLoader, + disconnectNotif: pubsub.New(disconnectDispatcher), linkSystem: linkSystem, rc: newResponseCollector(ctx), messages: make(chan requestManagerMessage, 16), @@ -132,6 +139,7 @@ func (rm *RequestManager) SetDelegate(peerHandler PeerHandler) { type inProgressRequest struct { requestID graphsync.RequestID + request gsmsg.GraphSyncRequest incoming chan graphsync.ResponseProgress incomingError chan error } @@ -170,6 +178,11 @@ func (rm *RequestManager) SendRequest(ctx context.Context, case receivedInProgressRequest = <-inProgressRequestChan: } + // If the connection to the peer is disconnected, fire an error + unsub := rm.listenForDisconnect(p, func(neterr error) { + rm.networkErrorListeners.NotifyNetworkErrorListeners(p, receivedInProgressRequest.request, neterr) + }) + return rm.rc.collectResponses(ctx, receivedInProgressRequest.incoming, receivedInProgressRequest.incomingError, @@ -177,7 +190,34 @@ func (rm *RequestManager) SendRequest(ctx context.Context, rm.cancelRequest(receivedInProgressRequest.requestID, receivedInProgressRequest.incoming, receivedInProgressRequest.incomingError) - }) + }, + // Once the request has completed, stop listening for disconnect events + unsub, + ) +} + +// Dispatch the Disconnect event to subscribers +func disconnectDispatcher(p pubsub.Event, subscriberFn pubsub.SubscriberFn) 
error { + listener := subscriberFn.(func(peer.ID)) + listener(p.(peer.ID)) + return nil +} + +// Listen for the Disconnect event for the given peer +func (rm *RequestManager) listenForDisconnect(p peer.ID, onDisconnect func(neterr error)) func() { + // Subscribe to Disconnect notifications + return rm.disconnectNotif.Subscribe(func(evtPeer peer.ID) { + // If the peer is the one we're interested in, call the listener + if evtPeer == p { + onDisconnect(fmt.Errorf("disconnected from peer %s", p)) + } + }) +} + +// Disconnected is called when a peer disconnects +func (rm *RequestManager) Disconnected(p peer.ID) { + // Notify any listeners that a peer has disconnected + _ = rm.disconnectNotif.Publish(p) } func (rm *RequestManager) emptyResponse() (chan graphsync.ResponseProgress, chan error) { @@ -198,8 +238,10 @@ func (rm *RequestManager) singleErrorResponse(err error) (chan graphsync.Respons } type cancelRequestMessage struct { - requestID graphsync.RequestID - isPause bool + requestID graphsync.RequestID + isPause bool + onTerminated chan error + terminalError error } func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID, @@ -208,7 +250,7 @@ func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID, cancelMessageChannel := rm.messages for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil { select { - case cancelMessageChannel <- &cancelRequestMessage{requestID, false}: + case cancelMessageChannel <- &cancelRequestMessage{requestID, false, nil, nil}: cancelMessageChannel = nil // clear out any remaining responses, in case and "incoming reponse" // messages get processed before our cancel message @@ -226,6 +268,12 @@ func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID, } } +// CancelRequest cancels the given request ID and waits for the request to terminate +func (rm *RequestManager) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error { + terminated := make(chan error, 1) + 
return rm.sendSyncMessage(&cancelRequestMessage{requestID, false, terminated, graphsync.RequestClientCancelledErr{}}, terminated, ctx.Done()) +} + type processResponseMessage struct { p peer.ID responses []gsmsg.GraphSyncResponse @@ -252,7 +300,7 @@ type unpauseRequestMessage struct { // Can also send extensions with unpause func (rm *RequestManager) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error { response := make(chan error, 1) - return rm.sendSyncMessage(&unpauseRequestMessage{requestID, extensions, response}, response) + return rm.sendSyncMessage(&unpauseRequestMessage{requestID, extensions, response}, response, nil) } type pauseRequestMessage struct { @@ -263,18 +311,22 @@ type pauseRequestMessage struct { // PauseRequest pauses an in progress request (may take 1 or more blocks to process) func (rm *RequestManager) PauseRequest(requestID graphsync.RequestID) error { response := make(chan error, 1) - return rm.sendSyncMessage(&pauseRequestMessage{requestID, response}, response) + return rm.sendSyncMessage(&pauseRequestMessage{requestID, response}, response, nil) } -func (rm *RequestManager) sendSyncMessage(message requestManagerMessage, response chan error) error { +func (rm *RequestManager) sendSyncMessage(message requestManagerMessage, response chan error, done <-chan struct{}) error { select { case <-rm.ctx.Done(): return errors.New("context cancelled") + case <-done: + return errors.New("context cancelled") case rm.messages <- message: } select { case <-rm.ctx.Done(): return errors.New("context cancelled") + case <-done: + return errors.New("context cancelled") case err := <-response: return err } @@ -315,17 +367,21 @@ type terminateRequestMessage struct { requestID graphsync.RequestID } -func (nrm *newRequestMessage) setupRequest(requestID graphsync.RequestID, rm *RequestManager) (chan graphsync.ResponseProgress, chan error) { +func (nrm *newRequestMessage) setupRequest(requestID graphsync.RequestID, rm 
*RequestManager) (gsmsg.GraphSyncRequest, chan graphsync.ResponseProgress, chan error) { + log.Infow("graphsync request initiated", "request id", requestID, "peer", nrm.p, "root", nrm.root) + request, hooksResult, err := rm.validateRequest(requestID, nrm.p, nrm.root, nrm.selector, nrm.extensions) if err != nil { - return rm.singleErrorResponse(err) + rp, err := rm.singleErrorResponse(err) + return request, rp, err } doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs) var doNotSendCids *cid.Set if has { doNotSendCids, err = cidset.DecodeCidSet(doNotSendCidsData) if err != nil { - return rm.singleErrorResponse(err) + rp, err := rm.singleErrorResponse(err) + return request, rp, err } } else { doNotSendCids = cid.NewSet() @@ -334,9 +390,9 @@ func (nrm *newRequestMessage) setupRequest(requestID graphsync.RequestID, rm *Re p := nrm.p resumeMessages := make(chan []graphsync.ExtensionData, 1) pauseMessages := make(chan struct{}, 1) - networkError := make(chan error, 1) + terminalError := make(chan error, 1) requestStatus := &inProgressRequestStatus{ - ctx: ctx, cancelFn: cancel, p: p, resumeMessages: resumeMessages, pauseMessages: pauseMessages, networkError: networkError, + ctx: ctx, startTime: time.Now(), cancelFn: cancel, p: p, resumeMessages: resumeMessages, pauseMessages: pauseMessages, terminalError: terminalError, } lastResponse := &requestStatus.lastResponse lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged)) @@ -353,21 +409,21 @@ func (nrm *newRequestMessage) setupRequest(requestID graphsync.RequestID, rm *Re Ctx: ctx, P: p, Request: request, - NetworkError: networkError, + TerminalError: terminalError, LastResponse: lastResponse, DoNotSendCids: doNotSendCids, NodePrototypeChooser: hooksResult.CustomChooser, ResumeMessages: resumeMessages, PauseMessages: pauseMessages, }) - return incoming, incomingError + return request, incoming, incomingError } func (nrm *newRequestMessage) handle(rm *RequestManager) { 
var ipr inProgressRequest ipr.requestID = rm.nextRequestID rm.nextRequestID++ - ipr.incoming, ipr.incomingError = nrm.setupRequest(ipr.requestID, rm) + ipr.request, ipr.incoming, ipr.incomingError = nrm.setupRequest(ipr.requestID, rm) select { case nrm.inProgressRequestChan <- ipr: @@ -376,16 +432,44 @@ func (nrm *newRequestMessage) handle(rm *RequestManager) { } func (trm *terminateRequestMessage) handle(rm *RequestManager) { + ipr, ok := rm.inProgressRequestStatuses[trm.requestID] + if ok { + log.Infow("graphsync request complete", "request id", trm.requestID, "peer", ipr.p, "total time", time.Since(ipr.startTime)) + } delete(rm.inProgressRequestStatuses, trm.requestID) rm.asyncLoader.CleanupRequest(trm.requestID) + if ok { + for _, onTerminated := range ipr.onTerminated { + select { + case <-rm.ctx.Done(): + case onTerminated <- nil: + } + } + } } func (crm *cancelRequestMessage) handle(rm *RequestManager) { inProgressRequestStatus, ok := rm.inProgressRequestStatuses[crm.requestID] if !ok { + if crm.onTerminated != nil { + select { + case crm.onTerminated <- graphsync.RequestNotFoundErr{}: + case <-rm.ctx.Done(): + } + } return } + if crm.onTerminated != nil { + inProgressRequestStatus.onTerminated = append(inProgressRequestStatus.onTerminated, crm.onTerminated) + } + if crm.terminalError != nil { + select { + case inProgressRequestStatus.terminalError <- crm.terminalError: + default: + } + } + rm.sendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID)) if crm.isPause { inProgressRequestStatus.paused = true @@ -445,8 +529,8 @@ func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg } responseError := rm.generateResponseErrorFromStatus(graphsync.RequestFailedUnknown) select { - case requestStatus.networkError <- responseError: - case <-requestStatus.ctx.Done(): + case requestStatus.terminalError <- responseError: + default: } rm.sendRequest(p, gsmsg.CancelRequest(response.RequestID())) requestStatus.cancelFn() @@ -462,8 
+546,8 @@ func (rm *RequestManager) processTerminations(responses []gsmsg.GraphSyncRespons requestStatus := rm.inProgressRequestStatuses[response.RequestID()] responseError := rm.generateResponseErrorFromStatus(response.Status()) select { - case requestStatus.networkError <- responseError: - case <-requestStatus.ctx.Done(): + case requestStatus.terminalError <- responseError: + default: } requestStatus.cancelFn() } @@ -499,7 +583,7 @@ func (rm *RequestManager) processBlockHooks(p peer.ID, response graphsync.Respon _, isPause := result.Err.(hooks.ErrPaused) select { case <-rm.ctx.Done(): - case rm.messages <- &cancelRequestMessage{response.RequestID(), isPause}: + case rm.messages <- &cancelRequestMessage{response.RequestID(), isPause, nil, nil}: } } return result.Err @@ -579,7 +663,7 @@ func (rm *RequestManager) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) func (urm *unpauseRequestMessage) unpause(rm *RequestManager) error { inProgressRequestStatus, ok := rm.inProgressRequestStatuses[urm.id] if !ok { - return errors.New("request not found") + return graphsync.RequestNotFoundErr{} } if !inProgressRequestStatus.paused { return errors.New("request is not paused") @@ -605,7 +689,7 @@ func (urm *unpauseRequestMessage) handle(rm *RequestManager) { func (prm *pauseRequestMessage) pause(rm *RequestManager) error { inProgressRequestStatus, ok := rm.inProgressRequestStatuses[prm.id] if !ok { - return errors.New("request not found") + return graphsync.RequestNotFoundErr{} } if inProgressRequestStatus.paused { return errors.New("request is already paused") diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index 24a2e51f..c2af7d2c 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -212,9 +212,61 @@ func TestCancelRequestInProgress(t *testing.T) { errors := testutil.CollectErrors(requestCtx, t, returnedErrorChan1) require.Len(t, errors, 1) - _, ok := 
errors[0].(graphsync.RequestContextCancelledErr) + _, ok := errors[0].(graphsync.RequestClientCancelledErr) require.True(t, ok) } +func TestCancelRequestImperativeNoMoreBlocks(t *testing.T) { + ctx := context.Background() + td := newTestData(ctx, t) + requestCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + peers := testutil.GeneratePeers(1) + + postCancel := make(chan struct{}, 1) + loadPostCancel := make(chan struct{}, 1) + td.fal.OnAsyncLoad(func(graphsync.RequestID, ipld.Link, <-chan types.AsyncLoadResult) { + select { + case <-postCancel: + loadPostCancel <- struct{}{} + default: + } + }) + + _, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) + + requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1) + + go func() { + firstBlocks := td.blockChain.Blocks(0, 3) + firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true) + firstResponses := []gsmsg.GraphSyncResponse{ + gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata), + } + td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks) + td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), firstBlocks) + }() + + timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second) + defer timeoutCancel() + err := td.requestManager.CancelRequest(timeoutCtx, requestRecords[0].gsr.ID()) + require.NoError(t, err) + postCancel <- struct{}{} + + rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + + require.True(t, rr.gsr.IsCancel()) + require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID()) + + errors := testutil.CollectErrors(requestCtx, t, returnedErrorChan1) + require.Len(t, errors, 1) + _, ok := errors[0].(graphsync.RequestClientCancelledErr) + require.True(t, ok) + select { + case <-loadPostCancel: + t.Fatalf("Loaded block after cancel") + case <-requestCtx.Done(): + } +} func TestCancelManagerExitsGracefully(t *testing.T) 
{ ctx := context.Background() @@ -352,6 +404,42 @@ func TestRequestReturnsMissingBlocks(t *testing.T) { require.NotEqual(t, len(errs), 0, "did not send errors") } +func TestDisconnectNotification(t *testing.T) { + ctx := context.Background() + td := newTestData(ctx, t) + requestCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + peers := testutil.GeneratePeers(2) + + // Listen for network errors + networkErrors := make(chan peer.ID, 1) + td.networkErrorListeners.Register(func(p peer.ID, request graphsync.RequestData, err error) { + networkErrors <- p + }) + + // Send a request to the target peer + targetPeer := peers[0] + td.requestManager.SendRequest(requestCtx, targetPeer, td.blockChain.TipLink, td.blockChain.Selector()) + + // Disconnect a random peer, should not fire any events + randomPeer := peers[1] + td.requestManager.Disconnected(randomPeer) + select { + case <-networkErrors: + t.Fatal("should not fire network error when unrelated peer disconnects") + default: + } + + // Disconnect the target peer, should fire a network error + td.requestManager.Disconnected(targetPeer) + select { + case p := <-networkErrors: + require.Equal(t, p, targetPeer) + default: + t.Fatal("should fire network error when peer disconnects") + } +} + func TestEncodingExtensions(t *testing.T) { ctx := context.Background() td := newTestData(ctx, t) diff --git a/requestmanager/responsecollector.go b/requestmanager/responsecollector.go index 766b9fed..f348c61c 100644 --- a/requestmanager/responsecollector.go +++ b/requestmanager/responsecollector.go @@ -18,7 +18,9 @@ func (rc *responseCollector) collectResponses( requestCtx context.Context, incomingResponses <-chan graphsync.ResponseProgress, incomingErrors <-chan error, - cancelRequest func()) (<-chan graphsync.ResponseProgress, <-chan error) { + cancelRequest func(), + onComplete func(), +) (<-chan graphsync.ResponseProgress, <-chan error) { returnedResponses := make(chan graphsync.ResponseProgress) returnedErrors := 
make(chan error) @@ -26,6 +28,7 @@ func (rc *responseCollector) collectResponses( go func() { var receivedResponses []graphsync.ResponseProgress defer close(returnedResponses) + defer onComplete() outgoingResponses := func() chan<- graphsync.ResponseProgress { if len(receivedResponses) == 0 { return nil @@ -82,7 +85,7 @@ func (rc *responseCollector) collectResponses( case <-requestCtx.Done(): select { case <-rc.ctx.Done(): - case returnedErrors <- graphsync.RequestContextCancelledErr{}: + case returnedErrors <- graphsync.RequestClientCancelledErr{}: } return case err, ok := <-incomingErrors: @@ -94,7 +97,7 @@ func (rc *responseCollector) collectResponses( case <-requestCtx.Done(): select { case <-rc.ctx.Done(): - case returnedErrors <- graphsync.RequestContextCancelledErr{}: + case returnedErrors <- graphsync.RequestClientCancelledErr{}: } default: } diff --git a/requestmanager/responsecollector_test.go b/requestmanager/responsecollector_test.go index cb312606..5108f10c 100644 --- a/requestmanager/responsecollector_test.go +++ b/requestmanager/responsecollector_test.go @@ -26,7 +26,7 @@ func TestBufferingResponseProgress(t *testing.T) { cancelRequest := func() {} outgoingResponses, outgoingErrors := rc.collectResponses( - requestCtx, incomingResponses, incomingErrors, cancelRequest) + requestCtx, incomingResponses, incomingErrors, cancelRequest, func() {}) blockStore := make(map[ipld.Link][]byte) persistence := testutil.NewTestStore(blockStore) diff --git a/responsemanager/hooks/requesthook.go b/responsemanager/hooks/requesthook.go index d3e98e20..0c99158d 100644 --- a/responsemanager/hooks/requesthook.go +++ b/responsemanager/hooks/requesthook.go @@ -16,6 +16,40 @@ type PersistenceOptions interface { GetLinkSystem(name string) (ipld.LinkSystem, bool) } +// IncomingRequestQueuedHooks is a set of incoming request queued hooks that can be processed. 
+type IncomingRequestQueuedHooks struct { + pubSub *pubsub.PubSub +} + +type internalRequestQueuedHookEvent struct { + p peer.ID + request graphsync.RequestData +} + +func requestQueuedHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error { + ie := event.(internalRequestQueuedHookEvent) + hook := subscriberFn.(graphsync.OnIncomingRequestQueuedHook) + hook(ie.p, ie.request) + return nil +} + +// Register registers an extension to process new incoming requests. +func (rqh *IncomingRequestQueuedHooks) Register(hook graphsync.OnIncomingRequestQueuedHook) graphsync.UnregisterHookFunc { + return graphsync.UnregisterHookFunc(rqh.pubSub.Subscribe(hook)) +} + +// NewRequestQueuedHooks returns a new list of incoming request queued hooks. +func NewRequestQueuedHooks() *IncomingRequestQueuedHooks { + return &IncomingRequestQueuedHooks{ + pubSub: pubsub.New(requestQueuedHookDispatcher), + } +} + +// ProcessRequestQueuedHooks runs request hooks against an incoming queued request. 
+func (rqh *IncomingRequestQueuedHooks) ProcessRequestQueuedHooks(p peer.ID, request graphsync.RequestData) { + _ = rqh.pubSub.Publish(internalRequestQueuedHookEvent{p, request}) +} + // IncomingRequestHooks is a set of incoming request hooks that can be processed type IncomingRequestHooks struct { persistenceOptions PersistenceOptions diff --git a/responsemanager/queryexecutor.go b/responsemanager/queryexecutor.go index cecdc3ea..44d1a718 100644 --- a/responsemanager/queryexecutor.go +++ b/responsemanager/queryexecutor.go @@ -6,14 +6,10 @@ import ( "strings" "time" - "github.com/ipfs/go-cid" ipld "github.com/ipld/go-ipld-prime" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/libp2p/go-libp2p-core/peer" "github.com/ipfs/go-graphsync" - "github.com/ipfs/go-graphsync/cidset" - "github.com/ipfs/go-graphsync/dedupkey" "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/notifications" @@ -26,12 +22,10 @@ var errCancelledByCommand = errors.New("response cancelled by responder") // TODO: Move this into a seperate module and fully seperate from the ResponseManager type queryExecutor struct { - requestHooks RequestHooks blockHooks BlockHooks updateHooks UpdateHooks cancelledListeners CancelledListeners responseAssembler ResponseAssembler - linkSystem ipld.LinkSystem queryQueue QueryQueue messages chan responseManagerMessage ctx context.Context @@ -57,9 +51,8 @@ func (qe *queryExecutor) processQueriesWorker() { } } for _, task := range tasks { - key := task.Topic.(responseKey) select { - case qe.messages <- &responseDataRequest{key, taskDataChan}: + case qe.messages <- &startTaskRequest{task, taskDataChan}: case <-qe.ctx.Done(): return } @@ -72,136 +65,19 @@ func (qe *queryExecutor) processQueriesWorker() { log.Info("Empty task on peer request stack") continue } - log.Debugw("beginning response execution", "id", key.requestID, "peer", key.p.String(), "root_cid", taskData.request.Root().String()) 
- status, err := qe.executeTask(key, taskData) + log.Debugw("beginning response execution", "id", taskData.request.ID(), "peer", pid.String(), "root_cid", taskData.request.Root().String()) + status, err := qe.executeQuery(pid, taskData.request, taskData.loader, taskData.traverser, taskData.signals, taskData.subscriber) isCancelled := err != nil && isContextErr(err) if isCancelled { - qe.cancelledListeners.NotifyCancelledListeners(key.p, taskData.request) + qe.cancelledListeners.NotifyCancelledListeners(pid, taskData.request) } select { - case qe.messages <- &finishTaskRequest{key, status, err}: + case qe.messages <- &finishTaskRequest{task, status, err}: case <-qe.ctx.Done(): } - log.Debugw("finishing response execution", "id", key.requestID, "peer", key.p.String(), "root_cid", taskData.request.Root().String()) + log.Debugw("finishing response execution", "id", taskData.request.ID(), "peer", pid.String(), "root_cid", taskData.request.Root().String()) } - qe.queryQueue.TasksDone(pid, tasks...) 
- - } - -} - -func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) (graphsync.ResponseStatusCode, error) { - var err error - loader := taskData.loader - traverser := taskData.traverser - if loader == nil || traverser == nil { - var isPaused bool - loader, traverser, isPaused, err = qe.prepareQuery(taskData.ctx, key.p, taskData.request, taskData.signals, taskData.subscriber) - if err != nil { - return graphsync.RequestFailedUnknown, err - } - select { - case <-qe.ctx.Done(): - return graphsync.RequestFailedUnknown, errors.New("context cancelled") - case qe.messages <- &setResponseDataRequest{key, loader, traverser}: - } - if isPaused { - return graphsync.RequestPaused, hooks.ErrPaused{} - } - } - return qe.executeQuery(key.p, taskData.request, loader, traverser, taskData.signals, taskData.subscriber) -} - -func (qe *queryExecutor) prepareQuery(ctx context.Context, - p peer.ID, - request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.BlockReadOpener, ipldutil.Traverser, bool, error) { - result := qe.requestHooks.ProcessRequestHooks(p, request) - var transactionError error - var isPaused bool - failNotifee := notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: sub} - err := qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error { - for _, extension := range result.Extensions { - rb.SendExtensionData(extension) - } - if result.Err != nil || !result.IsValidated { - rb.FinishWithError(graphsync.RequestFailedUnknown) - rb.AddNotifee(failNotifee) - transactionError = errors.New("request not valid") - } else if result.IsPaused { - rb.PauseRequest() - isPaused = true - } - return nil - }) - if err != nil { - return nil, nil, false, err - } - if transactionError != nil { - return nil, nil, false, transactionError - } - if err := qe.processDedupByKey(request, p, failNotifee); err != nil { - return nil, nil, false, err - } - if err := 
qe.processDoNoSendCids(request, p, failNotifee); err != nil { - return nil, nil, false, err - } - rootLink := cidlink.Link{Cid: request.Root()} - linkSystem := result.CustomLinkSystem - if linkSystem.StorageReadOpener == nil { - linkSystem = qe.linkSystem - } - traverser := ipldutil.TraversalBuilder{ - Root: rootLink, - Selector: request.Selector(), - LinkSystem: linkSystem, - Chooser: result.CustomChooser, - }.Start(ctx) - - return linkSystem.StorageReadOpener, traverser, isPaused, nil -} - -func (qe *queryExecutor) processDedupByKey(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error { - dedupData, has := request.Extension(graphsync.ExtensionDeDupByKey) - if !has { - return nil - } - key, err := dedupkey.DecodeDedupKey(dedupData) - if err != nil { - _ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error { - rb.FinishWithError(graphsync.RequestFailedUnknown) - rb.AddNotifee(failNotifee) - return nil - }) - return err - } - qe.responseAssembler.DedupKey(p, request.ID(), key) - return nil -} - -func (qe *queryExecutor) processDoNoSendCids(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error { - doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs) - if !has { - return nil - } - cidSet, err := cidset.DecodeCidSet(doNotSendCidsData) - if err != nil { - _ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error { - rb.FinishWithError(graphsync.RequestFailedUnknown) - rb.AddNotifee(failNotifee) - return nil - }) - return err - } - links := make([]ipld.Link, 0, cidSet.Len()) - err = cidSet.ForEach(func(c cid.Cid) error { - links = append(links, cidlink.Link{Cid: c}) - return nil - }) - if err != nil { - return err } - qe.responseAssembler.IgnoreBlocks(p, request.ID(), links) - return nil } func (qe *queryExecutor) executeQuery( @@ -255,12 +131,14 @@ func (qe *queryExecutor) executeQuery( code = 
graphsync.RequestFailedUnknown return nil } - if err == errCancelledByCommand { + if err == runtraversal.ErrFirstBlockLoad { + code = graphsync.RequestFailedContentNotFound + } else if err == errCancelledByCommand { code = graphsync.RequestCancelled } else { code = graphsync.RequestFailedUnknown } - rb.FinishWithError(graphsync.RequestCancelled) + rb.FinishWithError(code) } else { code = rb.FinishRequest() } diff --git a/responsemanager/querypreparer.go b/responsemanager/querypreparer.go new file mode 100644 index 00000000..067786e9 --- /dev/null +++ b/responsemanager/querypreparer.go @@ -0,0 +1,122 @@ +package responsemanager + +import ( + "context" + "errors" + + "github.com/ipfs/go-cid" + ipld "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/ipfs/go-graphsync" + "github.com/ipfs/go-graphsync/cidset" + "github.com/ipfs/go-graphsync/dedupkey" + "github.com/ipfs/go-graphsync/ipldutil" + gsmsg "github.com/ipfs/go-graphsync/message" + "github.com/ipfs/go-graphsync/notifications" + "github.com/ipfs/go-graphsync/responsemanager/responseassembler" +) + +type queryPreparer struct { + requestHooks RequestHooks + responseAssembler ResponseAssembler + linkSystem ipld.LinkSystem +} + +func (qe *queryPreparer) prepareQuery(ctx context.Context, + p peer.ID, + request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.BlockReadOpener, ipldutil.Traverser, bool, error) { + result := qe.requestHooks.ProcessRequestHooks(p, request) + var transactionError error + var isPaused bool + failNotifee := notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: sub} + rejectNotifee := notifications.Notifee{Data: graphsync.RequestRejected, Subscriber: sub} + err := qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error { + for _, extension := range result.Extensions { + rb.SendExtensionData(extension) + } 
+ if result.Err != nil { + rb.FinishWithError(graphsync.RequestFailedUnknown) + rb.AddNotifee(failNotifee) + transactionError = result.Err + } else if !result.IsValidated { + rb.FinishWithError(graphsync.RequestRejected) + rb.AddNotifee(rejectNotifee) + transactionError = errors.New("request not valid") + } else if result.IsPaused { + rb.PauseRequest() + isPaused = true + } + return nil + }) + if err != nil { + return nil, nil, false, err + } + if transactionError != nil { + return nil, nil, false, transactionError + } + if err := qe.processDedupByKey(request, p, failNotifee); err != nil { + return nil, nil, false, err + } + if err := qe.processDoNoSendCids(request, p, failNotifee); err != nil { + return nil, nil, false, err + } + rootLink := cidlink.Link{Cid: request.Root()} + linkSystem := result.CustomLinkSystem + if linkSystem.StorageReadOpener == nil { + linkSystem = qe.linkSystem + } + traverser := ipldutil.TraversalBuilder{ + Root: rootLink, + Selector: request.Selector(), + LinkSystem: linkSystem, + Chooser: result.CustomChooser, + }.Start(ctx) + + return linkSystem.StorageReadOpener, traverser, isPaused, nil +} + +func (qe *queryPreparer) processDedupByKey(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error { + dedupData, has := request.Extension(graphsync.ExtensionDeDupByKey) + if !has { + return nil + } + key, err := dedupkey.DecodeDedupKey(dedupData) + if err != nil { + _ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error { + rb.FinishWithError(graphsync.RequestFailedUnknown) + rb.AddNotifee(failNotifee) + return nil + }) + return err + } + qe.responseAssembler.DedupKey(p, request.ID(), key) + return nil +} + +func (qe *queryPreparer) processDoNoSendCids(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error { + doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs) + if !has { + return nil + } + cidSet, err := 
cidset.DecodeCidSet(doNotSendCidsData) + if err != nil { + _ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error { + rb.FinishWithError(graphsync.RequestFailedUnknown) + rb.AddNotifee(failNotifee) + return nil + }) + return err + } + links := make([]ipld.Link, 0, cidSet.Len()) + err = cidSet.ForEach(func(c cid.Cid) error { + links = append(links, cidlink.Link{Cid: c}) + return nil + }) + if err != nil { + return err + } + qe.responseAssembler.IgnoreBlocks(p, request.ID(), links) + return nil +} diff --git a/responsemanager/responseassembler/responseBuilder.go b/responsemanager/responseassembler/responseBuilder.go index a18c83e6..7ccee6cf 100644 --- a/responsemanager/responseassembler/responseBuilder.go +++ b/responsemanager/responseassembler/responseBuilder.go @@ -2,7 +2,7 @@ package responseassembler import ( blocks "github.com/ipfs/go-block-format" - logging "github.com/ipfs/go-log" + logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go index 0ecae808..41faab18 100644 --- a/responsemanager/responsemanager.go +++ b/responsemanager/responsemanager.go @@ -6,7 +6,7 @@ import ( "math" "time" - logging "github.com/ipfs/go-log" + logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-peertaskqueue/peertask" ipld "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" @@ -73,6 +73,11 @@ type RequestHooks interface { ProcessRequestHooks(p peer.ID, request graphsync.RequestData) hooks.RequestResult } +// RequestQueuedHooks is an interface for processing request queued hooks +type RequestQueuedHooks interface { + ProcessRequestQueuedHooks(p peer.ID, request graphsync.RequestData) +} + // BlockHooks is an interface for processing block hooks type BlockHooks interface { ProcessBlockHooks(p peer.ID, request graphsync.RequestData, blockData graphsync.BlockData) 
hooks.BlockResult @@ -121,6 +126,9 @@ type ResponseManager struct { cancelFn context.CancelFunc responseAssembler ResponseAssembler queryQueue QueryQueue + requestHooks RequestHooks + linkSystem ipld.LinkSystem + requestQueuedHooks RequestQueuedHooks updateHooks UpdateHooks cancelledListeners CancelledListeners completedListeners CompletedListeners @@ -138,6 +146,7 @@ func New(ctx context.Context, linkSystem ipld.LinkSystem, responseAssembler ResponseAssembler, queryQueue QueryQueue, + requestQueuedHooks RequestQueuedHooks, requestHooks RequestHooks, blockHooks BlockHooks, updateHooks UpdateHooks, @@ -151,12 +160,10 @@ func New(ctx context.Context, messages := make(chan responseManagerMessage, 16) workSignal := make(chan struct{}, 1) qe := &queryExecutor{ - requestHooks: requestHooks, blockHooks: blockHooks, updateHooks: updateHooks, cancelledListeners: cancelledListeners, responseAssembler: responseAssembler, - linkSystem: linkSystem, queryQueue: queryQueue, messages: messages, ctx: ctx, @@ -166,8 +173,11 @@ func New(ctx context.Context, return &ResponseManager{ ctx: ctx, cancelFn: cancelFn, + requestHooks: requestHooks, + linkSystem: linkSystem, responseAssembler: responseAssembler, queryQueue: queryQueue, + requestQueuedHooks: requestQueuedHooks, updateHooks: updateHooks, cancelledListeners: cancelledListeners, completedListeners: completedListeners, @@ -257,23 +267,17 @@ func (rm *ResponseManager) synchronize() { _ = rm.sendSyncMessage(&synchronizeMessage{sync}, sync) } -type responseDataRequest struct { - key responseKey +type startTaskRequest struct { + task *peertask.Task taskDataChan chan responseTaskData } type finishTaskRequest struct { - key responseKey + task *peertask.Task status graphsync.ResponseStatusCode err error } -type setResponseDataRequest struct { - key responseKey - loader ipld.BlockReadOpener - traverser ipldutil.Traverser -} - type responseUpdateRequest struct { key responseKey updateChan chan []gsmsg.GraphSyncRequest @@ -423,6 +427,7 @@ 
func (prm *processRequestMessage) handle(rm *ResponseManager) { rm.processUpdate(key, request) continue } + rm.requestQueuedHooks.ProcessRequestQueuedHooks(prm.p, request) ctx, cancelFn := context.WithCancel(rm.ctx) sub := notifications.NewTopicDataSubscriber(&subscriber{ p: key.p, @@ -433,6 +438,7 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) { completedListeners: rm.completedListeners, networkErrorListeners: rm.networkErrorListeners, }) + rm.inProgressResponses[key] = &inProgressResponseStatus{ ctx: ctx, @@ -446,7 +452,9 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) { }, } // TODO: Use a better work estimation metric. + rm.queryQueue.PushTasks(prm.p, peertask.Task{Topic: key, Priority: int(request.Priority()), Work: 1}) + select { case rm.workSignal <- struct{}{}: default: @@ -454,22 +462,45 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) { } } -func (rdr *responseDataRequest) handle(rm *ResponseManager) { - response, ok := rm.inProgressResponses[rdr.key] - var taskData responseTaskData - if ok { - taskData = responseTaskData{false, response.subscriber, response.ctx, response.request, response.loader, response.traverser, response.signals} - } else { - taskData = responseTaskData{empty: true} +func (str *startTaskRequest) handle(rm *ResponseManager) { + key := str.task.Topic.(responseKey) + taskData := str.responseTaskData(rm, key) + if taskData.empty { + rm.queryQueue.TasksDone(key.p, str.task) } select { case <-rm.ctx.Done(): - case rdr.taskDataChan <- taskData: + case str.taskDataChan <- taskData: } } +func (str *startTaskRequest) responseTaskData(rm *ResponseManager, key responseKey) responseTaskData { + response, hasResponse := rm.inProgressResponses[key] + if !hasResponse { + return responseTaskData{empty: true} + } + + if response.loader == nil || response.traverser == nil { + loader, traverser, isPaused, err := (&queryPreparer{rm.requestHooks, rm.responseAssembler, 
rm.linkSystem}).prepareQuery(response.ctx, key.p, response.request, response.signals, response.subscriber) + if err != nil { + response.cancelFn() + delete(rm.inProgressResponses, key) + return responseTaskData{empty: true} + } + response.loader = loader + response.traverser = traverser + if isPaused { + response.isPaused = true + return responseTaskData{empty: true} + } + } + return responseTaskData{false, response.subscriber, response.ctx, response.request, response.loader, response.traverser, response.signals} +} + func (ftr *finishTaskRequest) handle(rm *ResponseManager) { - response, ok := rm.inProgressResponses[ftr.key] + key := ftr.task.Topic.(responseKey) + rm.queryQueue.TasksDone(key.p, ftr.task) + response, ok := rm.inProgressResponses[key] if !ok { return } @@ -480,19 +511,10 @@ func (ftr *finishTaskRequest) handle(rm *ResponseManager) { if ftr.err != nil { log.Infof("response failed: %w", ftr.err) } - delete(rm.inProgressResponses, ftr.key) + delete(rm.inProgressResponses, key) response.cancelFn() } -func (srdr *setResponseDataRequest) handle(rm *ResponseManager) { - response, ok := rm.inProgressResponses[srdr.key] - if !ok { - return - } - response.loader = srdr.loader - response.traverser = srdr.traverser -} - func (rur *responseUpdateRequest) handle(rm *ResponseManager) { response, ok := rm.inProgressResponses[rur.key] var updates []gsmsg.GraphSyncRequest diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index 0f965cd8..2b527144 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -37,6 +37,19 @@ func TestIncomingQuery(t *testing.T) { blks := td.blockChain.AllBlocks() responseManager := td.newResponseManager() + + type queuedHook struct { + p peer.ID + request graphsync.RequestData + } + + qhc := make(chan *queuedHook, 1) + td.requestQueuedHooks.Register(func(p peer.ID, request graphsync.RequestData) { + qhc <- &queuedHook{ + p: p, + request: request, + } + 
}) td.requestHooks.Register(selectorvalidator.SelectorValidator(100)) responseManager.Startup() @@ -45,6 +58,11 @@ func TestIncomingQuery(t *testing.T) { for i := 0; i < len(blks); i++ { td.assertSendBlock() } + + // ensure request queued hook fires. + out := <-qhc + require.Equal(t, td.p, out.p) + require.Equal(t, out.request.ID(), td.requestID) } func TestCancellationQueryInProgress(t *testing.T) { @@ -89,7 +107,7 @@ func TestCancellationViaCommand(t *testing.T) { err := responseManager.CancelResponse(td.p, td.requestID) require.NoError(t, err) - td.assertCompleteRequestWithFailure() + td.assertCompleteRequestWith(graphsync.RequestCancelled) } func TestEarlyCancellation(t *testing.T) { @@ -97,6 +115,7 @@ func TestEarlyCancellation(t *testing.T) { defer td.cancel() td.queryQueue.popWait.Add(1) responseManager := td.newResponseManager() + td.requestHooks.Register(selectorvalidator.SelectorValidator(100)) responseManager.Startup() responseManager.ProcessRequests(td.ctx, td.p, td.requests) @@ -113,6 +132,39 @@ func TestEarlyCancellation(t *testing.T) { td.assertNoResponses() } +func TestMissingContent(t *testing.T) { + t.Run("missing root block", func(t *testing.T) { + td := newTestData(t) + defer td.cancel() + responseManager := td.newResponseManager() + responseManager.Startup() + td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { + hookActions.ValidateRequest() + }) + // delete the root block + delete(td.blockStore, cidlink.Link{Cid: td.requests[0].Root()}) + responseManager.ProcessRequests(td.ctx, td.p, td.requests) + td.assertCompleteRequestWith(graphsync.RequestFailedContentNotFound) + }) + t.Run("missing other block", func(t *testing.T) { + td := newTestData(t) + defer td.cancel() + responseManager := td.newResponseManager() + responseManager.Startup() + td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { + 
hookActions.ValidateRequest() + }) + // delete a block that isn't the root + for link := range td.blockStore { + if link.String() != td.requests[0].Root().String() { + delete(td.blockStore, link) + break + } + } + responseManager.ProcessRequests(td.ctx, td.p, td.requests) + td.assertCompleteRequestWith(graphsync.RequestCompletedPartial) + }) +} func TestValidationAndExtensions(t *testing.T) { t.Run("on its own, should fail validation", func(t *testing.T) { @@ -121,7 +173,7 @@ func TestValidationAndExtensions(t *testing.T) { responseManager := td.newResponseManager() responseManager.Startup() responseManager.ProcessRequests(td.ctx, td.p, td.requests) - td.assertCompleteRequestWithFailure() + td.assertCompleteRequestWith(graphsync.RequestRejected) }) t.Run("if non validating hook succeeds, does not pass validation", func(t *testing.T) { @@ -133,7 +185,7 @@ func TestValidationAndExtensions(t *testing.T) { hookActions.SendExtensionData(td.extensionResponse) }) responseManager.ProcessRequests(td.ctx, td.p, td.requests) - td.assertCompleteRequestWithFailure() + td.assertCompleteRequestWith(graphsync.RequestRejected) td.assertReceiveExtensionResponse() }) @@ -147,7 +199,7 @@ func TestValidationAndExtensions(t *testing.T) { hookActions.SendExtensionData(td.extensionResponse) }) responseManager.ProcessRequests(td.ctx, td.p, td.requests) - td.assertCompleteRequestWithSuccess() + td.assertCompleteRequestWith(graphsync.RequestCompletedFull) td.assertReceiveExtensionResponse() }) @@ -164,7 +216,7 @@ func TestValidationAndExtensions(t *testing.T) { hookActions.TerminateWithError(errors.New("everything went to crap")) }) responseManager.ProcessRequests(td.ctx, td.p, td.requests) - td.assertCompleteRequestWithFailure() + td.assertCompleteRequestWith(graphsync.RequestFailedUnknown) td.assertReceiveExtensionResponse() }) @@ -180,7 +232,7 @@ func TestValidationAndExtensions(t *testing.T) { // hook validates request responseManager.ProcessRequests(td.ctx, td.p, td.requests) - 
td.assertCompleteRequestWithSuccess() + td.assertCompleteRequestWith(graphsync.RequestCompletedFull) td.assertReceiveExtensionResponse() // unregister @@ -188,7 +240,7 @@ func TestValidationAndExtensions(t *testing.T) { // now same request should fail responseManager.ProcessRequests(td.ctx, td.p, td.requests) - td.assertCompleteRequestWithFailure() + td.assertCompleteRequestWith(graphsync.RequestRejected) }) t.Run("hooks can alter the loader", func(t *testing.T) { @@ -203,7 +255,7 @@ func TestValidationAndExtensions(t *testing.T) { // request fails with base loader reading from block store that's missing data responseManager.ProcessRequests(td.ctx, td.p, td.requests) - td.assertCompleteRequestWithFailure() + td.assertCompleteRequestWith(graphsync.RequestFailedContentNotFound) err := td.peristenceOptions.Register("chainstore", td.persistence) require.NoError(t, err) @@ -216,7 +268,7 @@ func TestValidationAndExtensions(t *testing.T) { }) // hook uses different loader that should make request succeed responseManager.ProcessRequests(td.ctx, td.p, td.requests) - td.assertCompleteRequestWithSuccess() + td.assertCompleteRequestWith(graphsync.RequestCompletedFull) td.assertReceiveExtensionResponse() }) @@ -239,7 +291,7 @@ func TestValidationAndExtensions(t *testing.T) { // with default chooser, customer chooser not called responseManager.ProcessRequests(td.ctx, td.p, td.requests) - td.assertCompleteRequestWithSuccess() + td.assertCompleteRequestWith(graphsync.RequestCompletedFull) require.Equal(t, 0, customChooserCallCount) // register hook to use custom chooser @@ -252,7 +304,7 @@ func TestValidationAndExtensions(t *testing.T) { // verify now that request succeeds and uses custom chooser responseManager.ProcessRequests(td.ctx, td.p, td.requests) - td.assertCompleteRequestWithSuccess() + td.assertCompleteRequestWith(graphsync.RequestCompletedFull) td.assertReceiveExtensionResponse() require.Equal(t, 5, customChooserCallCount) }) @@ -280,7 +332,7 @@ func 
TestValidationAndExtensions(t *testing.T) { }), } responseManager.ProcessRequests(td.ctx, td.p, requests) - td.assertCompleteRequestWithSuccess() + td.assertCompleteRequestWith(graphsync.RequestCompletedFull) td.assertIgnoredCids(set) }) t.Run("dedup-by-key extension", func(t *testing.T) { @@ -301,7 +353,7 @@ func TestValidationAndExtensions(t *testing.T) { }), } responseManager.ProcessRequests(td.ctx, td.p, requests) - td.assertCompleteRequestWithSuccess() + td.assertCompleteRequestWith(graphsync.RequestCompletedFull) td.assertDedupKey("applesauce") }) t.Run("test pause/resume", func(t *testing.T) { @@ -319,7 +371,7 @@ func TestValidationAndExtensions(t *testing.T) { testutil.AssertChannelEmpty(t, td.sentResponses, "should not send more blocks") err := responseManager.UnpauseResponse(td.p, td.requestID) require.NoError(t, err) - td.assertCompleteRequestWithSuccess() + td.assertCompleteRequestWith(graphsync.RequestCompletedFull) }) t.Run("test block hook processing", func(t *testing.T) { t.Run("can send extension data", func(t *testing.T) { @@ -334,7 +386,7 @@ func TestValidationAndExtensions(t *testing.T) { hookActions.SendExtensionData(td.extensionResponse) }) responseManager.ProcessRequests(td.ctx, td.p, td.requests) - td.assertCompleteRequestWithSuccess() + td.assertCompleteRequestWith(graphsync.RequestCompletedFull) for i := 0; i < td.blockChainLength; i++ { td.assertReceiveExtensionResponse() } @@ -352,7 +404,7 @@ func TestValidationAndExtensions(t *testing.T) { hookActions.TerminateWithError(errors.New("failed")) }) responseManager.ProcessRequests(td.ctx, td.p, td.requests) - td.assertCompleteRequestWithFailure() + td.assertCompleteRequestWith(graphsync.RequestFailedUnknown) }) t.Run("can pause/unpause", func(t *testing.T) { @@ -378,7 +430,7 @@ func TestValidationAndExtensions(t *testing.T) { err := responseManager.UnpauseResponse(td.p, td.requestID, td.extensionResponse) require.NoError(t, err) td.assertReceiveExtensionResponse() - 
td.assertCompleteRequestWithSuccess() + td.assertCompleteRequestWith(graphsync.RequestCompletedFull) }) t.Run("can pause/unpause externally", func(t *testing.T) { @@ -405,9 +457,30 @@ func TestValidationAndExtensions(t *testing.T) { err := responseManager.UnpauseResponse(td.p, td.requestID) require.NoError(t, err) td.verifyNResponses(td.blockChainLength - (blockCount + 1)) - td.assertCompleteRequestWithSuccess() + td.assertCompleteRequestWith(graphsync.RequestCompletedFull) }) + t.Run("if started paused, unpausing always works", func(t *testing.T) { + td := newTestData(t) + defer td.cancel() + responseManager := td.newResponseManager() + responseManager.Startup() + advance := make(chan struct{}) + td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { + hookActions.ValidateRequest() + hookActions.PauseResponse() + close(advance) + }) + go func() { + <-advance + err := responseManager.UnpauseResponse(td.p, td.requestID) + require.NoError(t, err) + }() + responseManager.ProcessRequests(td.ctx, td.p, td.requests) + td.assertPausedRequest() + td.verifyNResponses(td.blockChainLength) + td.assertCompleteRequestWith(graphsync.RequestCompletedFull) + }) }) t.Run("test update hook processing", func(t *testing.T) { @@ -439,7 +512,7 @@ func TestValidationAndExtensions(t *testing.T) { td.assertPausedRequest() responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests) - td.assertCompleteRequestWithSuccess() + td.assertCompleteRequestWith(graphsync.RequestCompletedFull) }) t.Run("can send extension data", func(t *testing.T) { @@ -472,7 +545,7 @@ func TestValidationAndExtensions(t *testing.T) { responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests) responseManager.synchronize() close(wait) - td.assertCompleteRequestWithSuccess() + td.assertCompleteRequestWith(graphsync.RequestCompletedFull) td.assertReceiveExtensionResponse() }) @@ -542,7 +615,7 @@ func TestValidationAndExtensions(t 
*testing.T) { responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests) responseManager.synchronize() close(wait) - td.assertCompleteRequestWithFailure() + td.assertCompleteRequestWith(graphsync.RequestFailedUnknown) }) t.Run("when paused", func(t *testing.T) { @@ -572,7 +645,7 @@ func TestValidationAndExtensions(t *testing.T) { // send update responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests) - td.assertCompleteRequestWithFailure() + td.assertCompleteRequestWith(graphsync.RequestFailedUnknown) // cannot unpause err := responseManager.UnpauseResponse(td.p, td.requestID) @@ -594,7 +667,7 @@ func TestNetworkErrors(t *testing.T) { }) responseManager.ProcessRequests(td.ctx, td.p, td.requests) td.verifyNResponses(td.blockChainLength) - td.assertOnlyCompleteProcessingWithSuccess() + td.assertOnlyCompleteProcessingWith(graphsync.RequestCompletedFull) err := errors.New("something went wrong") td.notifyStatusMessagesNetworkError(err) td.assertNetworkErrors(err, 1) @@ -606,7 +679,7 @@ func TestNetworkErrors(t *testing.T) { responseManager := td.newResponseManager() responseManager.Startup() responseManager.ProcessRequests(td.ctx, td.p, td.requests) - td.assertOnlyCompleteProcessingWithFailure() + td.assertOnlyCompleteProcessingWith(graphsync.RequestRejected) err := errors.New("something went wrong") td.notifyStatusMessagesNetworkError(err) td.assertNetworkErrors(err, 1) @@ -714,6 +787,7 @@ type fakeResponseAssembler struct { ignoredLinks chan []ipld.Link notifeePublisher *testutil.MockPublisher dedupKeys chan string + missingBlock bool } func (fra *fakeResponseAssembler) Transaction(p peer.ID, requestID graphsync.RequestID, transaction responseassembler.Transaction) error { @@ -776,7 +850,9 @@ func (fra *fakeResponseAssembler) sendResponse( data []byte, ) graphsync.BlockData { fra.sentResponses <- sentResponse{requestID, link, data} - + if data == nil { + fra.missingBlock = true + } return fakeBlkData{link, uint64(len(data))} } @@ -788,11 +864,17 @@ func 
(fra *fakeResponseAssembler) sendExtensionData( } func (fra *fakeResponseAssembler) finishRequest(requestID graphsync.RequestID) graphsync.ResponseStatusCode { - fra.lastCompletedRequest <- completedRequest{requestID, graphsync.RequestCompletedFull} - return graphsync.RequestCompletedFull + code := graphsync.RequestCompletedFull + if fra.missingBlock { + code = graphsync.RequestCompletedPartial + } + fra.missingBlock = false + fra.lastCompletedRequest <- completedRequest{requestID, code} + return code } func (fra *fakeResponseAssembler) finishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) { + fra.missingBlock = false fra.lastCompletedRequest <- completedRequest{requestID, status} } @@ -801,6 +883,7 @@ func (fra *fakeResponseAssembler) pauseRequest(requestID graphsync.RequestID) { } func (fra *fakeResponseAssembler) clearRequest(requestID graphsync.RequestID) { + fra.missingBlock = false fra.clearedRequests <- clearedRequest{requestID} } @@ -867,6 +950,7 @@ type testData struct { updateRequests []gsmsg.GraphSyncRequest p peer.ID peristenceOptions *persistenceoptions.PersistenceOptions + requestQueuedHooks *hooks.IncomingRequestQueuedHooks requestHooks *hooks.IncomingRequestHooks blockHooks *hooks.OutgoingBlockHooks updateHooks *hooks.RequestUpdatedHooks @@ -940,6 +1024,7 @@ func newTestData(t *testing.T) testData { } td.p = testutil.GeneratePeers(1)[0] td.peristenceOptions = persistenceoptions.New() + td.requestQueuedHooks = hooks.NewRequestQueuedHooks() td.requestHooks = hooks.NewRequestHooks(td.peristenceOptions) td.blockHooks = hooks.NewBlockHooks() td.updateHooks = hooks.NewUpdateHooks() @@ -969,13 +1054,13 @@ func newTestData(t *testing.T) testData { } func (td *testData) newResponseManager() *ResponseManager { - return New(td.ctx, td.persistence, td.responseAssembler, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners, 6) + 
return New(td.ctx, td.persistence, td.responseAssembler, td.queryQueue, td.requestQueuedHooks, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners, 6) } func (td *testData) alternateLoaderResponseManager() *ResponseManager { obs := make(map[ipld.Link][]byte) persistence := testutil.NewTestStore(obs) - return New(td.ctx, persistence, td.responseAssembler, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners, 6) + return New(td.ctx, persistence, td.responseAssembler, td.queryQueue, td.requestQueuedHooks, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners, 6) } func (td *testData) assertPausedRequest() { @@ -1011,38 +1096,24 @@ func (td *testData) assertNoResponses() { testutil.AssertDoesReceiveFirst(td.t, timer.C, "should not process more responses", td.sentResponses, td.completedRequestChan) } -func (td *testData) assertCompleteRequestWithFailure() { - td.assertOnlyCompleteProcessingWithFailure() +func (td *testData) assertCompleteRequestWith(expectedCode graphsync.ResponseStatusCode) { + td.assertOnlyCompleteProcessingWith(expectedCode) td.notifyStatusMessagesSent() var status graphsync.ResponseStatusCode testutil.AssertReceive(td.ctx, td.t, td.completedResponseStatuses, &status, "should receive status") - require.True(td.t, gsmsg.IsTerminalFailureCode(status), "request should succeed") + require.Equal(td.t, expectedCode, status) } -func (td *testData) assertCompleteRequestWithSuccess() { - td.assertOnlyCompleteProcessingWithSuccess() - td.notifyStatusMessagesSent() - var status graphsync.ResponseStatusCode - testutil.AssertReceive(td.ctx, td.t, td.completedResponseStatuses, &status, "should receive status") - require.True(td.t, gsmsg.IsTerminalSuccessCode(status), "request should 
succeed") -} - -func (td *testData) assertOnlyCompleteProcessingWithSuccess() { +func (td *testData) assertOnlyCompleteProcessingWith(expectedCode graphsync.ResponseStatusCode) { var lastRequest completedRequest testutil.AssertReceive(td.ctx, td.t, td.completedRequestChan, &lastRequest, "should complete request") - require.True(td.t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed") + require.Equal(td.t, expectedCode, lastRequest.result) } func (td *testData) assertRequestCleared() { testutil.AssertDoesReceive(td.ctx, td.t, td.clearedRequests, "should clear request") } -func (td *testData) assertOnlyCompleteProcessingWithFailure() { - var lastRequest completedRequest - testutil.AssertReceive(td.ctx, td.t, td.completedRequestChan, &lastRequest, "should complete request") - require.True(td.t, gsmsg.IsTerminalFailureCode(lastRequest.result), "should terminate with failure") -} - func (td *testData) assertRequestDoesNotCompleteWhilePaused() { timer := time.NewTimer(100 * time.Millisecond) testutil.AssertDoesReceiveFirst(td.t, timer.C, "should not complete request while paused", td.completedRequestChan) diff --git a/responsemanager/runtraversal/runtraversal.go b/responsemanager/runtraversal/runtraversal.go index 5814921c..b4f514b3 100644 --- a/responsemanager/runtraversal/runtraversal.go +++ b/responsemanager/runtraversal/runtraversal.go @@ -4,12 +4,24 @@ import ( "bytes" "io" + logging "github.com/ipfs/go-log/v2" ipld "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/traversal" "github.com/ipfs/go-graphsync/ipldutil" ) +var logger = logging.Logger("gs-traversal") + +type errorString string + +func (e errorString) Error() string { + return string(e) +} + +// ErrFirstBlockLoad indicates the traversal was unable to load the very first block in the traversal +const ErrFirstBlockLoad = errorString("Unable to load first block") + // ResponseSender sends responses over the network type ResponseSender func( link ipld.Link, @@ -22,15 +34,26 
@@ func RunTraversal( loader ipld.BlockReadOpener, traverser ipldutil.Traverser, sendResponse ResponseSender) error { + for { isComplete, err := traverser.IsComplete() if isComplete { + if err != nil { + logger.Errorf("traversal completion check failed, nBlocksRead=%d, err=%s", traverser.NBlocksTraversed(), err) + if (traverser.NBlocksTraversed() == 0 && err == traversal.SkipMe{}) { + return ErrFirstBlockLoad + } + } else { + logger.Debugf("traversal completed successfully, nBlocksRead=%d", traverser.NBlocksTraversed()) + } return err } lnk, lnkCtx := traverser.CurrentRequest() + logger.Debugf("will load link=%s", lnk) result, err := loader(lnkCtx, lnk) var data []byte if err != nil { + logger.Errorf("failed to load link=%s, nBlocksRead=%d, err=%s", lnk, traverser.NBlocksTraversed(), err) traverser.Error(traversal.SkipMe{}) } else { blockBuffer, ok := result.(*bytes.Buffer) @@ -39,14 +62,17 @@ func RunTraversal( _, err = io.Copy(blockBuffer, result) } if err != nil { + logger.Errorf("failed to write to buffer, link=%s, nBlocksRead=%d, err=%s", lnk, traverser.NBlocksTraversed(), err) traverser.Error(err) } else { data = blockBuffer.Bytes() err = traverser.Advance(blockBuffer) if err != nil { + logger.Errorf("failed to advance traversal, link=%s, nBlocksRead=%d, err=%s", lnk, traverser.NBlocksTraversed(), err) return err } } + logger.Debugf("successfully loaded link=%s, nBlocksRead=%d", lnk, traverser.NBlocksTraversed()) } err = sendResponse(lnk, data) if err != nil { diff --git a/responsemanager/runtraversal/runtraversal_test.go b/responsemanager/runtraversal/runtraversal_test.go index 00e510c1..755973f3 100644 --- a/responsemanager/runtraversal/runtraversal_test.go +++ b/responsemanager/runtraversal/runtraversal_test.go @@ -69,6 +69,10 @@ type fakeTraverser struct { expectedOutcomes []traverseOutcome } +func (ft *fakeTraverser) NBlocksTraversed() int { + return 0 +} + // IsComplete returns the completion state (boolean) and if so, the final error result from IPLD 
func (ft *fakeTraverser) IsComplete() (bool, error) { if ft.currentLink >= len(ft.loadedLinks) { diff --git a/scripts/mkreleaselog b/scripts/mkreleaselog index 894a2424..574525d0 100755 --- a/scripts/mkreleaselog +++ b/scripts/mkreleaselog @@ -139,7 +139,7 @@ indent() { } mod_deps() { - go list -json -m all | jq 'select(.Version != null)' + go list -mod=mod -json -m all | jq 'select(.Version != null)' } ensure() {