diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index f493cac8..54d45ea4 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -97,6 +97,7 @@ func TestMakeRequestToNetwork(t *testing.T) { "request(0)->newRequest(0)", "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", + "message(0)->sendMessage(0)", }, tracing.TracesToStrings()) // make sure the attributes are what we expect @@ -201,11 +202,13 @@ func TestRejectRequestsByDefault(t *testing.T) { tracing := collectTracing(t) require.ElementsMatch(t, []string{ - "response(0)", "request(0)->newRequest(0)", "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", "responseMessage(0)->loaderProcess(0)->cacheProcess(0)", + "response(0)->transaction(0)->execute(0)->buildMessage(0)", + "message(0)->sendMessage(0)", + "message(1)->sendMessage(0)", }, tracing.TracesToStrings()) // has ContextCancelError exception recorded in the right place tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false) @@ -247,7 +250,8 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) { tracing := collectTracing(t) traceStrings := tracing.TracesToStrings() - require.Contains(t, traceStrings, "response(0)->executeTask(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)") if wasCancelled { require.Contains(t, traceStrings, "response(0)->abortRequest(0)") } @@ -299,7 +303,8 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) { tracing := collectTracing(t) traceStrings := tracing.TracesToStrings() - require.Contains(t, traceStrings, "response(0)->executeTask(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)") require.Contains(t, traceStrings, "request(0)->newRequest(0)") require.Contains(t, traceStrings, "request(0)->executeTask(0)") require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") @@ -333,11 +338,9 @@ func TestGraphsyncRoundTrip(t *testing.T) { var receivedResponseData []byte var receivedRequestData []byte - var responseCount int requestor.RegisterIncomingResponseHook( func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { - responseCount = responseCount + 1 data, has := responseData.Extension(td.extensionName) if has { receivedResponseData = data @@ -383,7 +386,8 @@ func TestGraphsyncRoundTrip(t *testing.T) { tracing := collectTracing(t) traceStrings := tracing.TracesToStrings() - require.Contains(t, traceStrings, "response(0)->executeTask(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)") require.Contains(t, traceStrings, "request(0)->newRequest(0)") require.Contains(t, traceStrings, "request(0)->executeTask(0)") require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") @@ -479,7 +483,8 @@ func TestGraphsyncRoundTripPartial(t *testing.T) { tracing := collectTracing(t) traceStrings := tracing.TracesToStrings() - require.Contains(t, traceStrings, "response(0)->executeTask(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)") require.Contains(t, traceStrings, "request(0)->newRequest(0)") require.Contains(t, traceStrings, "request(0)->executeTask(0)") require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") @@ -499,10 +504,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { // initialize graphsync on first node to make requests requestor := td.GraphSyncHost1() - var responseCount int - requestor.RegisterIncomingResponseHook(func(p peer.ID, response graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { - responseCount = responseCount + 1 - }) + assertAllResponsesReceived := assertAllResponsesReceivedFunction(requestor) // setup receiving peer to just record message coming in blockChainLength := 100 @@ -546,16 +548,20 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { drain(requestor) drain(responder) assertComplete(ctx, t) + responseCount := assertAllResponsesReceived(ctx, t) tracing := collectTracing(t) - require.ElementsMatch(t, append(append([]string{ - "response(0)->executeTask(0)", + require.ElementsMatch(t, append(append(append(append(append(append([]string{ "request(0)->newRequest(0)", "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, responseMessageTraces(t, tracing, responseCount)...), - testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)..., // half of the full chain + testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+1)...), + testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)...), // half of the full chain + testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...), + testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", blockChainLength)...), + testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)..., ), tracing.TracesToStrings()) } @@ -570,10 +576,7 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) { // initialize graphsync on first node to make requests requestor := td.GraphSyncHost1() - var responseCount int - requestor.RegisterIncomingResponseHook(func(p peer.ID, response graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { - responseCount = responseCount + 1 - }) + assertAllResponsesReceived := assertAllResponsesReceivedFunction(requestor) // setup receiving peer to just record message coming in blockChainLength := 100 @@ -619,16 +622,20 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) { drain(requestor) drain(responder) assertComplete(ctx, t) + responseCount := assertAllResponsesReceived(ctx, t) tracing := collectTracing(t) - require.ElementsMatch(t, append(append([]string{ - "response(0)->executeTask(0)", + require.ElementsMatch(t, append(append(append(append(append(append([]string{ "request(0)->newRequest(0)", "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, responseMessageTraces(t, tracing, responseCount)...), - testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)..., // half of the full chain + testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+1)...), + testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)...), + testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...), + testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", blockChainLength)...), + testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)..., ), tracing.TracesToStrings()) } @@ -710,8 +717,10 @@ func TestPauseResume(t *testing.T) { tracing := collectTracing(t) traceStrings := tracing.TracesToStrings() - require.Contains(t, traceStrings, "response(0)->executeTask(0)") - require.Contains(t, traceStrings, "response(0)->executeTask(1)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(1)->processBlock(0)->loadBlock(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(1)->processBlock(0)->sendBlock(0)->processBlockHooks(0)") require.Contains(t, traceStrings, "request(0)->newRequest(0)") require.Contains(t, traceStrings, "request(0)->executeTask(0)") require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") @@ -787,10 +796,12 @@ func TestPauseResumeRequest(t *testing.T) { tracing := collectTracing(t) traceStrings := tracing.TracesToStrings() - require.Contains(t, traceStrings, "response(0)->executeTask(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)") if wasCancelled { require.Contains(t, traceStrings, "response(0)->abortRequest(0)") - require.Contains(t, traceStrings, "response(1)->executeTask(0)") + require.Contains(t, traceStrings, "response(1)->executeTask(0)->processBlock(0)->loadBlock(0)") + require.Contains(t, traceStrings, "response(1)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)") } require.Contains(t, traceStrings, "request(0)->newRequest(0)") require.Contains(t, traceStrings, "request(0)->executeTask(0)") @@ -816,10 +827,9 @@ func TestPauseResumeViaUpdate(t *testing.T) { var receivedUpdateData []byte // initialize graphsync on first node to make requests requestor := td.GraphSyncHost1() - var responseCount int + assertAllResponsesReceived := assertAllResponsesReceivedFunction(requestor) requestor.RegisterIncomingResponseHook(func(p peer.ID, response graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { - responseCount = responseCount + 1 if response.Status() == graphsync.RequestPaused { var has bool receivedReponseData, has = response.Extension(td.extensionName) @@ -869,18 +879,23 @@ func TestPauseResumeViaUpdate(t *testing.T) { drain(requestor) drain(responder) assertComplete(ctx, t) + responseCount := assertAllResponsesReceived(ctx, t) tracing := collectTracing(t) - require.ElementsMatch(t, append(append([]string{ - "response(0)->executeTask(0)", + require.ElementsMatch(t, append(append(append(append(append(append(append(append([]string{ "response(0)->processUpdate(0)", - "response(0)->executeTask(1)", "request(0)->newRequest(0)", "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, responseMessageTraces(t, tracing, responseCount)...), - testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)..., + testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+2)...), + testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...), + testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", 50)...), + testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", 50)...), // half of the full chain + testutil.RepeatTraceStrings("response(0)->executeTask(1)->processBlock({})->loadBlock(0)", 50)...), + testutil.RepeatTraceStrings("response(0)->executeTask(1)->processBlock({})->sendBlock(0)->processBlockHooks(0)", 50)...), // half of the full chain + testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+3)..., ), tracing.TracesToStrings()) // make sure the attributes are what we expect processUpdateSpan := tracing.FindSpanByTraceString("response(0)->processUpdate(0)") @@ -903,10 +918,7 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) { // initialize graphsync on first node to make requests requestor := td.GraphSyncHost1() - var responseCount int - requestor.RegisterIncomingResponseHook(func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { - responseCount = responseCount + 1 - }) + assertAllResponsesReceived := assertAllResponsesReceivedFunction(requestor) // setup receiving peer to just record message coming in blockChainLength := 100 @@ -960,18 +972,23 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) { drain(requestor) drain(responder) assertComplete(ctx, t) + responseCount := assertAllResponsesReceived(ctx, t) tracing := collectTracing(t) - require.ElementsMatch(t, append(append([]string{ - "response(0)->executeTask(0)", + require.ElementsMatch(t, append(append(append(append(append(append(append(append([]string{ "response(0)->processUpdate(0)", - "response(0)->executeTask(1)", "request(0)->newRequest(0)", "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, responseMessageTraces(t, tracing, responseCount)...), - testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)..., + testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+2)...), + testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...), + testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", 50)...), + testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", 50)...), // half of the full chain + testutil.RepeatTraceStrings("response(0)->executeTask(1)->processBlock({})->loadBlock(0)", 50)...), + testutil.RepeatTraceStrings("response(0)->executeTask(1)->processBlock({})->sendBlock(0)->processBlockHooks(0)", 50)...), // half of the full chain + testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+3)..., ), tracing.TracesToStrings()) // make sure the attributes are what we expect processUpdateSpan := tracing.FindSpanByTraceString("response(0)->processUpdate(0)") @@ -1057,9 +1074,11 @@ func TestNetworkDisconnect(t *testing.T) { tracing := collectTracing(t) traceStrings := tracing.TracesToStrings() - require.Contains(t, traceStrings, "response(0)->executeTask(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)") require.Contains(t, traceStrings, "response(0)->abortRequest(0)") - require.Contains(t, traceStrings, "response(0)->executeTask(1)") + require.Contains(t, traceStrings, "response(0)->executeTask(1)->processBlock(0)->loadBlock(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(1)->processBlock(0)->sendBlock(0)->processBlockHooks(0)") require.Contains(t, traceStrings, "request(0)->newRequest(0)") require.Contains(t, traceStrings, "request(0)->executeTask(0)") require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") @@ -1109,11 +1128,11 @@ func TestConnectFail(t *testing.T) { drain(requestor) tracing := collectTracing(t) - require.ElementsMatch(t, []string{ - "request(0)->newRequest(0)", - "request(0)->executeTask(0)", - "request(0)->terminateRequest(0)", - }, tracing.TracesToStrings()) + traceStrings := tracing.TracesToStrings() + require.Contains(t, traceStrings, "request(0)->newRequest(0)") + require.Contains(t, traceStrings, "request(0)->executeTask(0)") + require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") + require.Contains(t, traceStrings, "message(0)->sendMessage(0)") // has ContextCancelError exception recorded in the right place tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false) } @@ -1192,7 +1211,8 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { tracing := collectTracing(t) traceStrings := tracing.TracesToStrings() - require.Contains(t, traceStrings, "response(0)->executeTask(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)") // may or may not contain a second response trace: "response(1)->executeTask(0)"" if wasCancelled { require.Contains(t, traceStrings, "response(0)->abortRequest(0)") @@ -1283,7 +1303,8 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) { tracing := collectTracing(t) // two complete request traces expected traceStrings := tracing.TracesToStrings() - require.Contains(t, traceStrings, "response(0)->executeTask(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)") // may or may not contain a second response "response(1)->executeTask(0)" require.Contains(t, traceStrings, "request(0)->newRequest(0)") require.Contains(t, traceStrings, "request(0)->executeTask(0)") @@ -1318,12 +1339,7 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) { // initialize graphsync on first node to make requests requestor := td.GraphSyncHost1() - - var responseCount int - requestor.RegisterIncomingResponseHook(func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { - responseCount = responseCount + 1 - }) - + assertAllResponsesReceived := assertAllResponsesReceivedFunction(requestor) // setup receiving peer to just record message coming in blockChainLength := 40 blockChainPersistence := td.persistence1 @@ -1341,16 +1357,20 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) { drain(requestor) drain(responder) assertComplete(ctx, t) + responseCount := assertAllResponsesReceived(ctx, t) tracing := collectTracing(t) - require.ElementsMatch(t, append(append([]string{ - "response(0)->executeTask(0)", + require.ElementsMatch(t, append(append(append(append(append(append([]string{ "request(0)->newRequest(0)", "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, responseMessageTraces(t, tracing, responseCount)...), - testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)..., + testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+1)...), + testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...), + testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...), + testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", blockChainLength)...), + testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)..., ), tracing.TracesToStrings()) } @@ -1481,7 +1501,8 @@ func TestUnixFSFetch(t *testing.T) { tracing := collectTracing(t) traceStrings := tracing.TracesToStrings() - require.Contains(t, traceStrings, "response(0)->executeTask(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)") require.Contains(t, traceStrings, "request(0)->newRequest(0)") require.Contains(t, traceStrings, "request(0)->executeTask(0)") require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") @@ -1525,16 +1546,15 @@ func TestGraphsyncBlockListeners(t *testing.T) { var receivedResponseData []byte var receivedRequestData []byte - var responseCount int requestor.RegisterIncomingResponseHook( func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { - responseCount = responseCount + 1 data, has := responseData.Extension(td.extensionName) if has { receivedResponseData = data } }) + assertAllResponsesReceived := assertAllResponsesReceivedFunction(requestor) responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { var has bool @@ -1576,17 +1596,21 @@ func TestGraphsyncBlockListeners(t *testing.T) { drain(requestor) drain(responder) assertComplete(ctx, t) + responseCount := assertAllResponsesReceived(ctx, t) tracing := collectTracing(t) - require.ElementsMatch(t, append(append( + require.ElementsMatch(t, append(append(append(append(append(append( []string{ - "response(0)->executeTask(0)", "request(0)->newRequest(0)", "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, responseMessageTraces(t, tracing, responseCount)...), - testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 100)..., + testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+1)...), + testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...), + testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...), + testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", blockChainLength)...), + testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)..., ), tracing.TracesToStrings()) } @@ -1613,6 +1637,24 @@ func drain(gs graphsync.GraphExchange) { gs.(*GraphSync).responseQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() } +func assertAllResponsesReceivedFunction(gs graphsync.GraphExchange) func(context.Context, *testing.T) int { + var responseCount int + finalResponseStatusChanRequestor := make(chan graphsync.ResponseStatusCode, 1) + gs.RegisterIncomingResponseHook(func(p peer.ID, response graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { + responseCount = responseCount + 1 + if response.Status().IsTerminal() { + select { + case finalResponseStatusChanRequestor <- response.Status(): + default: + } + } + }) + return func(ctx context.Context, t *testing.T) int { + testutil.AssertDoesReceive(ctx, t, finalResponseStatusChanRequestor, "final response never received") + return responseCount + } +} + func assertCompletionFunction(gs graphsync.GraphExchange, completedRequestCount int) func(context.Context, *testing.T) { completedResponse := make(chan struct{}, completedRequestCount) gs.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) { diff --git a/messagequeue/builder.go b/messagequeue/builder.go index fb794310..073f8e87 100644 --- a/messagequeue/builder.go +++ b/messagequeue/builder.go @@ -1,6 +1,7 @@ package messagequeue import ( + "context" "io" "github.com/ipfs/go-graphsync" @@ -11,6 +12,7 @@ import ( // Builder wraps a message builder with additional functions related to metadata // and notifications in the message queue type Builder struct { + ctx context.Context *gsmsg.Builder topic Topic responseStreams map[graphsync.RequestID]io.Closer @@ -19,8 +21,9 @@ type Builder struct { } // NewBuilder sets up a new builder for the given topic -func NewBuilder(topic Topic) *Builder { +func NewBuilder(ctx context.Context, topic Topic) *Builder { return &Builder{ + ctx: ctx, Builder: gsmsg.NewBuilder(), topic: topic, responseStreams: make(map[graphsync.RequestID]io.Closer), @@ -29,6 +32,10 @@ func NewBuilder(topic Topic) *Builder { } } +func (b *Builder) Context() context.Context { + return b.ctx +} + // SetResponseStream sets the given response stream to close should the message fail to send func (b *Builder) SetResponseStream(requestID graphsync.RequestID, stream io.Closer) { b.responseStreams[requestID] = stream @@ -82,6 +89,7 @@ func (b *Builder) build(publisher notifications.Publisher) (gsmsg.GraphSyncMessa BlockData: b.blockData, ResponseCodes: message.ResponseCodes(), }, + ctx: b.ctx, topic: b.topic, msgSize: b.BlockSize(), responseStreams: b.responseStreams, diff --git a/messagequeue/messagequeue.go b/messagequeue/messagequeue.go index cf3c02a3..6ddc0b2a 100644 --- a/messagequeue/messagequeue.go +++ b/messagequeue/messagequeue.go @@ -11,6 +11,10 @@ import ( "github.com/ipfs/go-graphsync" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/peer" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" gsmsg "github.com/ipfs/go-graphsync/message" gsnet "github.com/ipfs/go-graphsync/network" @@ -112,7 +116,10 @@ func (mq *MessageQueue) buildMessage(size uint64, buildMessageFn func(*Builder)) if shouldBeginNewResponse(mq.builders, size) { topic := mq.nextBuilderTopic mq.nextBuilderTopic++ - mq.builders = append(mq.builders, NewBuilder(topic)) + ctx, _ := otel.Tracer("graphsync").Start(mq.ctx, "message", trace.WithAttributes( + attribute.Int64("topic", int64(topic)), + )) + mq.builders = append(mq.builders, NewBuilder(ctx, topic)) } builder := mq.builders[len(mq.builders)-1] buildMessageFn(builder) @@ -156,7 +163,12 @@ func (mq *MessageQueue) runQueue() { for { _, metadata, err := mq.extractOutgoingMessage() if err == nil { - mq.publishError(metadata, fmt.Errorf("message queue shutdown")) + span := trace.SpanFromContext(metadata.ctx) + err := fmt.Errorf("message queue shutdown") + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + span.End() + mq.publishError(metadata, err) mq.eventPublisher.Close(metadata.topic) } else { break @@ -211,12 +223,20 @@ func (mq *MessageQueue) extractOutgoingMessage() (gsmsg.GraphSyncMessage, intern func (mq *MessageQueue) sendMessage() { message, metadata, err := mq.extractOutgoingMessage() + if err != nil { if err != errEmptyMessage { log.Errorf("Unable to assemble GraphSync message: %s", err.Error()) } return } + span := trace.SpanFromContext(metadata.ctx) + defer span.End() + _, sendSpan := otel.Tracer("graphsync").Start(metadata.ctx, "sendMessage", trace.WithAttributes( + attribute.Int64("topic", int64(metadata.topic)), + attribute.Int64("size", int64(metadata.msgSize)), + )) + defer sendSpan.End() mq.publishQueued(metadata) defer mq.eventPublisher.Close(metadata.topic) @@ -337,6 +357,7 @@ func openSender(ctx context.Context, network MessageNetwork, p peer.ID, sendTime } type internalMetadata struct { + ctx context.Context public Metadata topic Topic msgSize uint64 diff --git a/messagequeue/messagequeue_test.go b/messagequeue/messagequeue_test.go index ef7e69e5..60d5fb8b 100644 --- a/messagequeue/messagequeue_test.go +++ b/messagequeue/messagequeue_test.go @@ -183,6 +183,7 @@ func TestProcessingNotification(t *testing.T) { func TestDedupingMessages(t *testing.T) { ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 1*time.Second) defer cancel() @@ -251,6 +252,12 @@ func TestDedupingMessages(t *testing.T) { t.Fatal("incorrect request added to message") } } + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "message(0)->sendMessage(0)", + "message(1)->sendMessage(0)", + }, tracing.TracesToStrings()) } func TestSendsVeryLargeBlocksResponses(t *testing.T) { diff --git a/peermanager/peermessagemanager_test.go b/peermanager/peermessagemanager_test.go index 34f728bc..6d55c936 100644 --- a/peermanager/peermessagemanager_test.go +++ b/peermanager/peermessagemanager_test.go @@ -30,7 +30,7 @@ type fakePeer struct { } func (fp *fakePeer) AllocateAndBuildMessage(blkSize uint64, buildMessage func(b *messagequeue.Builder)) { - builder := messagequeue.NewBuilder(messagequeue.Topic(0)) + builder := messagequeue.NewBuilder(context.TODO(), messagequeue.Topic(0)) buildMessage(builder) message, err := builder.Build() if err != nil { diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index a02e36eb..ee5dcbf2 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -1015,7 +1015,7 @@ type fakePeerHandler struct { func (fph *fakePeerHandler) AllocateAndBuildMessage(p peer.ID, blkSize uint64, requestBuilder func(b *messagequeue.Builder)) { - builder := messagequeue.NewBuilder(messagequeue.Topic(0)) + builder := messagequeue.NewBuilder(context.TODO(), messagequeue.Topic(0)) requestBuilder(builder) message, err := builder.Build() if err != nil { diff --git a/responsemanager/client.go b/responsemanager/client.go index fc31595a..b76f36a8 100644 --- a/responsemanager/client.go +++ b/responsemanager/client.go @@ -85,7 +85,7 @@ type NetworkErrorListeners interface { // ResponseAssembler is an interface that returns sender interfaces for peer responses. type ResponseAssembler interface { - NewStream(p peer.ID, requestID graphsync.RequestID, subscriber notifications.Subscriber) responseassembler.ResponseStream + NewStream(ctx context.Context, p peer.ID, requestID graphsync.RequestID, subscriber notifications.Subscriber) responseassembler.ResponseStream } type responseManagerMessage interface { diff --git a/responsemanager/queryexecutor/queryexecutor.go b/responsemanager/queryexecutor/queryexecutor.go index 87a4708b..f43e19f9 100644 --- a/responsemanager/queryexecutor/queryexecutor.go +++ b/responsemanager/queryexecutor/queryexecutor.go @@ -11,6 +11,7 @@ import ( "github.com/ipld/go-ipld-prime/traversal" "github.com/libp2p/go-libp2p-core/peer" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -82,7 +83,7 @@ func New(ctx context.Context, // and uses the ResponseAssembler to build and send a response, while also triggering any of // the QueryExecutor's BlockHooks. Traversal continues until complete, or a signal or hook // suggests we should stop or pause. -func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask.Task) bool { +func (qe *QueryExecutor) ExecuteTask(_ context.Context, pid peer.ID, task *peertask.Task) bool { // StartTask lets us block until this task is at the top of the execution stack responseTaskChan := make(chan ResponseTask) var rt ResponseTask @@ -97,11 +98,11 @@ func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *pee return false } - _, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(qe.ctx, rt.Span), "executeTask") + ctx, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(qe.ctx, rt.Span), "executeTask") defer span.End() log.Debugw("beginning response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String()) - err := qe.executeQuery(pid, rt) + err := qe.executeQuery(ctx, pid, rt) if err != nil { span.RecordError(err) if _, isPaused := err.(hooks.ErrPaused); !isPaused { @@ -114,10 +115,10 @@ func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *pee } func (qe *QueryExecutor) executeQuery( - p peer.ID, rt ResponseTask) error { + ctx context.Context, p peer.ID, rt ResponseTask) error { // Execute the traversal operation, continue until we have reason to stop (error, pause, complete) - err := qe.runTraversal(p, rt) + err := qe.runTraversal(ctx, p, rt) _, isPaused := err.(hooks.ErrPaused) if isPaused { @@ -180,7 +181,7 @@ func (qe *QueryExecutor) checkForUpdates( } } -func (qe *QueryExecutor) runTraversal(p peer.ID, taskData ResponseTask) error { +func (qe *QueryExecutor) runTraversal(ctx context.Context, p peer.ID, taskData ResponseTask) error { for { traverser := taskData.Traverser isComplete, err := traverser.IsComplete() @@ -195,26 +196,35 @@ func (qe *QueryExecutor) runTraversal(p peer.ID, taskData ResponseTask) error { } return err } - lnk, data, err := qe.nextBlock(taskData) + lnk, lnkCtx := taskData.Traverser.CurrentRequest() + ctx, span := otel.Tracer("graphsync").Start(ctx, "processBlock", trace.WithAttributes( + attribute.String("cid", lnk.String()), + )) + data, err := qe.loadBlock(ctx, taskData, lnk, lnkCtx) if err != nil { + span.End() return err } - err = qe.sendResponse(p, taskData, lnk, data) + err = qe.sendResponse(ctx, p, taskData, lnk, data) if err != nil { + span.End() return err } + span.End() } } -func (qe *QueryExecutor) nextBlock(taskData ResponseTask) (ipld.Link, []byte, error) { - lnk, lnkCtx := taskData.Traverser.CurrentRequest() +func (qe *QueryExecutor) loadBlock(ctx context.Context, taskData ResponseTask, lnk ipld.Link, lnkCtx ipld.LinkContext) ([]byte, error) { + _, span := otel.Tracer("graphsync").Start(ctx, "loadBlock") + defer span.End() + log.Debugf("will load link=%s", lnk) result, err := taskData.Loader(lnkCtx, lnk) if err != nil { log.Errorf("failed to load link=%s, nBlocksRead=%d, err=%s", lnk, taskData.Traverser.NBlocksTraversed(), err) taskData.Traverser.Error(traversal.SkipMe{}) - return lnk, nil, nil + return nil, nil } blockBuffer, ok := result.(*bytes.Buffer) @@ -224,22 +234,24 @@ func (qe *QueryExecutor) nextBlock(taskData ResponseTask) (ipld.Link, []byte, er if err != nil { log.Errorf("failed to write to buffer, link=%s, nBlocksRead=%d, err=%s", lnk, taskData.Traverser.NBlocksTraversed(), err) taskData.Traverser.Error(err) - return lnk, nil, err + return nil, err } } data := blockBuffer.Bytes() err = taskData.Traverser.Advance(blockBuffer) if err != nil { log.Errorf("failed to advance traversal, link=%s, nBlocksRead=%d, err=%s", lnk, taskData.Traverser.NBlocksTraversed(), err) - return lnk, data, err + return data, err } log.Debugf("successfully loaded link=%s, nBlocksRead=%d", lnk, taskData.Traverser.NBlocksTraversed()) - return lnk, data, nil + return data, nil } -func (qe *QueryExecutor) sendResponse(p peer.ID, taskData ResponseTask, link ipld.Link, data []byte) error { +func (qe *QueryExecutor) sendResponse(ctx context.Context, p peer.ID, taskData ResponseTask, link ipld.Link, data []byte) error { // Execute a transaction for this block, including any other queued operations return taskData.ResponseStream.Transaction(func(rb responseassembler.ResponseBuilder) error { + ctx, span := otel.Tracer("graphsync").Start(ctx, "sendBlock", trace.WithLinks(trace.LinkFromContext(rb.Context()))) + defer span.End() // Ensure that any updates that have occurred till now are integrated into the response err := qe.checkForUpdates(p, taskData, rb) // On any error other than a pause, we bail, if it's a pause then we continue processing _this_ block @@ -248,7 +260,9 @@ func (qe *QueryExecutor) sendResponse(p peer.ID, taskData ResponseTask, link ipl } blockData := rb.SendResponse(link, data) if blockData.BlockSize() > 0 { + _, span := otel.Tracer("graphsync").Start(ctx, "processBlockHooks") result := qe.blockHooks.ProcessBlockHooks(p, taskData.Request, blockData) + span.End() for _, extension := range result.Extensions { rb.SendExtensionData(extension) } diff --git a/responsemanager/queryexecutor/queryexecutor_test.go b/responsemanager/queryexecutor/queryexecutor_test.go index aefc736c..3711b5b9 100644 --- a/responsemanager/queryexecutor/queryexecutor_test.go +++ b/responsemanager/queryexecutor/queryexecutor_test.go @@ -437,6 +437,10 @@ func (rb fauxResponseBuilder) PauseRequest() { } } +func (rb fauxResponseBuilder) Context() context.Context { + return context.TODO() +} + var _ responseassembler.ResponseBuilder = &fauxResponseBuilder{} type blockData struct { diff --git a/responsemanager/responseassembler/responseBuilder.go b/responsemanager/responseassembler/responseBuilder.go index c4ddefcb..e395a871 100644 --- a/responsemanager/responseassembler/responseBuilder.go +++ b/responsemanager/responseassembler/responseBuilder.go @@ -1,6 +1,8 @@ package responseassembler import ( + "context" + blocks "github.com/ipfs/go-block-format" logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" @@ -18,6 +20,7 @@ type responseOperation interface { } type responseBuilder struct { + ctx context.Context requestID graphsync.RequestID operations []responseOperation linkTracker *peerLinkTracker @@ -47,6 +50,10 @@ func (rb *responseBuilder) PauseRequest() { rb.operations = append(rb.operations, statusOperation{rb.requestID, graphsync.RequestPaused}) } +func (rb *responseBuilder) Context() context.Context { + return rb.ctx +} + func (rb *responseBuilder) setupBlockOperation( link ipld.Link, data []byte) blockOperation { hasBlock := data != nil diff --git a/responsemanager/responseassembler/responseassembler.go b/responsemanager/responseassembler/responseassembler.go index ff2ca505..befa10a7 100644 --- a/responsemanager/responseassembler/responseassembler.go +++ b/responsemanager/responseassembler/responseassembler.go @@ -13,6 +13,8 @@ import ( "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/messagequeue" @@ -43,6 +45,9 @@ type ResponseBuilder interface { // PauseRequest temporarily halts responding to the request PauseRequest() + + // Context returns the execution context for this transaction + Context() context.Context } // PeerMessageHandler is an interface that can queue a response for a given peer to go out over the network @@ -68,8 +73,9 @@ func New(ctx context.Context, peerHandler PeerMessageHandler) *ResponseAssembler } } -func (ra *ResponseAssembler) NewStream(p peer.ID, requestID graphsync.RequestID, subscriber notifications.Subscriber) ResponseStream { +func (ra *ResponseAssembler) NewStream(ctx context.Context, p peer.ID, requestID graphsync.RequestID, subscriber notifications.Subscriber) ResponseStream { return &responseStream{ + ctx: ctx, requestID: requestID, p: p, messageSenders: ra.peerHandler, @@ -79,6 +85,7 @@ func (ra *ResponseAssembler) NewStream(p peer.ID, requestID graphsync.RequestID, } type responseStream struct { + ctx context.Context requestID graphsync.RequestID p peer.ID closed bool @@ -132,16 +139,22 @@ func (rs *responseStream) ClearRequest() { } func (rs *responseStream) Transaction(transaction Transaction) error { + ctx, span := otel.Tracer("graphsync").Start(rs.ctx, "transaction") + defer span.End() rb := &responseBuilder{ + ctx: ctx, requestID: rs.requestID, linkTracker: rs.linkTrackers.GetProcess(rs.p).(*peerLinkTracker), } err := transaction(rb) - rs.execute(rb.operations) + rs.execute(ctx, rb.operations) return err } -func (rs *responseStream) execute(operations []responseOperation) { +func (rs *responseStream) execute(ctx context.Context, operations []responseOperation) { + ctx, span := otel.Tracer("graphsync").Start(ctx, "execute") + defer span.End() + if rs.isClosed() { return } @@ -150,6 +163,9 @@ func (rs *responseStream) execute(operations []responseOperation) { size += op.size() } rs.messageSenders.AllocateAndBuildMessage(rs.p, size, func(builder *messagequeue.Builder) { + _, span = otel.Tracer("graphsync").Start(ctx, "buildMessage", trace.WithLinks(trace.LinkFromContext(builder.Context()))) + defer span.End() + if rs.isClosed() { return } diff --git a/responsemanager/responseassembler/responseassembler_test.go b/responsemanager/responseassembler/responseassembler_test.go index edce6a70..b9f26c6b 100644 --- a/responsemanager/responseassembler/responseassembler_test.go +++ b/responsemanager/responseassembler/responseassembler_test.go @@ -23,6 +23,7 @@ import ( func TestResponseAssemblerSendsResponses(t *testing.T) { ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() p := testutil.GeneratePeers(1)[0] @@ -41,11 +42,11 @@ func TestResponseAssemblerSendsResponses(t *testing.T) { var bd1, bd2 graphsync.BlockData sub1 := testutil.NewTestSubscriber(10) - stream1 := responseAssembler.NewStream(p, requestID1, sub1) + stream1 := responseAssembler.NewStream(ctx, p, requestID1, sub1) sub2 := testutil.NewTestSubscriber(10) - stream2 := responseAssembler.NewStream(p, requestID2, sub2) + stream2 := responseAssembler.NewStream(ctx, p, requestID2, sub2) sub3 := testutil.NewTestSubscriber(10) - stream3 := responseAssembler.NewStream(p, requestID3, sub3) + stream3 := responseAssembler.NewStream(ctx, p, requestID3, sub3) // send block 0 for request 1 require.NoError(t, stream1.Transaction(func(b ResponseBuilder) error { @@ -121,6 +122,16 @@ func TestResponseAssemblerSendsResponses(t *testing.T) { fph.AssertResponses(expectedResponses{requestID3: graphsync.PartialResponse}) fph.AssertSubscriber(requestID3, sub3) fph.AssertResponseStream(requestID3, stream3) + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "transaction(0)->execute(0)->buildMessage(0)", + "transaction(1)->execute(0)->buildMessage(0)", + "transaction(2)->execute(0)->buildMessage(0)", + "transaction(3)->execute(0)->buildMessage(0)", + "transaction(4)->execute(0)->buildMessage(0)", + "transaction(5)->execute(0)->buildMessage(0)", + }, tracing.TracesToStrings()) } func TestResponseAssemblerCloseStream(t *testing.T) { @@ -138,7 +149,7 @@ func TestResponseAssemblerCloseStream(t *testing.T) { responseAssembler := New(ctx, fph) sub1 := testutil.NewTestSubscriber(10) - stream1 := responseAssembler.NewStream(p, requestID1, sub1) + stream1 := responseAssembler.NewStream(ctx, p, requestID1, sub1) require.NoError(t, stream1.Transaction(func(b ResponseBuilder) error { b.SendResponse(links[0], blks[0].RawData()) return nil @@ -174,7 +185,7 @@ func TestResponseAssemblerSendsExtensionData(t *testing.T) { responseAssembler := New(ctx, fph) sub1 := testutil.NewTestSubscriber(10) - stream1 := responseAssembler.NewStream(p, requestID1, sub1) + stream1 := responseAssembler.NewStream(ctx, p, requestID1, sub1) require.NoError(t, stream1.Transaction(func(b ResponseBuilder) error { b.SendResponse(links[0], blks[0].RawData()) return nil @@ -220,7 +231,7 @@ func TestResponseAssemblerSendsResponsesInTransaction(t *testing.T) { fph := newFakePeerHandler(ctx, t) responseAssembler := New(ctx, fph) sub1 := testutil.NewTestSubscriber(10) - stream1 := responseAssembler.NewStream(p, requestID1, sub1) + stream1 := responseAssembler.NewStream(ctx, p, requestID1, sub1) var bd1, bd2, bd3 graphsync.BlockData err := stream1.Transaction(func(b ResponseBuilder) error { bd1 = b.SendResponse(links[0], blks[0].RawData()) @@ -260,9 +271,9 @@ func TestResponseAssemblerIgnoreBlocks(t *testing.T) { fph := newFakePeerHandler(ctx, t) responseAssembler := New(ctx, fph) sub1 := testutil.NewTestSubscriber(10) - stream1 := responseAssembler.NewStream(p, requestID1, sub1) + stream1 := responseAssembler.NewStream(ctx, p, requestID1, sub1) sub2 := testutil.NewTestSubscriber(10) - stream2 := responseAssembler.NewStream(p, requestID2, sub2) + stream2 := responseAssembler.NewStream(ctx, p, requestID2, sub2) stream1.IgnoreBlocks(links) @@ -336,9 +347,9 @@ func TestResponseAssemblerSkipFirstBlocks(t *testing.T) { responseAssembler := New(ctx, fph) sub1 := testutil.NewTestSubscriber(10) - stream1 := responseAssembler.NewStream(p, requestID1, sub1) + stream1 := responseAssembler.NewStream(ctx, p, requestID1, sub1) sub2 := testutil.NewTestSubscriber(10) - stream2 := responseAssembler.NewStream(p, requestID2, sub2) + stream2 := responseAssembler.NewStream(ctx, p, requestID2, sub2) stream1.SkipFirstBlocks(3) @@ -427,11 +438,11 @@ func TestResponseAssemblerDupKeys(t *testing.T) { fph := newFakePeerHandler(ctx, t) responseAssembler := New(ctx, fph) sub1 := testutil.NewTestSubscriber(10) - stream1 := responseAssembler.NewStream(p, requestID1, sub1) + stream1 := responseAssembler.NewStream(ctx, p, requestID1, sub1) sub2 := testutil.NewTestSubscriber(10) - stream2 := responseAssembler.NewStream(p, requestID2, sub2) + stream2 := responseAssembler.NewStream(ctx, p, requestID2, sub2) sub3 := testutil.NewTestSubscriber(10) - stream3 := responseAssembler.NewStream(p, requestID3, sub3) + stream3 := responseAssembler.NewStream(ctx, p, requestID3, sub3) stream1.DedupKey("applesauce") stream3.DedupKey("applesauce") @@ -619,7 +630,7 @@ func (fph *fakePeerHandler) RefuteResponses() { } func (fph *fakePeerHandler) AllocateAndBuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*messagequeue.Builder)) { - builder := messagequeue.NewBuilder(messagequeue.Topic(0)) + builder := messagequeue.NewBuilder(context.TODO(), messagequeue.Topic(0)) buildMessageFn(builder) msg, err := builder.Build() diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index eadbfadd..c88bb880 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -87,9 +87,10 @@ func TestIncomingQuery(t *testing.T) { td.connManager.RefuteProtected(t, td.p) tracing := td.collectTracing(t) - require.ElementsMatch(t, []string{ - "TestIncomingQuery(0)->response(0)->executeTask(0)", - }, tracing.TracesToStrings()) + require.ElementsMatch(t, append( + testutil.RepeatTraceStrings("TestIncomingQuery(0)->response(0)->executeTask(0)->processBlock({})->loadBlock(0)", td.blockChainLength), + testutil.RepeatTraceStrings("TestIncomingQuery(0)->response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", td.blockChainLength)..., // half of the full chain + ), tracing.TracesToStrings()) } func TestCancellationQueryInProgress(t *testing.T) { @@ -857,7 +858,7 @@ type fakeResponseAssembler struct { missingBlock bool } -func (fra *fakeResponseAssembler) NewStream(p peer.ID, requestID graphsync.RequestID, subscriber notifications.Subscriber) responseassembler.ResponseStream { +func (fra *fakeResponseAssembler) NewStream(ctx context.Context, p peer.ID, requestID graphsync.RequestID, subscriber notifications.Subscriber) responseassembler.ResponseStream { fra.notifeePublisher.AddSubscriber(subscriber) return &fakeResponseStream{fra, requestID} } @@ -1007,6 +1008,10 @@ func (frb *fakeResponseBuilder) PauseRequest() { frb.fra.pauseRequest(frb.requestID) } +func (frb *fakeResponseBuilder) Context() context.Context { + return context.TODO() +} + type testData struct { ctx context.Context t *testing.T diff --git a/responsemanager/server.go b/responsemanager/server.go index 465c11e1..786216f1 100644 --- a/responsemanager/server.go +++ b/responsemanager/server.go @@ -213,7 +213,7 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync }, state: graphsync.Queued, startTime: time.Now(), - responseStream: rm.responseAssembler.NewStream(key.p, key.requestID, sub), + responseStream: rm.responseAssembler.NewStream(ctx, key.p, key.requestID, sub), } // TODO: Use a better work estimation metric.