bulk, backupccl: introduce a Structured event Aggregator #84043

Merged 1 commit on Aug 17, 2022
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
@@ -515,6 +515,7 @@ ALL_TESTS = [
"//pkg/util/binfetcher:binfetcher_test",
"//pkg/util/bitarray:bitarray_test",
"//pkg/util/buildutil:buildutil_test",
"//pkg/util/bulk:bulk_test",
"//pkg/util/cache:cache_test",
"//pkg/util/caller:caller_test",
"//pkg/util/cgroups:cgroups_test",
@@ -1833,6 +1834,8 @@ GO_TARGETS = [
"//pkg/util/bufalloc:bufalloc",
"//pkg/util/buildutil:buildutil",
"//pkg/util/buildutil:buildutil_test",
"//pkg/util/bulk:bulk",
"//pkg/util/bulk:bulk_test",
"//pkg/util/cache:cache",
"//pkg/util/cache:cache_test",
"//pkg/util/caller:caller",
@@ -2812,6 +2815,7 @@ GET_X_DATA_TARGETS = [
"//pkg/util/bitarray:get_x_data",
"//pkg/util/bufalloc:get_x_data",
"//pkg/util/buildutil:get_x_data",
"//pkg/util/bulk:get_x_data",
"//pkg/util/cache:get_x_data",
"//pkg/util/caller:get_x_data",
"//pkg/util/cancelchecker:get_x_data",
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -116,6 +116,7 @@ go_library(
"//pkg/storage",
"//pkg/util",
"//pkg/util/admission/admissionpb",
"//pkg/util/bulk",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_planning.go
@@ -1467,5 +1467,5 @@ func updateBackupDetails(
}

func init() {
sql.AddPlanHook("backup", backupPlanHook)
sql.AddPlanHook("backupccl.backupPlanHook", backupPlanHook)
}
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_planning_tenant.go
@@ -79,7 +79,7 @@ func retrieveSingleTenantMetadata(
ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, tenantID roachpb.TenantID,
) (descpb.TenantInfoWithUsage, error) {
row, err := ie.QueryRow(
ctx, "backup-lookup-tenant", txn,
ctx, "backupccl.retrieveSingleTenantMetadata", txn,
tenantMetadataQuery+` WHERE id = $1`, tenantID.ToUint64(),
)
if err != nil {
@@ -99,7 +99,7 @@ func retrieveAllTenantsMetadata(
ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn,
) ([]descpb.TenantInfoWithUsage, error) {
rows, err := ie.QueryBuffered(
ctx, "backup-lookup-tenants", txn,
ctx, "backupccl.retrieveAllTenantsMetadata", txn,
// XXX Should we add a `WHERE active`? We require the tenant to be active
// when it is specified..
tenantMetadataQuery,
60 changes: 35 additions & 25 deletions pkg/ccl/backupccl/backup_processor.go
@@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -106,6 +107,10 @@ type backupDataProcessor struct {

// BoundAccount that reserves the memory usage of the backup processor.
memAcc *mon.BoundAccount

// Aggregator that aggregates StructuredEvents emitted in the
// backupDataProcessors' trace recording.
agg *bulk.TracingAggregator
}

var (
@@ -153,14 +158,20 @@ func (bp *backupDataProcessor) Start(ctx context.Context) {
ctx = logtags.AddTag(ctx, "job", bp.spec.JobID)
ctx = bp.StartInternal(ctx, backupProcessorName)
ctx, cancel := context.WithCancel(ctx)

// Construct an Aggregator to aggregate and render AggregatorEvents emitted in
// bps' trace recording.
ctx, bp.agg = bulk.MakeTracingAggregatorWithSpan(ctx,
fmt.Sprintf("%s-aggregator", backupProcessorName), bp.EvalCtx.Tracer)

bp.cancelAndWaitForWorker = func() {
cancel()
for range bp.progCh {
}
}
log.Infof(ctx, "starting backup data")
if err := bp.flowCtx.Stopper().RunAsyncTaskEx(ctx, stop.TaskOpts{
TaskName: "backup-worker",
TaskName: "backupDataProcessor.runBackupProcessor",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.memAcc)
@@ -198,6 +209,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer

func (bp *backupDataProcessor) close() {
bp.cancelAndWaitForWorker()
bp.agg.Close()
if bp.InternalClose() {
bp.memAcc.Close(bp.Ctx)
}
@@ -387,26 +399,16 @@ func runBackupProcessor(
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
}
log.Infof(ctx, "sending ExportRequest for span %s (attempt %d, priority %s)",
log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)",
span.span, span.attempts+1, header.UserPriority.String())
var rawResp roachpb.Response
var pErr *roachpb.Error
var reqSentTime time.Time
var respReceivedTime time.Time
requestSentAt := timeutil.Now()
exportRequestErr := contextutil.RunWithTimeout(ctx,
fmt.Sprintf("ExportRequest for span %s", span.span),
timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error {
reqSentTime = timeutil.Now()
backupProcessorSpan.RecordStructured(&backuppb.BackupExportTraceRequestEvent{
Span: span.span.String(),
Attempt: int32(span.attempts + 1),
Priority: header.UserPriority.String(),
ReqSentTime: reqSentTime.String(),
})

rawResp, pErr = kv.SendWrappedWithAdmission(
ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req)
respReceivedTime = timeutil.Now()
if pErr != nil {
return pErr.GoError()
}
@@ -419,9 +421,7 @@
todo <- span
// TODO(dt): send a progress update to update job progress to note
// the intents being hit.
backupProcessorSpan.RecordStructured(&backuppb.BackupExportTraceResponseEvent{
RetryableError: tracing.RedactAndTruncateError(intentErr),
})
log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, intentErr.Error())
continue
}
// TimeoutError improves the opaque `context deadline exceeded` error
@@ -480,19 +480,12 @@
completedSpans = 1
}

duration := respReceivedTime.Sub(reqSentTime)
exportResponseTraceEvent := &backuppb.BackupExportTraceResponseEvent{
Duration: duration.String(),
FileSummaries: make([]roachpb.RowCount, 0),
}

if len(resp.Files) > 1 {
log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1")
}

for i, file := range resp.Files {
entryCounts := countRows(file.Exported, spec.PKIDs)
exportResponseTraceEvent.FileSummaries = append(exportResponseTraceEvent.FileSummaries, entryCounts)

ret := exportedSpan{
// BackupManifest_File just happens to contain the exact fields
@@ -521,8 +514,9 @@
return ctx.Err()
}
}
exportResponseTraceEvent.NumFiles = int32(len(resp.Files))
backupProcessorSpan.RecordStructured(exportResponseTraceEvent)

// Emit the stats for the processed ExportRequest.
recordExportStats(backupProcessorSpan, resp, timeutil.Since(requestSentAt))

default:
// No work left to do, so we can exit. Note that another worker could
@@ -576,6 +570,22 @@ func runBackupProcessor(
return grp.Wait()
}

// recordExportStats emits a StructuredEvent containing the stats about the
// evaluated ExportRequest.
func recordExportStats(
sp *tracing.Span, resp *roachpb.ExportResponse, exportDuration time.Duration,
) {
if sp == nil {
return
}
exportStats := backuppb.ExportStats{Duration: exportDuration}
for _, f := range resp.Files {
exportStats.NumFiles++
exportStats.DataSize += int64(len(f.SST))
}
sp.RecordStructured(&exportStats)
}

func init() {
rowexec.NewBackupDataProcessor = newBackupDataProcessor
}
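
To connect the pieces in this file: recordExportStats records an ExportStats event on the processor's tracing span after each ExportRequest, and the TracingAggregator constructed in Start folds same-tagged events from the span's recording into one running aggregate. A minimal, self-contained sketch of that fold pattern — a toy illustration only, not the actual pkg/util/bulk implementation, whose internals aren't part of this diff:

```go
package main

import (
	"fmt"
	"time"
)

// event mirrors the method set ExportStats implements in this PR; the real
// bulk.TracingAggregatorEvent interface may differ.
type event interface {
	Identity() event
	Combine(other event)
	Tag() string
}

// exportStats is a hypothetical stand-in for backuppb.ExportStats.
type exportStats struct {
	numFiles, dataSize int64
	duration           time.Duration
}

func (e *exportStats) Identity() event { return &exportStats{} }
func (e *exportStats) Tag() string     { return "ExportStats" }

// Combine folds another exportStats into the receiver, mirroring
// ExportStats.Combine above.
func (e *exportStats) Combine(other event) {
	o, ok := other.(*exportStats)
	if !ok {
		panic(fmt.Sprintf("`other` is not of type exportStats: %T", other))
	}
	e.numFiles += o.numFiles
	e.dataSize += o.dataSize
	e.duration += o.duration
}

// aggregator keeps one running event per Tag, seeded from Identity; this is
// the shape of aggregation the TracingAggregator performs over the
// structured events in a span's recording.
type aggregator struct {
	byTag map[string]event
}

func (a *aggregator) notify(ev event) {
	running, ok := a.byTag[ev.Tag()]
	if !ok {
		running = ev.Identity()
		a.byTag[ev.Tag()] = running
	}
	running.Combine(ev)
}

func main() {
	a := &aggregator{byTag: make(map[string]event)}
	// Two ExportResponses' worth of stats, as recordExportStats would emit.
	a.notify(&exportStats{numFiles: 1, dataSize: 4 << 20, duration: 2 * time.Second})
	a.notify(&exportStats{numFiles: 2, dataSize: 6 << 20, duration: 3 * time.Second})
	total := a.byTag["ExportStats"].(*exportStats)
	fmt.Printf("num_files=%d data_size=%.2f MB duration=%s\n",
		total.numFiles, float64(total.dataSize)/(1<<20), total.duration)
	// Output: num_files=3 data_size=10.00 MB duration=5s
}
```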
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_processor_planning.go
@@ -45,7 +45,7 @@ func distBackupPlanSpecs(
startTime, endTime hlc.Timestamp,
) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, "backup-plan-specs")
ctx, span = tracing.ChildSpan(ctx, "backupccl.distBackupPlanSpecs")
_ = ctx // ctx is currently unused, but this new ctx should be used below in the future.
defer span.Finish()
user := execCtx.User()
@@ -158,7 +158,7 @@ func distBackup(
progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
backupSpecs map[base.SQLInstanceID]*execinfrapb.BackupDataSpec,
) error {
ctx, span := tracing.ChildSpan(ctx, "backup-distsql")
ctx, span := tracing.ChildSpan(ctx, "backupccl.distBackup")
defer span.Finish()
evalCtx := execCtx.ExtendedEvalContext()
var noTxn *kv.Txn
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/backuppb/BUILD.bazel
@@ -48,9 +48,11 @@ go_library(
"//pkg/sql/parser",
"//pkg/sql/protoreflect",
"//pkg/sql/sem/tree",
"//pkg/util/bulk",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//jsonpb",
"@io_opentelemetry_go_otel//attribute",
],
)

61 changes: 61 additions & 0 deletions pkg/ccl/backupccl/backuppb/backup.go
@@ -10,15 +10,18 @@ package backuppb

import (
"encoding/json"
"fmt"

"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/protoreflect"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
_ "github.com/cockroachdb/cockroach/pkg/util/uuid" // required for backup.proto
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/jsonpb"
"go.opentelemetry.io/otel/attribute"
)

// IsIncremental returns if the BackupManifest corresponds to an incremental
@@ -123,6 +126,64 @@ func (m ScheduledBackupExecutionArgs) MarshalJSONPB(marshaller *jsonpb.Marshaler
return json.Marshal(m)
}

var _ bulk.TracingAggregatorEvent = &ExportStats{}

const (
tagNumFiles = "num_files"
tagDataSize = "data_size"
tagThroughput = "throughput"
)

// Render implements the LazyTag interface.
func (e *ExportStats) Render() []attribute.KeyValue {
const mb = 1 << 20
tags := make([]attribute.KeyValue, 0)
if e.NumFiles > 0 {
tags = append(tags, attribute.KeyValue{
Key: tagNumFiles,
Value: attribute.Int64Value(e.NumFiles),
})
}
if e.DataSize > 0 {
dataSizeMB := float64(e.DataSize) / mb
tags = append(tags, attribute.KeyValue{
Key: tagDataSize,
Value: attribute.StringValue(fmt.Sprintf("%.2f MB", dataSizeMB)),
})

if e.Duration > 0 {
throughput := dataSizeMB / e.Duration.Seconds()
tags = append(tags, attribute.KeyValue{
Key: tagThroughput,
Value: attribute.StringValue(fmt.Sprintf("%.2f MB/s", throughput)),
})
}
}

return tags
}

// Identity implements the TracingAggregatorEvent interface.
func (e *ExportStats) Identity() bulk.TracingAggregatorEvent {
return &ExportStats{}
}

// Combine implements the TracingAggregatorEvent interface.
func (e *ExportStats) Combine(other bulk.TracingAggregatorEvent) {
otherExportStats, ok := other.(*ExportStats)
if !ok {
panic(fmt.Sprintf("`other` is not of type ExportStats: %T", other))
}
e.NumFiles += otherExportStats.NumFiles
e.DataSize += otherExportStats.DataSize
e.Duration += otherExportStats.Duration
}

// Tag implements the TracingAggregatorEvent interface.
func (e *ExportStats) Tag() string {
return "ExportStats"
}

func init() {
protoreflect.RegisterShorthands((*BackupManifest)(nil), "backup", "backup_manifest")
}
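
The pkg/util/bulk interface that ExportStats asserts above isn't shown in this diff; inferred from the method set, it presumably looks roughly like the following hypothetical reconstruction, which may differ from the real definition:

```go
package bulk

import "go.opentelemetry.io/otel/attribute"

// TracingAggregatorEvent is a structured event that a TracingAggregator can
// fold into a running per-tag aggregate and render as span tags.
// (Reconstructed from the methods ExportStats implements; not the actual
// definition.)
type TracingAggregatorEvent interface {
	// Identity returns a zero-valued event to seed the aggregate.
	Identity() TracingAggregatorEvent
	// Combine folds other into the receiver.
	Combine(other TracingAggregatorEvent)
	// Tag returns the key under which the aggregate is reported.
	Tag() string
	// Render returns the rendered tags for the aggregate. (The comment on
	// ExportStats.Render attributes this method to a LazyTag interface, so
	// it may belong to a separate interface satisfied by the same type.)
	Render() []attribute.KeyValue
}
```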
28 changes: 11 additions & 17 deletions pkg/ccl/backupccl/backuppb/backup.proto
@@ -193,22 +193,16 @@ message BackupProgressTraceEvent {
util.hlc.Timestamp revision_start_time = 3 [(gogoproto.nullable) = false];
}

// BackupExportTraceRequestEvent is the trace event recorded when an
// ExportRequest has been sent.
message BackupExportTraceRequestEvent {
string span = 1;
int32 attempt = 2;
string priority = 3;
string req_sent_time = 4;
}

// BackupExportTraceResponseEvent is the trace event recorded when we receive a
// response from the ExportRequest.
message BackupExportTraceResponseEvent {
string duration = 1;
int32 num_files = 2;
repeated roachpb.RowCount file_summaries = 3 [(gogoproto.nullable) = false];
reserved 4 ;
string retryable_error = 5;
// ExportStats is a message containing information about each
// Export{Request,Response}.
message ExportStats {
// NumFiles is the number of SST files produced by the ExportRequest.
int64 num_files = 1;
// DataSize is the byte size of all the SST files produced by the
// ExportRequest.
int64 data_size = 2;
// Duration is the total time taken to send an ExportRequest, receive an
// ExportResponse and push the response on a channel.
int64 duration = 3 [(gogoproto.casttype) = "time.Duration"];
}
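
For reference, the gogoproto casttype option on `duration` makes the generated Go field a time.Duration rather than a raw int64, which is what lets backup.go construct ExportStats{Duration: exportDuration} and call e.Duration.Seconds() in Render. The generated struct looks approximately like this (struct tags and marshalling methods omitted; shape assumed):

```go
// Approximate shape of the gogoproto-generated struct for ExportStats.
type ExportStats struct {
	NumFiles int64
	DataSize int64
	Duration time.Duration // generated as time.Duration via casttype
}
```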

4 changes: 3 additions & 1 deletion pkg/jobs/adopt.go
@@ -12,6 +12,7 @@ package jobs

import (
"context"
"fmt"
"strconv"
"sync"

@@ -389,7 +390,8 @@ func (r *Registry) runJob(
// TODO(ajwerner): Move this writing up the trace ID down into
// stepThroughStateMachine where we're already often (and soon with
// exponential backoff, always) updating the job in that call.
ctx, span := r.ac.Tracer.StartSpanCtx(ctx, typ.String(), spanOptions...)
ctx, span := r.ac.Tracer.StartSpanCtx(ctx,
fmt.Sprintf("%s-%d", typ.String(), job.ID()), spanOptions...)
span.SetTag("job-id", attribute.Int64Value(int64(job.ID())))
defer span.Finish()
if span.TraceID() != 0 {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_export.go
@@ -102,7 +102,7 @@ func evalExport(
h := cArgs.Header
reply := resp.(*roachpb.ExportResponse)

ctx, evalExportSpan := tracing.ChildSpan(ctx, fmt.Sprintf("Export [%s,%s)", args.Key, args.EndKey))
ctx, evalExportSpan := tracing.ChildSpan(ctx, "evalExport")
defer evalExportSpan.Finish()

var evalExportTrace types.StringValue
2 changes: 1 addition & 1 deletion pkg/storage/mvcc.go
@@ -5726,7 +5726,7 @@ func MVCCExportToSST(
ctx context.Context, cs *cluster.Settings, reader Reader, opts MVCCExportOptions, dest io.Writer,
) (roachpb.BulkOpSummary, MVCCKey, error) {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, "MVCCExportToSST")
ctx, span = tracing.ChildSpan(ctx, "storage.MVCCExportToSST")
defer span.Finish()
sstWriter := MakeBackupSSTWriter(ctx, cs, dest)
defer sstWriter.Close()