Skip to content

Commit

Permalink
chore(requestmanager): rename processResponses internals for consiste…
Browse files Browse the repository at this point in the history
…ncy (#328)

* chore(requestmanager): rename processResponses internals for consistency

* style(fmt): run gofmt after github merge

Co-authored-by: Hannah Howard <[email protected]>
  • Loading branch information
rvagg and hannahhoward authored Jan 7, 2022
1 parent 170ca8d commit 4f4414d
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 40 deletions.
64 changes: 32 additions & 32 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func TestRejectRequestsByDefault(t *testing.T) {
"request(0)->newRequest(0)",
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
"responseMessage(0)->loaderProcess(0)->cacheProcess(0)",
"processResponses(0)->loaderProcess(0)->cacheProcess(0)",
"response(0)->transaction(0)->execute(0)->buildMessage(0)",
"message(0)->sendMessage(0)",
"message(1)->sendMessage(0)",
Expand Down Expand Up @@ -258,8 +258,8 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

// has ErrBudgetExceeded exception recorded in the right place
tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ErrBudgetExceeded", "traversal budget exceeded", true)
Expand Down Expand Up @@ -308,8 +308,8 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

// has ContextCancelError exception recorded in the right place
// the requester gets a cancel, the responder gets a ErrBudgetExceeded
Expand Down Expand Up @@ -391,8 +391,8 @@ func TestGraphsyncRoundTrip(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

processUpdateSpan := tracing.FindSpanByTraceString("response(0)")
require.Equal(t, int64(0), testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "priority").AsInt64())
Expand Down Expand Up @@ -488,8 +488,8 @@ func TestGraphsyncRoundTripPartial(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
}

func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
Expand Down Expand Up @@ -556,7 +556,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
responseMessageTraces(t, tracing, responseCount)...),
processResponsesTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+1)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)...), // half of the full chain
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...),
Expand Down Expand Up @@ -630,7 +630,7 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
responseMessageTraces(t, tracing, responseCount)...),
processResponsesTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+1)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...),
Expand Down Expand Up @@ -724,8 +724,8 @@ func TestPauseResume(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

// pause recorded
tracing.SingleExceptionEvent(t, "response(0)->executeTask(0)", "github.com/ipfs/go-graphsync/responsemanager/hooks.ErrPaused", hooks.ErrPaused{}.Error(), false)
Expand Down Expand Up @@ -807,8 +807,8 @@ func TestPauseResumeRequest(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(1)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

// has ErrPaused exception recorded in the right place
tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ErrPaused", hooks.ErrPaused{}.Error(), false)
Expand Down Expand Up @@ -888,7 +888,7 @@ func TestPauseResumeViaUpdate(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
responseMessageTraces(t, tracing, responseCount)...),
processResponsesTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+2)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", 50)...),
Expand Down Expand Up @@ -981,7 +981,7 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
responseMessageTraces(t, tracing, responseCount)...),
processResponsesTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+2)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", 50)...),
Expand Down Expand Up @@ -1082,8 +1082,8 @@ func TestNetworkDisconnect(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

// has ContextCancelError exception recorded in the right place
tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false)
Expand Down Expand Up @@ -1223,8 +1223,8 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
require.Contains(t, traceStrings, "request(1)->newRequest(0)")
require.Contains(t, traceStrings, "request(1)->executeTask(0)")
require.Contains(t, traceStrings, "request(1)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(1)->verifyBlock(0)") // should have one of these per block (TODO: why request(1) and not (0)?)
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(1)->verifyBlock(0)") // should have one of these per block (TODO: why request(1) and not (0)?)

// TODO(rvagg): this is randomly either a SkipMe or a ipldutil.ContextCancelError; confirm this is sane
// tracing.SingleExceptionEvent(t, "request(0)->newRequest(0)","request(0)->executeTask(0)", "SkipMe", traversal.SkipMe{}.Error(), true)
Expand Down Expand Up @@ -1312,8 +1312,8 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) {
require.Contains(t, traceStrings, "request(1)->newRequest(0)")
require.Contains(t, traceStrings, "request(1)->executeTask(0)")
require.Contains(t, traceStrings, "request(1)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
}

// TestRoundTripLargeBlocksSlowNetwork test verifies graphsync continues to work
Expand Down Expand Up @@ -1365,7 +1365,7 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
responseMessageTraces(t, tracing, responseCount)...),
processResponsesTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+1)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...),
Expand Down Expand Up @@ -1506,8 +1506,8 @@ func TestUnixFSFetch(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
}

func TestGraphsyncBlockListeners(t *testing.T) {
Expand Down Expand Up @@ -1605,7 +1605,7 @@ func TestGraphsyncBlockListeners(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
responseMessageTraces(t, tracing, responseCount)...),
processResponsesTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+1)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...),
Expand Down Expand Up @@ -1766,12 +1766,12 @@ func (r *receiver) Connected(p peer.ID) {
func (r *receiver) Disconnected(p peer.ID) {
}

func responseMessageTraces(t *testing.T, tracing *testutil.Collector, responseCount int) []string {
traces := testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount-1)
finalStub := tracing.FindSpanByTraceString(fmt.Sprintf("responseMessage(%d)->loaderProcess(0)", responseCount-1))
func processResponsesTraces(t *testing.T, tracing *testutil.Collector, responseCount int) []string {
traces := testutil.RepeatTraceStrings("processResponses({})->loaderProcess(0)->cacheProcess(0)", responseCount-1)
finalStub := tracing.FindSpanByTraceString(fmt.Sprintf("processResponses(%d)->loaderProcess(0)", responseCount-1))
require.NotNil(t, finalStub)
if len(testutil.AttributeValueInTraceSpan(t, *finalStub, "requestIDs").AsInt64Slice()) == 0 {
return append(traces, fmt.Sprintf("responseMessage(%d)->loaderProcess(0)", responseCount-1))
return append(traces, fmt.Sprintf("processResponses(%d)->loaderProcess(0)", responseCount-1))
}
return append(traces, fmt.Sprintf("responseMessage(%d)->loaderProcess(0)->cacheProcess(0)", responseCount-1))
return append(traces, fmt.Sprintf("processResponses(%d)->loaderProcess(0)->cacheProcess(0)", responseCount-1))
}
2 changes: 1 addition & 1 deletion requestmanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (rm *RequestManager) CancelRequest(ctx context.Context, requestID graphsync
// and updates the in progress requests based on those responses.
func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse,
blks []blocks.Block) {
rm.send(&processResponseMessage{p, responses, blks}, nil)
rm.send(&processResponsesMessage{p, responses, blks}, nil)
}

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
Expand Down
6 changes: 3 additions & 3 deletions requestmanager/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ func (urm *unpauseRequestMessage) handle(rm *RequestManager) {
}
}

type processResponseMessage struct {
type processResponsesMessage struct {
p peer.ID
responses []gsmsg.GraphSyncResponse
blks []blocks.Block
}

func (prm *processResponseMessage) handle(rm *RequestManager) {
rm.processResponseMessage(prm.p, prm.responses, prm.blks)
func (prm *processResponsesMessage) handle(rm *RequestManager) {
rm.processResponses(prm.p, prm.responses, prm.blks)
}

type cancelRequestMessage struct {
Expand Down
8 changes: 4 additions & 4 deletions requestmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,13 @@ func (rm *RequestManager) cancelOnError(requestID graphsync.RequestID, ipr *inPr
}
}

func (rm *RequestManager) processResponseMessage(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block) {
log.Debugf("beging rocessing message for peer %s", p)
func (rm *RequestManager) processResponses(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block) {
log.Debugf("beginning processing responses for peer %s", p)
requestIds := make([]int, 0, len(responses))
for _, r := range responses {
requestIds = append(requestIds, int(r.RequestID()))
}
ctx, span := otel.Tracer("graphsync").Start(rm.ctx, "responseMessage", trace.WithAttributes(
ctx, span := otel.Tracer("graphsync").Start(rm.ctx, "processResponses", trace.WithAttributes(
attribute.String("peerID", p.Pretty()),
attribute.IntSlice("requestIDs", requestIds),
))
Expand All @@ -279,7 +279,7 @@ func (rm *RequestManager) processResponseMessage(p peer.ID, responses []gsmsg.Gr
responseMetadata := metadataForResponses(filteredResponses)
rm.asyncLoader.ProcessResponse(ctx, responseMetadata, blks)
rm.processTerminations(filteredResponses)
log.Debugf("end processing message for peer %s", p)
log.Debugf("end processing responses for peer %s", p)
}

func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
Expand Down

0 comments on commit 4f4414d

Please sign in to comment.