Skip to content

Commit

Permalink
Response message tracing (#327)
Browse files Browse the repository at this point in the history
* feat(queryexecutor): trace per block spans

* feat(responseassembler): add timings in response assembler

* feat(messagequeue): add message tracing

* test(impl): fix tests and cleanup

* style(lint): fix staticcheck

* test(impl): fix flaky test

impossible to know if one or two messages will be sent

* feat(messagequeue): record error

* fix(graphsync): test cleanups and refactors

* fix(impl): fix data race
  • Loading branch information
hannahhoward authored Jan 7, 2022
1 parent 8e9f6cf commit 170ca8d
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 106 deletions.
168 changes: 105 additions & 63 deletions impl/graphsync_test.go

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion messagequeue/builder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package messagequeue

import (
"context"
"io"

"github.com/ipfs/go-graphsync"
Expand All @@ -11,6 +12,7 @@ import (
// Builder wraps a message builder with additional functions related to metadata
// and notifications in the message queue
type Builder struct {
ctx context.Context
*gsmsg.Builder
topic Topic
responseStreams map[graphsync.RequestID]io.Closer
Expand All @@ -19,8 +21,9 @@ type Builder struct {
}

// NewBuilder sets up a new builder for the given topic
func NewBuilder(topic Topic) *Builder {
func NewBuilder(ctx context.Context, topic Topic) *Builder {
return &Builder{
ctx: ctx,
Builder: gsmsg.NewBuilder(),
topic: topic,
responseStreams: make(map[graphsync.RequestID]io.Closer),
Expand All @@ -29,6 +32,10 @@ func NewBuilder(topic Topic) *Builder {
}
}

// Context returns the context this Builder was constructed with.
// NOTE(review): callers appear to use it to recover the per-message
// tracing span started in the message queue — confirm against NewBuilder's callers.
func (b *Builder) Context() context.Context {
	return b.ctx
}

// SetResponseStream sets the given response stream to close should the message fail to send
func (b *Builder) SetResponseStream(requestID graphsync.RequestID, stream io.Closer) {
b.responseStreams[requestID] = stream
Expand Down Expand Up @@ -82,6 +89,7 @@ func (b *Builder) build(publisher notifications.Publisher) (gsmsg.GraphSyncMessa
BlockData: b.blockData,
ResponseCodes: message.ResponseCodes(),
},
ctx: b.ctx,
topic: b.topic,
msgSize: b.BlockSize(),
responseStreams: b.responseStreams,
Expand Down
25 changes: 23 additions & 2 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"github.com/ipfs/go-graphsync"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

gsmsg "github.com/ipfs/go-graphsync/message"
gsnet "github.com/ipfs/go-graphsync/network"
Expand Down Expand Up @@ -112,7 +116,10 @@ func (mq *MessageQueue) buildMessage(size uint64, buildMessageFn func(*Builder))
if shouldBeginNewResponse(mq.builders, size) {
topic := mq.nextBuilderTopic
mq.nextBuilderTopic++
mq.builders = append(mq.builders, NewBuilder(topic))
ctx, _ := otel.Tracer("graphsync").Start(mq.ctx, "message", trace.WithAttributes(
attribute.Int64("topic", int64(topic)),
))
mq.builders = append(mq.builders, NewBuilder(ctx, topic))
}
builder := mq.builders[len(mq.builders)-1]
buildMessageFn(builder)
Expand Down Expand Up @@ -156,7 +163,12 @@ func (mq *MessageQueue) runQueue() {
for {
_, metadata, err := mq.extractOutgoingMessage()
if err == nil {
mq.publishError(metadata, fmt.Errorf("message queue shutdown"))
span := trace.SpanFromContext(metadata.ctx)
err := fmt.Errorf("message queue shutdown")
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
span.End()
mq.publishError(metadata, err)
mq.eventPublisher.Close(metadata.topic)
} else {
break
Expand Down Expand Up @@ -211,12 +223,20 @@ func (mq *MessageQueue) extractOutgoingMessage() (gsmsg.GraphSyncMessage, intern

func (mq *MessageQueue) sendMessage() {
message, metadata, err := mq.extractOutgoingMessage()

if err != nil {
if err != errEmptyMessage {
log.Errorf("Unable to assemble GraphSync message: %s", err.Error())
}
return
}
span := trace.SpanFromContext(metadata.ctx)
defer span.End()
_, sendSpan := otel.Tracer("graphsync").Start(metadata.ctx, "sendMessage", trace.WithAttributes(
attribute.Int64("topic", int64(metadata.topic)),
attribute.Int64("size", int64(metadata.msgSize)),
))
defer sendSpan.End()
mq.publishQueued(metadata)
defer mq.eventPublisher.Close(metadata.topic)

Expand Down Expand Up @@ -337,6 +357,7 @@ func openSender(ctx context.Context, network MessageNetwork, p peer.ID, sendTime
}

type internalMetadata struct {
ctx context.Context
public Metadata
topic Topic
msgSize uint64
Expand Down
7 changes: 7 additions & 0 deletions messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func TestProcessingNotification(t *testing.T) {

func TestDedupingMessages(t *testing.T) {
ctx := context.Background()
ctx, collectTracing := testutil.SetupTracing(ctx)
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

Expand Down Expand Up @@ -251,6 +252,12 @@ func TestDedupingMessages(t *testing.T) {
t.Fatal("incorrect request added to message")
}
}

tracing := collectTracing(t)
require.ElementsMatch(t, []string{
"message(0)->sendMessage(0)",
"message(1)->sendMessage(0)",
}, tracing.TracesToStrings())
}

func TestSendsVeryLargeBlocksResponses(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion peermanager/peermessagemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type fakePeer struct {
}

func (fp *fakePeer) AllocateAndBuildMessage(blkSize uint64, buildMessage func(b *messagequeue.Builder)) {
builder := messagequeue.NewBuilder(messagequeue.Topic(0))
builder := messagequeue.NewBuilder(context.TODO(), messagequeue.Topic(0))
buildMessage(builder)
message, err := builder.Build()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ type fakePeerHandler struct {

func (fph *fakePeerHandler) AllocateAndBuildMessage(p peer.ID, blkSize uint64,
requestBuilder func(b *messagequeue.Builder)) {
builder := messagequeue.NewBuilder(messagequeue.Topic(0))
builder := messagequeue.NewBuilder(context.TODO(), messagequeue.Topic(0))
requestBuilder(builder)
message, err := builder.Build()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type NetworkErrorListeners interface {

// ResponseAssembler is an interface that returns sender interfaces for peer responses.
type ResponseAssembler interface {
NewStream(p peer.ID, requestID graphsync.RequestID, subscriber notifications.Subscriber) responseassembler.ResponseStream
NewStream(ctx context.Context, p peer.ID, requestID graphsync.RequestID, subscriber notifications.Subscriber) responseassembler.ResponseStream
}

type responseManagerMessage interface {
Expand Down
44 changes: 29 additions & 15 deletions responsemanager/queryexecutor/queryexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ipld/go-ipld-prime/traversal"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

Expand Down Expand Up @@ -82,7 +83,7 @@ func New(ctx context.Context,
// and uses the ResponseAssembler to build and send a response, while also triggering any of
// the QueryExecutor's BlockHooks. Traversal continues until complete, or a signal or hook
// suggests we should stop or pause.
func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask.Task) bool {
func (qe *QueryExecutor) ExecuteTask(_ context.Context, pid peer.ID, task *peertask.Task) bool {
// StartTask lets us block until this task is at the top of the execution stack
responseTaskChan := make(chan ResponseTask)
var rt ResponseTask
Expand All @@ -97,11 +98,11 @@ func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *pee
return false
}

_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(qe.ctx, rt.Span), "executeTask")
ctx, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(qe.ctx, rt.Span), "executeTask")
defer span.End()

log.Debugw("beginning response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String())
err := qe.executeQuery(pid, rt)
err := qe.executeQuery(ctx, pid, rt)
if err != nil {
span.RecordError(err)
if _, isPaused := err.(hooks.ErrPaused); !isPaused {
Expand All @@ -114,10 +115,10 @@ func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *pee
}

func (qe *QueryExecutor) executeQuery(
p peer.ID, rt ResponseTask) error {
ctx context.Context, p peer.ID, rt ResponseTask) error {

// Execute the traversal operation, continue until we have reason to stop (error, pause, complete)
err := qe.runTraversal(p, rt)
err := qe.runTraversal(ctx, p, rt)

_, isPaused := err.(hooks.ErrPaused)
if isPaused {
Expand Down Expand Up @@ -180,7 +181,7 @@ func (qe *QueryExecutor) checkForUpdates(
}
}

func (qe *QueryExecutor) runTraversal(p peer.ID, taskData ResponseTask) error {
func (qe *QueryExecutor) runTraversal(ctx context.Context, p peer.ID, taskData ResponseTask) error {
for {
traverser := taskData.Traverser
isComplete, err := traverser.IsComplete()
Expand All @@ -195,26 +196,35 @@ func (qe *QueryExecutor) runTraversal(p peer.ID, taskData ResponseTask) error {
}
return err
}
lnk, data, err := qe.nextBlock(taskData)
lnk, lnkCtx := taskData.Traverser.CurrentRequest()
ctx, span := otel.Tracer("graphsync").Start(ctx, "processBlock", trace.WithAttributes(
attribute.String("cid", lnk.String()),
))
data, err := qe.loadBlock(ctx, taskData, lnk, lnkCtx)
if err != nil {
span.End()
return err
}
err = qe.sendResponse(p, taskData, lnk, data)
err = qe.sendResponse(ctx, p, taskData, lnk, data)
if err != nil {
span.End()
return err
}
span.End()
}
}

func (qe *QueryExecutor) nextBlock(taskData ResponseTask) (ipld.Link, []byte, error) {
lnk, lnkCtx := taskData.Traverser.CurrentRequest()
func (qe *QueryExecutor) loadBlock(ctx context.Context, taskData ResponseTask, lnk ipld.Link, lnkCtx ipld.LinkContext) ([]byte, error) {
_, span := otel.Tracer("graphsync").Start(ctx, "loadBlock")
defer span.End()

log.Debugf("will load link=%s", lnk)
result, err := taskData.Loader(lnkCtx, lnk)

if err != nil {
log.Errorf("failed to load link=%s, nBlocksRead=%d, err=%s", lnk, taskData.Traverser.NBlocksTraversed(), err)
taskData.Traverser.Error(traversal.SkipMe{})
return lnk, nil, nil
return nil, nil
}

blockBuffer, ok := result.(*bytes.Buffer)
Expand All @@ -224,22 +234,24 @@ func (qe *QueryExecutor) nextBlock(taskData ResponseTask) (ipld.Link, []byte, er
if err != nil {
log.Errorf("failed to write to buffer, link=%s, nBlocksRead=%d, err=%s", lnk, taskData.Traverser.NBlocksTraversed(), err)
taskData.Traverser.Error(err)
return lnk, nil, err
return nil, err
}
}
data := blockBuffer.Bytes()
err = taskData.Traverser.Advance(blockBuffer)
if err != nil {
log.Errorf("failed to advance traversal, link=%s, nBlocksRead=%d, err=%s", lnk, taskData.Traverser.NBlocksTraversed(), err)
return lnk, data, err
return data, err
}
log.Debugf("successfully loaded link=%s, nBlocksRead=%d", lnk, taskData.Traverser.NBlocksTraversed())
return lnk, data, nil
return data, nil
}

func (qe *QueryExecutor) sendResponse(p peer.ID, taskData ResponseTask, link ipld.Link, data []byte) error {
func (qe *QueryExecutor) sendResponse(ctx context.Context, p peer.ID, taskData ResponseTask, link ipld.Link, data []byte) error {
// Execute a transaction for this block, including any other queued operations
return taskData.ResponseStream.Transaction(func(rb responseassembler.ResponseBuilder) error {
ctx, span := otel.Tracer("graphsync").Start(ctx, "sendBlock", trace.WithLinks(trace.LinkFromContext(rb.Context())))
defer span.End()
// Ensure that any updates that have occurred till now are integrated into the response
err := qe.checkForUpdates(p, taskData, rb)
// On any error other than a pause, we bail, if it's a pause then we continue processing _this_ block
Expand All @@ -248,7 +260,9 @@ func (qe *QueryExecutor) sendResponse(p peer.ID, taskData ResponseTask, link ipl
}
blockData := rb.SendResponse(link, data)
if blockData.BlockSize() > 0 {
_, span := otel.Tracer("graphsync").Start(ctx, "processBlockHooks")
result := qe.blockHooks.ProcessBlockHooks(p, taskData.Request, blockData)
span.End()
for _, extension := range result.Extensions {
rb.SendExtensionData(extension)
}
Expand Down
4 changes: 4 additions & 0 deletions responsemanager/queryexecutor/queryexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ func (rb fauxResponseBuilder) PauseRequest() {
}
}

// Context satisfies responseassembler.ResponseBuilder for the test fake;
// it returns a placeholder context since the fake carries no real request context.
func (rb fauxResponseBuilder) Context() context.Context {
	return context.TODO()
}

var _ responseassembler.ResponseBuilder = &fauxResponseBuilder{}

type blockData struct {
Expand Down
7 changes: 7 additions & 0 deletions responsemanager/responseassembler/responseBuilder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package responseassembler

import (
"context"

blocks "github.com/ipfs/go-block-format"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
Expand All @@ -18,6 +20,7 @@ type responseOperation interface {
}

type responseBuilder struct {
ctx context.Context
requestID graphsync.RequestID
operations []responseOperation
linkTracker *peerLinkTracker
Expand Down Expand Up @@ -47,6 +50,10 @@ func (rb *responseBuilder) PauseRequest() {
rb.operations = append(rb.operations, statusOperation{rb.requestID, graphsync.RequestPaused})
}

// Context returns the context associated with this response builder.
func (rb *responseBuilder) Context() context.Context {
	return rb.ctx
}

func (rb *responseBuilder) setupBlockOperation(
link ipld.Link, data []byte) blockOperation {
hasBlock := data != nil
Expand Down
Loading

0 comments on commit 170ca8d

Please sign in to comment.