From ae1804b872de36913f0b74a697f553d5cba81b31 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Thu, 7 Jul 2022 17:54:41 -0400 Subject: [PATCH] bulk, backupccl: introduce a Structured event Aggregator This change introduces an Aggregator object that is capable of listening for Structured events emitted in a recording, aggregating them and rendering them as LazyTags. We also introduce an AggregatorEvent interface that can be implemented by a Structured event, thereby making it eligible for aggregation in the Aggregator. The first user of the Aggregator will be every backup data processor that is set up on the nodes in the cluster during a backup. The Aggregator lives as long as the processor, and listens for Aggregator events emitted by any span that is a child of the processor's span. This includes both local children as well as remote children whose recordings have been imported into a local span. The Aggregator stores running aggregates of each AggregatorEvent it is notified about, bucketed by the event's tag name. This aggregate will be rendered on the tracing span as a LazyTag. This change teaches every ExportRequest to emit an AggregatorEvent. Going forward we expect many more operations in bulk jobs to define and emit such events providing visibility into otherwise opaque operations. We clean up some of the StructuredEvents that were previously added but have not proved useful, and also change some of the tracing span operation names to be more intuitive. To view these aggregated events one can navigate to the `/tracez` endpoint of the debug console to take a snapshot and search for either `BACKUP` or the job ID to filter for tracing spans on that node. The span associated with the backup processor will be decorated with tags that correspond to the fields in the introduced `ExportStats` proto message. Note these stats are only aggregated on a per-node basis. 
Fixes: #80388 Release note: None --- pkg/BUILD.bazel | 4 + pkg/ccl/backupccl/BUILD.bazel | 1 + pkg/ccl/backupccl/backup_planning.go | 2 +- pkg/ccl/backupccl/backup_planning_tenant.go | 4 +- pkg/ccl/backupccl/backup_processor.go | 60 ++++++----- .../backupccl/backup_processor_planning.go | 4 +- pkg/ccl/backupccl/backuppb/BUILD.bazel | 2 + pkg/ccl/backupccl/backuppb/backup.go | 61 +++++++++++ pkg/ccl/backupccl/backuppb/backup.proto | 28 ++--- pkg/jobs/adopt.go | 4 +- pkg/kv/kvserver/batcheval/cmd_export.go | 2 +- pkg/storage/mvcc.go | 2 +- pkg/util/bulk/BUILD.bazel | 27 +++++ pkg/util/bulk/tracing_aggregator.go | 101 ++++++++++++++++++ pkg/util/bulk/tracing_aggregator_test.go | 90 ++++++++++++++++ pkg/util/stop/stopper.go | 2 +- pkg/util/tracing/bench_test.go | 4 +- pkg/util/tracing/crdbspan.go | 10 +- pkg/util/tracing/span_options.go | 2 +- pkg/util/tracing/span_test.go | 6 +- 20 files changed, 358 insertions(+), 58 deletions(-) create mode 100644 pkg/util/bulk/BUILD.bazel create mode 100644 pkg/util/bulk/tracing_aggregator.go create mode 100644 pkg/util/bulk/tracing_aggregator_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index c8320fc23196..8b1bffe06e40 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -515,6 +515,7 @@ ALL_TESTS = [ "//pkg/util/binfetcher:binfetcher_test", "//pkg/util/bitarray:bitarray_test", "//pkg/util/buildutil:buildutil_test", + "//pkg/util/bulk:bulk_test", "//pkg/util/cache:cache_test", "//pkg/util/caller:caller_test", "//pkg/util/cgroups:cgroups_test", @@ -1833,6 +1834,8 @@ GO_TARGETS = [ "//pkg/util/bufalloc:bufalloc", "//pkg/util/buildutil:buildutil", "//pkg/util/buildutil:buildutil_test", + "//pkg/util/bulk:bulk", + "//pkg/util/bulk:bulk_test", "//pkg/util/cache:cache", "//pkg/util/cache:cache_test", "//pkg/util/caller:caller", @@ -2812,6 +2815,7 @@ GET_X_DATA_TARGETS = [ "//pkg/util/bitarray:get_x_data", "//pkg/util/bufalloc:get_x_data", "//pkg/util/buildutil:get_x_data", + "//pkg/util/bulk:get_x_data", 
"//pkg/util/cache:get_x_data", "//pkg/util/caller:get_x_data", "//pkg/util/cancelchecker:get_x_data", diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 88b7fbbb6662..b28aade85583 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -116,6 +116,7 @@ go_library( "//pkg/storage", "//pkg/util", "//pkg/util/admission/admissionpb", + "//pkg/util/bulk", "//pkg/util/contextutil", "//pkg/util/ctxgroup", "//pkg/util/hlc", diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index f95a48a5d835..3a3ba8e72295 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -1467,5 +1467,5 @@ func updateBackupDetails( } func init() { - sql.AddPlanHook("backup", backupPlanHook) + sql.AddPlanHook("backupccl.backupPlanHook", backupPlanHook) } diff --git a/pkg/ccl/backupccl/backup_planning_tenant.go b/pkg/ccl/backupccl/backup_planning_tenant.go index 1d88c577d0bb..0c6258f9879d 100644 --- a/pkg/ccl/backupccl/backup_planning_tenant.go +++ b/pkg/ccl/backupccl/backup_planning_tenant.go @@ -79,7 +79,7 @@ func retrieveSingleTenantMetadata( ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, tenantID roachpb.TenantID, ) (descpb.TenantInfoWithUsage, error) { row, err := ie.QueryRow( - ctx, "backup-lookup-tenant", txn, + ctx, "backupccl.retrieveSingleTenantMetadata", txn, tenantMetadataQuery+` WHERE id = $1`, tenantID.ToUint64(), ) if err != nil { @@ -99,7 +99,7 @@ func retrieveAllTenantsMetadata( ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, ) ([]descpb.TenantInfoWithUsage, error) { rows, err := ie.QueryBuffered( - ctx, "backup-lookup-tenants", txn, + ctx, "backupccl.retrieveAllTenantsMetadata", txn, // XXX Should we add a `WHERE active`? We require the tenant to be active // when it is specified.. 
tenantMetadataQuery, diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index d926450a3a40..0f6b11c3b970 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -106,6 +107,10 @@ type backupDataProcessor struct { // BoundAccount that reserves the memory usage of the backup processor. memAcc *mon.BoundAccount + + // Aggregator that aggregates StructuredEvents emitted in the + // backupDataProcessors' trace recording. + agg *bulk.TracingAggregator } var ( @@ -153,6 +158,12 @@ func (bp *backupDataProcessor) Start(ctx context.Context) { ctx = logtags.AddTag(ctx, "job", bp.spec.JobID) ctx = bp.StartInternal(ctx, backupProcessorName) ctx, cancel := context.WithCancel(ctx) + + // Construct an Aggregator to aggregate and render AggregatorEvents emitted in + // bps' trace recording. 
+ ctx, bp.agg = bulk.MakeTracingAggregatorWithSpan(ctx, + fmt.Sprintf("%s-aggregator", backupProcessorName), bp.EvalCtx.Tracer) + bp.cancelAndWaitForWorker = func() { cancel() for range bp.progCh { @@ -160,7 +171,7 @@ func (bp *backupDataProcessor) Start(ctx context.Context) { } log.Infof(ctx, "starting backup data") if err := bp.flowCtx.Stopper().RunAsyncTaskEx(ctx, stop.TaskOpts{ - TaskName: "backup-worker", + TaskName: "backupDataProcessor.runBackupProcessor", SpanOpt: stop.ChildSpan, }, func(ctx context.Context) { bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.memAcc) @@ -198,6 +209,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer func (bp *backupDataProcessor) close() { bp.cancelAndWaitForWorker() + bp.agg.Close() if bp.InternalClose() { bp.memAcc.Close(bp.Ctx) } @@ -387,26 +399,16 @@ func runBackupProcessor( Source: roachpb.AdmissionHeader_FROM_SQL, NoMemoryReservedAtSource: true, } - log.Infof(ctx, "sending ExportRequest for span %s (attempt %d, priority %s)", + log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)", span.span, span.attempts+1, header.UserPriority.String()) var rawResp roachpb.Response var pErr *roachpb.Error - var reqSentTime time.Time - var respReceivedTime time.Time + requestSentAt := timeutil.Now() exportRequestErr := contextutil.RunWithTimeout(ctx, fmt.Sprintf("ExportRequest for span %s", span.span), timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error { - reqSentTime = timeutil.Now() - backupProcessorSpan.RecordStructured(&backuppb.BackupExportTraceRequestEvent{ - Span: span.span.String(), - Attempt: int32(span.attempts + 1), - Priority: header.UserPriority.String(), - ReqSentTime: reqSentTime.String(), - }) - rawResp, pErr = kv.SendWrappedWithAdmission( ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req) - respReceivedTime = timeutil.Now() if pErr != nil { return pErr.GoError() } @@ -419,9 +421,7 
@@ func runBackupProcessor( todo <- span // TODO(dt): send a progress update to update job progress to note // the intents being hit. - backupProcessorSpan.RecordStructured(&backuppb.BackupExportTraceResponseEvent{ - RetryableError: tracing.RedactAndTruncateError(intentErr), - }) + log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, intentErr.Error()) continue } // TimeoutError improves the opaque `context deadline exceeded` error @@ -480,19 +480,12 @@ func runBackupProcessor( completedSpans = 1 } - duration := respReceivedTime.Sub(reqSentTime) - exportResponseTraceEvent := &backuppb.BackupExportTraceResponseEvent{ - Duration: duration.String(), - FileSummaries: make([]roachpb.RowCount, 0), - } - if len(resp.Files) > 1 { log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1") } for i, file := range resp.Files { entryCounts := countRows(file.Exported, spec.PKIDs) - exportResponseTraceEvent.FileSummaries = append(exportResponseTraceEvent.FileSummaries, entryCounts) ret := exportedSpan{ // BackupManifest_File just happens to contain the exact fields @@ -521,8 +514,9 @@ func runBackupProcessor( return ctx.Err() } } - exportResponseTraceEvent.NumFiles = int32(len(resp.Files)) - backupProcessorSpan.RecordStructured(exportResponseTraceEvent) + + // Emit the stats for the processed ExportRequest. + recordExportStats(backupProcessorSpan, resp, timeutil.Since(requestSentAt)) default: // No work left to do, so we can exit. Note that another worker could @@ -576,6 +570,22 @@ func runBackupProcessor( return grp.Wait() } +// recordExportStats emits a StructuredEvent containing the stats about the +// evaluated ExportRequest. 
+func recordExportStats( + sp *tracing.Span, resp *roachpb.ExportResponse, exportDuration time.Duration, +) { + if sp == nil { + return + } + exportStats := backuppb.ExportStats{Duration: exportDuration} + for _, f := range resp.Files { + exportStats.NumFiles++ + exportStats.DataSize += int64(len(f.SST)) + } + sp.RecordStructured(&exportStats) +} + func init() { rowexec.NewBackupDataProcessor = newBackupDataProcessor } diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go index eb518b2ef8d8..42d5bfd07059 100644 --- a/pkg/ccl/backupccl/backup_processor_planning.go +++ b/pkg/ccl/backupccl/backup_processor_planning.go @@ -45,7 +45,7 @@ func distBackupPlanSpecs( startTime, endTime hlc.Timestamp, ) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) { var span *tracing.Span - ctx, span = tracing.ChildSpan(ctx, "backup-plan-specs") + ctx, span = tracing.ChildSpan(ctx, "backupccl.distBackupPlanSpecs") _ = ctx // ctx is currently unused, but this new ctx should be used below in the future. 
defer span.Finish() user := execCtx.User() @@ -158,7 +158,7 @@ func distBackup( progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, backupSpecs map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, ) error { - ctx, span := tracing.ChildSpan(ctx, "backup-distsql") + ctx, span := tracing.ChildSpan(ctx, "backupccl.distBackup") defer span.Finish() evalCtx := execCtx.ExtendedEvalContext() var noTxn *kv.Txn diff --git a/pkg/ccl/backupccl/backuppb/BUILD.bazel b/pkg/ccl/backupccl/backuppb/BUILD.bazel index b8643961005c..c9d57dabd390 100644 --- a/pkg/ccl/backupccl/backuppb/BUILD.bazel +++ b/pkg/ccl/backupccl/backuppb/BUILD.bazel @@ -48,9 +48,11 @@ go_library( "//pkg/sql/parser", "//pkg/sql/protoreflect", "//pkg/sql/sem/tree", + "//pkg/util/bulk", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_gogo_protobuf//jsonpb", + "@io_opentelemetry_go_otel//attribute", ], ) diff --git a/pkg/ccl/backupccl/backuppb/backup.go b/pkg/ccl/backupccl/backuppb/backup.go index 0448199ebe06..1fc62b148fdd 100644 --- a/pkg/ccl/backupccl/backuppb/backup.go +++ b/pkg/ccl/backupccl/backuppb/backup.go @@ -10,15 +10,18 @@ package backuppb import ( "encoding/json" + "fmt" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/protoreflect" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/bulk" _ "github.com/cockroachdb/cockroach/pkg/util/uuid" // required for backup.proto "github.com/cockroachdb/errors" "github.com/gogo/protobuf/jsonpb" + "go.opentelemetry.io/otel/attribute" ) // IsIncremental returns if the BackupManifest corresponds to an incremental @@ -123,6 +126,64 @@ func (m ScheduledBackupExecutionArgs) MarshalJSONPB(marshaller *jsonpb.Marshaler return json.Marshal(m) } +var _ bulk.TracingAggregatorEvent = &ExportStats{} + +const ( + tagNumFiles = "num_files" + 
tagDataSize = "data_size" + tagThroughput = "throughput" +) + +// Render implements the LazyTag interface. +func (e *ExportStats) Render() []attribute.KeyValue { + const mb = 1 << 20 + tags := make([]attribute.KeyValue, 0) + if e.NumFiles > 0 { + tags = append(tags, attribute.KeyValue{ + Key: tagNumFiles, + Value: attribute.Int64Value(e.NumFiles), + }) + } + if e.DataSize > 0 { + dataSizeMB := float64(e.DataSize) / mb + tags = append(tags, attribute.KeyValue{ + Key: tagDataSize, + Value: attribute.StringValue(fmt.Sprintf("%.2f MB", dataSizeMB)), + }) + + if e.Duration > 0 { + throughput := dataSizeMB / e.Duration.Seconds() + tags = append(tags, attribute.KeyValue{ + Key: tagThroughput, + Value: attribute.StringValue(fmt.Sprintf("%.2f MB/s", throughput)), + }) + } + } + + return tags +} + +// Identity implements the AggregatorEvent interface. +func (e *ExportStats) Identity() bulk.TracingAggregatorEvent { + return &ExportStats{} +} + +// Combine implements the AggregatorEvent interface. +func (e *ExportStats) Combine(other bulk.TracingAggregatorEvent) { + otherExportStats, ok := other.(*ExportStats) + if !ok { + panic(fmt.Sprintf("`other` is not of type ExportStats: %T", other)) + } + e.NumFiles += otherExportStats.NumFiles + e.DataSize += otherExportStats.DataSize + e.Duration += otherExportStats.Duration +} + +// Tag implements the AggregatorEvent interface. 
+func (e *ExportStats) Tag() string { + return "ExportStats" +} + func init() { protoreflect.RegisterShorthands((*BackupManifest)(nil), "backup", "backup_manifest") } diff --git a/pkg/ccl/backupccl/backuppb/backup.proto b/pkg/ccl/backupccl/backuppb/backup.proto index aa8dcc12e2ef..caa125169601 100644 --- a/pkg/ccl/backupccl/backuppb/backup.proto +++ b/pkg/ccl/backupccl/backuppb/backup.proto @@ -193,22 +193,16 @@ message BackupProgressTraceEvent { util.hlc.Timestamp revision_start_time = 3 [(gogoproto.nullable) = false]; } -// BackupExportTraceRequestEvent is the trace event recorded when an -// ExportRequest has been sent. -message BackupExportTraceRequestEvent { - string span = 1; - int32 attempt = 2; - string priority = 3; - string req_sent_time = 4; -} - -// BackupExportTraceResponseEvent is the trace event recorded when we receive a -// response from the ExportRequest. -message BackupExportTraceResponseEvent { - string duration = 1; - int32 num_files = 2; - repeated roachpb.RowCount file_summaries = 3 [(gogoproto.nullable) = false]; - reserved 4 ; - string retryable_error = 5; +// ExportStats is a message containing information about each +// Export{Request,Response}. +message ExportStats { + // NumFiles is the number of SST files produced by the ExportRequest. + int64 num_files = 1; + // DataSize is the byte size of all the SST files produced by the + // ExportRequest. + int64 data_size = 2; + // Duration is the total time taken to send an ExportRequest, receive an + // ExportResponse and push the response on a channel. 
+ int64 duration = 3 [(gogoproto.casttype) = "time.Duration"]; } diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 756a42b8399c..57057c9370f3 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -12,6 +12,7 @@ package jobs import ( "context" + "fmt" "strconv" "sync" @@ -389,7 +390,8 @@ func (r *Registry) runJob( // TODO(ajwerner): Move this writing up the trace ID down into // stepThroughStateMachine where we're already often (and soon with // exponential backoff, always) updating the job in that call. - ctx, span := r.ac.Tracer.StartSpanCtx(ctx, typ.String(), spanOptions...) + ctx, span := r.ac.Tracer.StartSpanCtx(ctx, + fmt.Sprintf("%s-%d", typ.String(), job.ID()), spanOptions...) span.SetTag("job-id", attribute.Int64Value(int64(job.ID()))) defer span.Finish() if span.TraceID() != 0 { diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index 7b1e399df078..bdc17bb136c4 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -102,7 +102,7 @@ func evalExport( h := cArgs.Header reply := resp.(*roachpb.ExportResponse) - ctx, evalExportSpan := tracing.ChildSpan(ctx, fmt.Sprintf("Export [%s,%s)", args.Key, args.EndKey)) + ctx, evalExportSpan := tracing.ChildSpan(ctx, "evalExport") defer evalExportSpan.Finish() var evalExportTrace types.StringValue diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index cf67cf9578f4..8237ea2ecc3f 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -5726,7 +5726,7 @@ func MVCCExportToSST( ctx context.Context, cs *cluster.Settings, reader Reader, opts MVCCExportOptions, dest io.Writer, ) (roachpb.BulkOpSummary, MVCCKey, error) { var span *tracing.Span - ctx, span = tracing.ChildSpan(ctx, "MVCCExportToSST") + ctx, span = tracing.ChildSpan(ctx, "storage.MVCCExportToSST") defer span.Finish() sstWriter := MakeBackupSSTWriter(ctx, cs, dest) defer sstWriter.Close() diff --git a/pkg/util/bulk/BUILD.bazel b/pkg/util/bulk/BUILD.bazel 
new file mode 100644 index 000000000000..7b528186d084 --- /dev/null +++ b/pkg/util/bulk/BUILD.bazel @@ -0,0 +1,27 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "bulk", + srcs = ["tracing_aggregator.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/util/bulk", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/syncutil", + "//pkg/util/tracing", + ], +) + +go_test( + name = "bulk_test", + srcs = ["tracing_aggregator_test.go"], + deps = [ + ":bulk", + "//pkg/ccl/backupccl/backuppb", + "//pkg/util/tracing", + "//pkg/util/tracing/tracingpb", + "@com_github_stretchr_testify//require", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/util/bulk/tracing_aggregator.go b/pkg/util/bulk/tracing_aggregator.go new file mode 100644 index 000000000000..3bb29c7af489 --- /dev/null +++ b/pkg/util/bulk/tracing_aggregator.go @@ -0,0 +1,101 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package bulk + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" +) + +// TracingAggregatorEvent describes an event that can be aggregated and stored by the +// TracingAggregator. A TracingAggregatorEvent also implements the tracing.LazyTag interface +// to render its information on the associated tracing span. +type TracingAggregatorEvent interface { + tracing.LazyTag + + // Identity returns a TracingAggregatorEvent that when combined with another + // event returns the other TracingAggregatorEvent unchanged. 
+ Identity() TracingAggregatorEvent + // Combine combines two TracingAggregatorEvents together. + Combine(other TracingAggregatorEvent) + // Tag returns a string used to identify the TracingAggregatorEvent. + Tag() string +} + +// An TracingAggregator can be used to aggregate and render AggregatorEvents that are +// emitted as part of its tracing spans' recording. +type TracingAggregator struct { + mu struct { + syncutil.Mutex + // aggregatedEvents is a mapping from the tag identifying the + // TracingAggregatorEvent to the running aggregate of the TracingAggregatorEvent. + aggregatedEvents map[string]TracingAggregatorEvent + // sp is the tracing span managed by the TracingAggregator. + sp *tracing.Span + // closed is set to true if the TracingAggregator has already been closed. + closed bool + } +} + +// Notify implements the tracing.EventListener interface. +func (b *TracingAggregator) Notify(event tracing.Structured) { + bulkEvent, ok := event.(TracingAggregatorEvent) + if !ok { + return + } + + b.mu.Lock() + defer b.mu.Unlock() + + // If this is the first AggregatorEvent with this tag, set it as a LazyTag on + // the associated tracing span. + eventTag := bulkEvent.Tag() + if _, ok := b.mu.aggregatedEvents[bulkEvent.Tag()]; !ok { + b.mu.aggregatedEvents[eventTag] = bulkEvent.Identity() + b.mu.sp.SetLazyTag(eventTag, b.mu.aggregatedEvents[eventTag]) + } + b.mu.aggregatedEvents[eventTag].Combine(bulkEvent) +} + +// Close is responsible for finishing the Aggregators' tracing span. +func (b *TracingAggregator) Close() { + b.mu.Lock() + defer b.mu.Unlock() + if !b.mu.closed { + b.mu.sp.Finish() + b.mu.closed = true + } +} + +var _ tracing.EventListener = &TracingAggregator{} + +// MakeTracingAggregatorWithSpan returns an instance of an TracingAggregator along with a +// newly created child context. The TracingAggregator is registered as a +// tracing.EventListener on the span associated with newly created context. 
+// +// The TracingAggregator instance is responsible for finishing the returned span, and +// so the user must call Close(). +func MakeTracingAggregatorWithSpan( + ctx context.Context, aggregatorName string, tracer *tracing.Tracer, +) (context.Context, *TracingAggregator) { + agg := &TracingAggregator{} + aggCtx, aggSpan := tracing.EnsureChildSpan(ctx, tracer, aggregatorName, + tracing.WithEventListeners(agg)) + + agg.mu.Lock() + defer agg.mu.Unlock() + agg.mu.aggregatedEvents = make(map[string]TracingAggregatorEvent) + agg.mu.sp = aggSpan + + return aggCtx, agg +} diff --git a/pkg/util/bulk/tracing_aggregator_test.go b/pkg/util/bulk/tracing_aggregator_test.go new file mode 100644 index 000000000000..f010a44deed6 --- /dev/null +++ b/pkg/util/bulk/tracing_aggregator_test.go @@ -0,0 +1,90 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package bulk_test + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" + "github.com/cockroachdb/cockroach/pkg/util/bulk" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/stretchr/testify/require" +) + +func TestAggregator(t *testing.T) { + tr := tracing.NewTracer() + ctx := context.Background() + ctx, root := tr.StartSpanCtx(ctx, "root", tracing.WithRecording(tracingpb.RecordingVerbose)) + defer root.Finish() + + ctx, agg := bulk.MakeTracingAggregatorWithSpan(ctx, "mockAggregator", tr) + aggSp := tracing.SpanFromContext(ctx) + defer agg.Close() + + child := tr.StartSpan("child", tracing.WithParent(root), + tracing.WithEventListeners(agg)) + defer child.Finish() + child.RecordStructured(&backuppb.ExportStats{ + NumFiles: 10, + DataSize: 10, + Duration: time.Minute, + }) + + _, childChild := tracing.ChildSpan(ctx, "childChild") + defer childChild.Finish() + childChild.RecordStructured(&backuppb.ExportStats{ + NumFiles: 20, + DataSize: 20, + Duration: time.Minute, + }) + + remoteChild := tr.StartSpan("remoteChild", tracing.WithRemoteParentFromSpanMeta(childChild.Meta())) + remoteChild.RecordStructured(&backuppb.ExportStats{ + NumFiles: 30, + DataSize: 30, + Duration: time.Minute, + }) + + // We only expect to see the aggregated stats from the local children since we + // have not imported the remote children's Recording. + exportStatsTag, found := aggSp.GetLazyTag("ExportStats") + require.True(t, found) + var es *backuppb.ExportStats + var ok bool + if es, ok = exportStatsTag.(*backuppb.ExportStats); !ok { + t.Fatal("failed to cast LazyTag to expected type") + } + require.Equal(t, backuppb.ExportStats{ + NumFiles: 30, + DataSize: 30, + Duration: 2 * time.Minute, + }, *es) + + // Import the remote recording into its parent. 
+ rec := remoteChild.FinishAndGetConfiguredRecording() + childChild.ImportRemoteRecording(rec) + + // Now, we expect the ExportStats from the remote child to show up in the + // aggregator. + exportStatsTag, found = aggSp.GetLazyTag("ExportStats") + require.True(t, found) + if es, ok = exportStatsTag.(*backuppb.ExportStats); !ok { + t.Fatal("failed to cast LazyTag to expected type") + } + require.Equal(t, backuppb.ExportStats{ + NumFiles: 60, + DataSize: 60, + Duration: 3 * time.Minute, + }, *es) +} diff --git a/pkg/util/stop/stopper.go b/pkg/util/stop/stopper.go index 0b34aa6133fc..f9dd295cb019 100644 --- a/pkg/util/stop/stopper.go +++ b/pkg/util/stop/stopper.go @@ -461,7 +461,7 @@ func (s *Stopper) RunAsyncTaskEx(ctx context.Context, opt TaskOpts, f func(conte // // Note that we have to create the child in this parent goroutine; we can't // defer the creation to the spawned async goroutine since the parent span - // might get Finish()ed by then. However, we'll update the child'd goroutine + // might get Finish()ed by then. However, we'll update the child's goroutine // ID. 
var sp *tracing.Span switch opt.SpanOpt { diff --git a/pkg/util/tracing/bench_test.go b/pkg/util/tracing/bench_test.go index 7a3db23ae3cf..c636abab8b01 100644 --- a/pkg/util/tracing/bench_test.go +++ b/pkg/util/tracing/bench_test.go @@ -32,7 +32,7 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) { staticLogTags := logtags.Buffer{} staticLogTags.Add("foo", "bar") - mockListener := []EventListener{&mockEventListener{}} + mockListener := &mockEventListener{} for _, tc := range []struct { name string @@ -120,7 +120,7 @@ func BenchmarkSpan_GetRecording(b *testing.B) { func BenchmarkRecordingWithStructuredEvent(b *testing.B) { skip.UnderDeadlock(b, "span reuse triggers false-positives in the deadlock detector") ev := &types.Int32Value{Value: 5} - mockListener := []EventListener{&mockEventListener{}} + mockListener := &mockEventListener{} for _, tc := range []struct { name string diff --git a/pkg/util/tracing/crdbspan.go b/pkg/util/tracing/crdbspan.go index cbc062f32478..50a9cbf8540c 100644 --- a/pkg/util/tracing/crdbspan.go +++ b/pkg/util/tracing/crdbspan.go @@ -17,6 +17,7 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/ring" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -509,7 +510,11 @@ func (s *crdbSpan) recordFinishedChildren(childRecording tracingpb.Recording) { // children being added to s. 
for _, span := range childRecording { for _, record := range span.StructuredRecords { - s.notifyEventListeners(record.Payload) + var d types.DynamicAny + if err := types.UnmarshalAny(record.Payload, &d); err != nil { + continue + } + s.notifyEventListeners(d.Message.(protoutil.Message)) } } @@ -930,6 +935,9 @@ func (s *crdbSpan) getRecordingNoChildrenLocked( childKey := string(tag.Key) childValue := tag.Value.Emit() + if rs.Tags == nil { + rs.Tags = make(map[string]string) + } rs.Tags[childKey] = childValue tagGroup.Tags = append(tagGroup.Tags, diff --git a/pkg/util/tracing/span_options.go b/pkg/util/tracing/span_options.go index 260073f325fc..9b0a74f44561 100644 --- a/pkg/util/tracing/span_options.go +++ b/pkg/util/tracing/span_options.go @@ -474,6 +474,6 @@ func (ev eventListenersOption) apply(opts spanOptions) spanOptions { // // The caller should not mutate `eventListeners` after calling // WithEventListeners. -func WithEventListeners(eventListeners []EventListener) SpanOption { +func WithEventListeners(eventListeners ...EventListener) SpanOption { return (eventListenersOption)(eventListeners) } diff --git a/pkg/util/tracing/span_test.go b/pkg/util/tracing/span_test.go index e4740388484a..efbc7968a84f 100644 --- a/pkg/util/tracing/span_test.go +++ b/pkg/util/tracing/span_test.go @@ -959,7 +959,7 @@ func TestEventListener(t *testing.T) { tr := NewTracer() rootEventListener := &mockEventListener{} sp := tr.StartSpan("root", WithRecording(tracingpb.RecordingStructured), - WithEventListeners([]EventListener{rootEventListener})) + WithEventListeners(rootEventListener)) // Record a few Structured events. sp.RecordStructured(&types.Int32Value{Value: 4}) @@ -969,7 +969,7 @@ func TestEventListener(t *testing.T) { // Register another event listener on only the child span. 
childEventListener := &mockEventListener{} childSp := tr.StartSpan("child", WithParent(sp), - WithEventListeners([]EventListener{childEventListener})) + WithEventListeners(childEventListener)) childSp.RecordStructured(&types.Int32Value{Value: 6}) childSp.RecordStructured(&types.Int32Value{Value: 7}) @@ -1015,7 +1015,7 @@ func TestEventListenerNotifiedWithoutHoldingSpanMutex(t *testing.T) { tr := NewTracer() rootEventListener := &mockEventListener{} sp := tr.StartSpan("root", WithRecording(tracingpb.RecordingStructured), - WithEventListeners([]EventListener{rootEventListener})) + WithEventListeners(rootEventListener)) defer sp.Finish() // Set the EventListeners Notify() method to acquire the span's mutex.