diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index 54d45ea4..7c7e4d28 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -205,7 +205,7 @@ func TestRejectRequestsByDefault(t *testing.T) { "request(0)->newRequest(0)", "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", - "responseMessage(0)->loaderProcess(0)->cacheProcess(0)", + "processResponses(0)->loaderProcess(0)->cacheProcess(0)", "response(0)->transaction(0)->execute(0)->buildMessage(0)", "message(0)->sendMessage(0)", "message(1)->sendMessage(0)", @@ -258,8 +258,8 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) { require.Contains(t, traceStrings, "request(0)->newRequest(0)") require.Contains(t, traceStrings, "request(0)->executeTask(0)") require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") - require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response - require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block + require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block // has ErrBudgetExceeded exception recorded in the right place tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ErrBudgetExceeded", "traversal budget exceeded", true) @@ -308,8 +308,8 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) { require.Contains(t, traceStrings, "request(0)->newRequest(0)") require.Contains(t, traceStrings, "request(0)->executeTask(0)") require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") - require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response - require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block + require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block // has ContextCancelError exception recorded in the right place // the requester gets a cancel, the responder gets a ErrBudgetExceeded @@ -391,8 +391,8 @@ func TestGraphsyncRoundTrip(t *testing.T) { require.Contains(t, traceStrings, "request(0)->newRequest(0)") require.Contains(t, traceStrings, "request(0)->executeTask(0)") require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") - require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response - require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block + require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block processUpdateSpan := tracing.FindSpanByTraceString("response(0)") require.Equal(t, int64(0), testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "priority").AsInt64()) @@ -488,8 +488,8 @@ func TestGraphsyncRoundTripPartial(t *testing.T) { require.Contains(t, traceStrings, "request(0)->newRequest(0)") require.Contains(t, traceStrings, "request(0)->executeTask(0)") require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") - require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response - require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block + require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block } func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { @@ -556,7 +556,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, - responseMessageTraces(t, tracing, responseCount)...), + processResponsesTraces(t, tracing, responseCount)...), testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+1)...), testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)...), // half of the full chain testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...), @@ -630,7 +630,7 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) { "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, - responseMessageTraces(t, tracing, responseCount)...), + processResponsesTraces(t, tracing, responseCount)...), testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+1)...), testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)...), testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...), @@ -724,8 +724,8 @@ func TestPauseResume(t *testing.T) { require.Contains(t, traceStrings, "request(0)->newRequest(0)") require.Contains(t, traceStrings, "request(0)->executeTask(0)") require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") - require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response - require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block + require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block // pause recorded tracing.SingleExceptionEvent(t, "response(0)->executeTask(0)", "github.com/ipfs/go-graphsync/responsemanager/hooks.ErrPaused", hooks.ErrPaused{}.Error(), false) @@ -807,8 +807,8 @@ func TestPauseResumeRequest(t *testing.T) { require.Contains(t, traceStrings, "request(0)->executeTask(0)") require.Contains(t, traceStrings, "request(0)->executeTask(1)") require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") - require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response - require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block + require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block // has ErrPaused exception recorded in the right place tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ErrPaused", hooks.ErrPaused{}.Error(), false) @@ -888,7 +888,7 @@ func TestPauseResumeViaUpdate(t *testing.T) { "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, - responseMessageTraces(t, tracing, responseCount)...), + processResponsesTraces(t, tracing, responseCount)...), testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+2)...), testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...), testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", 50)...), @@ -981,7 +981,7 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) { "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, - responseMessageTraces(t, tracing, responseCount)...), + processResponsesTraces(t, tracing, responseCount)...), testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+2)...), testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...), testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", 50)...), @@ -1082,8 +1082,8 @@ func TestNetworkDisconnect(t *testing.T) { require.Contains(t, traceStrings, "request(0)->newRequest(0)") require.Contains(t, traceStrings, "request(0)->executeTask(0)") require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") - require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response - require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block + require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block // has ContextCancelError exception recorded in the right place tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false) @@ -1223,8 +1223,8 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { require.Contains(t, traceStrings, "request(1)->newRequest(0)") require.Contains(t, traceStrings, "request(1)->executeTask(0)") require.Contains(t, traceStrings, "request(1)->terminateRequest(0)") - require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response - require.Contains(t, traceStrings, "request(1)->verifyBlock(0)") // should have one of these per block (TODO: why request(1) and not (0)?) + require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(1)->verifyBlock(0)") // should have one of these per block (TODO: why request(1) and not (0)?) // TODO(rvagg): this is randomly either a SkipMe or a ipldutil.ContextCancelError; confirm this is sane // tracing.SingleExceptionEvent(t, "request(0)->newRequest(0)","request(0)->executeTask(0)", "SkipMe", traversal.SkipMe{}.Error(), true) @@ -1312,8 +1312,8 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) { require.Contains(t, traceStrings, "request(1)->newRequest(0)") require.Contains(t, traceStrings, "request(1)->executeTask(0)") require.Contains(t, traceStrings, "request(1)->terminateRequest(0)") - require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response - require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block + require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block } // TestRoundTripLargeBlocksSlowNetwork test verifies graphsync continues to work @@ -1365,7 +1365,7 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) { "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, - responseMessageTraces(t, tracing, responseCount)...), + processResponsesTraces(t, tracing, responseCount)...), testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+1)...), testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...), testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...), @@ -1506,8 +1506,8 @@ func TestUnixFSFetch(t *testing.T) { require.Contains(t, traceStrings, "request(0)->newRequest(0)") require.Contains(t, traceStrings, "request(0)->executeTask(0)") require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") - require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response - require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block + require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block } func TestGraphsyncBlockListeners(t *testing.T) { @@ -1605,7 +1605,7 @@ func TestGraphsyncBlockListeners(t *testing.T) { "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, - responseMessageTraces(t, tracing, responseCount)...), + processResponsesTraces(t, tracing, responseCount)...), testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+1)...), testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...), testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...), @@ -1766,12 +1766,12 @@ func (r *receiver) Connected(p peer.ID) { func (r *receiver) Disconnected(p peer.ID) { } -func responseMessageTraces(t *testing.T, tracing *testutil.Collector, responseCount int) []string { - traces := testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount-1) - finalStub := tracing.FindSpanByTraceString(fmt.Sprintf("responseMessage(%d)->loaderProcess(0)", responseCount-1)) +func processResponsesTraces(t *testing.T, tracing *testutil.Collector, responseCount int) []string { + traces := testutil.RepeatTraceStrings("processResponses({})->loaderProcess(0)->cacheProcess(0)", responseCount-1) + finalStub := tracing.FindSpanByTraceString(fmt.Sprintf("processResponses(%d)->loaderProcess(0)", responseCount-1)) require.NotNil(t, finalStub) if len(testutil.AttributeValueInTraceSpan(t, *finalStub, "requestIDs").AsInt64Slice()) == 0 { - return append(traces, fmt.Sprintf("responseMessage(%d)->loaderProcess(0)", responseCount-1)) + return append(traces, fmt.Sprintf("processResponses(%d)->loaderProcess(0)", responseCount-1)) } - return append(traces, fmt.Sprintf("responseMessage(%d)->loaderProcess(0)->cacheProcess(0)", responseCount-1)) + return append(traces, fmt.Sprintf("processResponses(%d)->loaderProcess(0)->cacheProcess(0)", responseCount-1)) } diff --git a/requestmanager/client.go b/requestmanager/client.go index 40fc9be2..c18d7bfe 100644 --- a/requestmanager/client.go +++ b/requestmanager/client.go @@ -290,7 +290,7 @@ func (rm *RequestManager) CancelRequest(ctx context.Context, requestID graphsync // and updates the in progress requests based on those responses. func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block) { - rm.send(&processResponseMessage{p, responses, blks}, nil) + rm.send(&processResponsesMessage{p, responses, blks}, nil) } // UnpauseRequest unpauses a request that was paused in a block hook based request ID diff --git a/requestmanager/messages.go b/requestmanager/messages.go index dd5acf3c..ce219f3a 100644 --- a/requestmanager/messages.go +++ b/requestmanager/messages.go @@ -40,14 +40,14 @@ func (urm *unpauseRequestMessage) handle(rm *RequestManager) { } } -type processResponseMessage struct { +type processResponsesMessage struct { p peer.ID responses []gsmsg.GraphSyncResponse blks []blocks.Block } -func (prm *processResponseMessage) handle(rm *RequestManager) { - rm.processResponseMessage(prm.p, prm.responses, prm.blks) +func (prm *processResponsesMessage) handle(rm *RequestManager) { + rm.processResponses(prm.p, prm.responses, prm.blks) } type cancelRequestMessage struct { diff --git a/requestmanager/server.go b/requestmanager/server.go index 07c624d0..f8cc47db 100644 --- a/requestmanager/server.go +++ b/requestmanager/server.go @@ -262,13 +262,13 @@ func (rm *RequestManager) cancelOnError(requestID graphsync.RequestID, ipr *inPr } } -func (rm *RequestManager) processResponseMessage(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block) { - log.Debugf("beging rocessing message for peer %s", p) +func (rm *RequestManager) processResponses(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block) { + log.Debugf("beginning processing responses for peer %s", p) requestIds := make([]int, 0, len(responses)) for _, r := range responses { requestIds = append(requestIds, int(r.RequestID())) } - ctx, span := otel.Tracer("graphsync").Start(rm.ctx, "responseMessage", trace.WithAttributes( + ctx, span := otel.Tracer("graphsync").Start(rm.ctx, "processResponses", trace.WithAttributes( attribute.String("peerID", p.Pretty()), attribute.IntSlice("requestIDs", requestIds), )) @@ -279,7 +279,7 @@ func (rm *RequestManager) processResponseMessage(p peer.ID, responses []gsmsg.Gr responseMetadata := metadataForResponses(filteredResponses) rm.asyncLoader.ProcessResponse(ctx, responseMetadata, blks) rm.processTerminations(filteredResponses) - log.Debugf("end processing message for peer %s", p) + log.Debugf("end processing responses for peer %s", p) } func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {