diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fa2088e4a2..0a143c64f7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,9 @@ * [FEATURE] Introduce list_blocks_concurrency on GCS and S3 backends to control backend load and performance. [#2652](https://github.com/grafana/tempo/pull/2652) (@zalegrala) * [FEATURE] Add per-tenant compaction window [#3129](https://github.com/grafana/tempo/pull/3129) (@zalegrala) * [ENHANCEMENT] Make the trace ID label name configurable for remote written exemplars [#3074](https://github.com/grafana/tempo/pull/3074) +* [CHANGE] **Breaking Change** Fix issue where tempo drops the entire batch if one trace triggers an error [#2571](https://github.com/grafana/tempo/pull/2571) (@ie-pham) + Distributor now returns 200 for any batch containing only trace_too_large and max_live_traces errors + The number of discarded spans are still reflected in the tempo_discarded_spans_total metrics * [ENHANCEMENT] Update poller to make use of previous results and reduce backend load. [#2652](https://github.com/grafana/tempo/pull/2652) (@zalegrala) * [ENHANCEMENT] Improve TraceQL regex performance in certain queries. [#3139](https://github.com/grafana/tempo/pull/3139) (@joe-elliott) * [ENHANCEMENT] Improve TraceQL performance in complex queries. [#3113](https://github.com/grafana/tempo/pull/3113) (@joe-elliott) @@ -153,6 +156,7 @@ defaults: ## v2.2.0 / 2023-07-31 + * [CHANGE] Make vParquet2 the default block format [#2526](https://github.com/grafana/tempo/pull/2526) (@stoewer) * [CHANGE] Disable tempo-query by default in Jsonnet libs. [#2462](https://github.com/grafana/tempo/pull/2462) (@electron0zero) * [CHANGE] Integrate `gofumpt` into CI for formatting requirements [2584](https://github.com/grafana/tempo/pull/2584) (@zalegrala) diff --git a/integration/e2e/config-limits-partial-success.yaml b/integration/e2e/config-limits-partial-success.yaml new file mode 100644 index 00000000000..2bab7f28aa7 --- /dev/null +++ b/integration/e2e/config-limits-partial-success.yaml @@ -0,0 +1,35 @@ +target: all + +server: + http_listen_port: 3200 + +distributor: + receivers: + otlp: + protocols: + grpc: + +overrides: + max_bytes_per_trace: 600 + max_traces_per_user: 6 + ingestion_rate_limit_bytes: 6000 + ingestion_burst_size_bytes: 6000 + +ingester: + lifecycler: + address: 127.0.0.1 + ring: + kvstore: + store: inmemory + replication_factor: 1 + final_sleep: 0s + trace_idle_period: 3600s + +storage: + trace: + backend: local + local: + path: /var/tempo + pool: + max_workers: 10 + queue_depth: 100 diff --git a/integration/e2e/limits_test.go b/integration/e2e/limits_test.go index fc7844ec367..5fb893fd998 100644 --- a/integration/e2e/limits_test.go +++ b/integration/e2e/limits_test.go @@ -2,11 +2,14 @@ package e2e import ( "context" + crand "crypto/rand" "encoding/binary" "testing" "time" + "github.com/grafana/dskit/user" "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/status" @@ -15,11 +18,16 @@ import ( util "github.com/grafana/tempo/integration" "github.com/grafana/tempo/pkg/httpclient" tempoUtil "github.com/grafana/tempo/pkg/util" + "github.com/grafana/tempo/pkg/util/test" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/ptrace" ) const ( - configLimits = "config-limits.yaml" - configLimitsQuery = "config-limits-query.yaml" + configLimits = "config-limits.yaml" + configLimitsQuery = "config-limits-query.yaml" + configLimitsPartialError = "config-limits-partial-success.yaml" ) func TestLimits(t *testing.T) { @@ -38,14 +46,14 @@ func TestLimits(t *testing.T) { // should fail b/c the trace is too large. each batch should be ~70 bytes batch := makeThriftBatchWithSpanCount(2) - require.Error(t, c.EmitBatch(context.Background(), batch), "max trace size") + require.NoError(t, c.EmitBatch(context.Background(), batch), "max trace size") // push a trace require.NoError(t, c.EmitBatch(context.Background(), makeThriftBatchWithSpanCount(1))) // should fail b/c this will be too many traces batch = makeThriftBatch() - require.Error(t, c.EmitBatch(context.Background(), batch), "too many traces") + require.NoError(t, c.EmitBatch(context.Background(), batch), "too many traces") // should fail b/c due to ingestion rate limit batch = makeThriftBatchWithSpanCount(10) @@ -133,3 +141,77 @@ func TestQueryLimits(t *testing.T) { require.ErrorContains(t, err, "trace exceeds max size") require.ErrorContains(t, err, "failed with response: 400") // confirm querier returns 400 } + +func TestLimitsPartialSuccess(t *testing.T) { + s, err := e2e.NewScenario("tempo_e2e") + require.NoError(t, err) + defer s.Close() + require.NoError(t, util.CopyFileToSharedDir(s, configLimitsPartialError, "config.yaml")) + tempo := util.NewTempoAllInOne() + require.NoError(t, s.StartAndWaitReady(tempo)) + + // otel grpc exporter + exporter, err := util.NewOtelGRPCExporter(tempo.Endpoint(4317)) + require.NoError(t, err) + + err = exporter.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + + // make request + traceIDs := make([][]byte, 6) + for index := range traceIDs { + traceID := make([]byte, 16) + _, err = crand.Read(traceID) + require.NoError(t, err) + traceIDs[index] = traceID + } + + // 3 traces with trace_too_large and 3 with no error + spanCountsByTrace := []int{1, 4, 1, 5, 6, 1} + req := test.MakeReqWithMultipleTraceWithSpanCount(spanCountsByTrace, traceIDs) + + b, err := req.Marshal() + require.NoError(t, err) + + // unmarshal into otlp proto + traces, err := (&ptrace.ProtoUnmarshaler{}).UnmarshalTraces(b) + require.NoError(t, err) + require.NotNil(t, traces) + + ctx := user.InjectOrgID(context.Background(), tempoUtil.FakeTenantID) + ctx, err = user.InjectIntoGRPCRequest(ctx) + require.NoError(t, err) + + // send traces to tempo + // partial success = no error + err = exporter.ConsumeTraces(ctx, traces) + require.NoError(t, err) + + // shutdown to ensure traces are flushed + require.NoError(t, exporter.Shutdown(context.Background())) + + // query for the one trace that didn't trigger an error + client := httpclient.New("http://"+tempo.Endpoint(3200), tempoUtil.FakeTenantID) + for i, count := range spanCountsByTrace { + if count == 1 { + result, err := client.QueryTrace(tempoUtil.TraceIDToHexString(traceIDs[i])) + require.NoError(t, err) + assert.Equal(t, 1, len(result.Batches)) + } + } + + // test metrics + // 3 traces with trace_too_large each with 4+5+6 spans + err = tempo.WaitSumMetricsWithOptions(e2e.Equals(15), + []string{"tempo_discarded_spans_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "reason", "trace_too_large")), + ) + require.NoError(t, err) + + // this metric should never exist + err = tempo.WaitSumMetricsWithOptions(e2e.Equals(0), + []string{"tempo_discarded_spans_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "reason", "unknown_error")), + ) + require.NoError(t, err) +} diff --git a/integration/util.go b/integration/util.go index 3f3b7ea9ade..18cf3e92367 100644 --- a/integration/util.go +++ b/integration/util.go @@ -20,6 +20,13 @@ import ( "github.com/grafana/e2e" jaeger_grpc "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/otlpexporter" + mnoop "go.opentelemetry.io/otel/metric/noop" + tnoop "go.opentelemetry.io/otel/trace/noop" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -267,6 +274,31 @@ func TempoBackoff() backoff.Config { } } +func NewOtelGRPCExporter(endpoint string) (exporter.Traces, error) { + factory := otlpexporter.NewFactory() + exporterCfg := factory.CreateDefaultConfig() + otlpCfg := exporterCfg.(*otlpexporter.Config) + otlpCfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ + Endpoint: endpoint, + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + } + logger, _ := zap.NewDevelopment() + return factory.CreateTracesExporter( + context.Background(), + exporter.CreateSettings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: logger, + TracerProvider: tnoop.NewTracerProvider(), + MeterProvider: mnoop.NewMeterProvider(), + }, + BuildInfo: component.NewDefaultBuildInfo(), + }, + otlpCfg, + ) +} + func NewJaegerGRPCClient(endpoint string) (*jaeger_grpc.Reporter, error) { // new jaeger grpc exporter conn, err := grpc.Dial(endpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index 7df59f27aa4..f758cbb24c8 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -5,7 +5,7 @@ import ( "encoding/hex" "fmt" "math" - "strings" + "sync" "time" "github.com/go-kit/log" @@ -35,6 +35,7 @@ import ( "github.com/grafana/tempo/pkg/tempopb" v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" tempo_util "github.com/grafana/tempo/pkg/util" + "github.com/grafana/tempo/pkg/validation" ) @@ -47,6 +48,8 @@ const ( reasonLiveTracesExceeded = "live_traces_exceeded" // reasonInternalError indicates an unexpected error occurred processing these spans. analogous to a 500 reasonInternalError = "internal_error" + // reasonUnknown indicates a pushByte error at the ingester level not related to GRPC + reasonUnknown = "unknown_error" distributorRingKey = "distributor" ) @@ -107,10 +110,11 @@ var ( // rebatchedTrace is used to more cleanly pass the set of data type rebatchedTrace struct { - id []byte - trace *tempopb.Trace - start uint32 // unix epoch seconds - end uint32 // unix epoch seconds + id []byte + trace *tempopb.Trace + start uint32 // unix epoch seconds + end uint32 // unix epoch seconds + spanCount int } // Distributor coordinates replicates and distribution of log streams. @@ -277,7 +281,7 @@ func (d *Distributor) checkForRateLimits(tracesSize, spanCount int, userID strin if !d.ingestionRateLimiter.AllowN(now, userID, tracesSize) { overrides.RecordDiscardedSpans(spanCount, reasonRateLimited, userID) return status.Errorf(codes.ResourceExhausted, - "%s ingestion rate limit (%d bytes) exceeded while adding %d bytes for user %s", + "%s: ingestion rate limit (%d bytes) exceeded while adding %d bytes for user %s", overrides.ErrorPrefixRateLimited, int(d.ingestionRateLimiter.Limit(now, userID)), tracesSize, userID) @@ -347,9 +351,8 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te return nil, err } - err = d.sendToIngestersViaBytes(ctx, userID, rebatchedTraces, keys) + err = d.sendToIngestersViaBytes(ctx, userID, spanCount, rebatchedTraces, keys) if err != nil { - recordDiscaredSpans(err, userID, spanCount) return nil, err } @@ -364,8 +367,7 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te return nil, nil // PushRequest is ignored, so no reason to create one } -func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string, traces []*rebatchedTrace, keys []uint32) error { - // Marshal to bytes once +func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string, totalSpanCount int, traces []*rebatchedTrace, keys []uint32) error { marshalledTraces := make([][]byte, len(traces)) for i, t := range traces { b, err := d.traceEncoder.PrepareForWrite(t.trace, t.start, t.end) @@ -380,6 +382,12 @@ func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string op = ring.Write } + numOfTraces := len(keys) + numSuccessByTraceIndex := make([]int, numOfTraces) + lastErrorReasonByTraceIndex := make([]tempopb.PushErrorReason, numOfTraces) + + var mu sync.Mutex + err := ring.DoBatch(ctx, op, d.ingestersRing, keys, func(ingester ring.InstanceDesc, indexes []int) error { localCtx, cancel := context.WithTimeout(ctx, d.clientCfg.RemoteTimeout) defer cancel() @@ -401,15 +409,37 @@ func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string return err } - _, err = c.(tempopb.PusherClient).PushBytesV2(localCtx, &req) + pushResponse, err := c.(tempopb.PusherClient).PushBytesV2(localCtx, &req) metricIngesterAppends.WithLabelValues(ingester.Addr).Inc() - if err != nil { + + if err != nil { // internal error, drop entire batch metricIngesterAppendFailures.WithLabelValues(ingester.Addr).Inc() + return err } - return err + + mu.Lock() + defer mu.Unlock() + + d.processPushResponse(pushResponse, numSuccessByTraceIndex, lastErrorReasonByTraceIndex, numOfTraces, indexes) + + return nil }, func() {}) + // if err != nil, we discarded everything because of an internal error + if err != nil { + overrides.RecordDiscardedSpans(totalSpanCount, reasonInternalError, userID) + return err + } - return err + // count discarded span count + mu.Lock() + defer mu.Unlock() + + maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount := countDiscaredSpans(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traces, d.ingestersRing.ReplicationFactor()) + overrides.RecordDiscardedSpans(maxLiveDiscardedCount, reasonLiveTracesExceeded, userID) + overrides.RecordDiscardedSpans(traceTooLargeDiscardedCount, reasonTraceTooLarge, userID) + overrides.RecordDiscardedSpans(unknownErrorCount, reasonUnknown, userID) + + return nil } func (d *Distributor) sendToGenerators(ctx context.Context, userID string, keys []uint32, traces []*rebatchedTrace) error { @@ -493,8 +523,9 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int trace: &tempopb.Trace{ Batches: make([]*v1.ResourceSpans, 0, spanCount/tracesPerBatch), }, - start: math.MaxUint32, - end: 0, + start: math.MaxUint32, + end: 0, + spanCount: 0, } tracesByID[traceKey] = existingTrace @@ -513,6 +544,9 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int ScopeSpans: []*v1.ScopeSpans{existingILS}, }) } + + // increase span count for trace + existingTrace.spanCount = existingTrace.spanCount + 1 } } } @@ -530,19 +564,58 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int return keys, traces, nil } -func recordDiscaredSpans(err error, userID string, spanCount int) { - s := status.Convert(err) - if s == nil { +func countDiscaredSpans(numSuccessByTraceIndex []int, lastErrorReasonByTraceIndex []tempopb.PushErrorReason, traces []*rebatchedTrace, repFactor int) (maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount int) { + quorum := int(math.Floor(float64(repFactor)/2)) + 1 // min success required + + for traceIndex, numSuccess := range numSuccessByTraceIndex { + // we will count anything that did not receive min success as discarded + if numSuccess >= quorum { + continue + } + spanCount := traces[traceIndex].spanCount + switch lastErrorReasonByTraceIndex[traceIndex] { + case tempopb.PushErrorReason_MAX_LIVE_TRACES: + maxLiveDiscardedCount += spanCount + case tempopb.PushErrorReason_TRACE_TOO_LARGE: + traceTooLargeDiscardedCount += spanCount + case tempopb.PushErrorReason_UNKNOWN_ERROR: + unknownErrorCount += spanCount + } + } + + return maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount +} + +func (d *Distributor) processPushResponse(pushResponse *tempopb.PushResponse, numSuccessByTraceIndex []int, lastErrorReasonByTraceIndex []tempopb.PushErrorReason, numOfTraces int, indexes []int) { + // no errors + if len(pushResponse.ErrorsByTrace) == 0 { + for _, reqBatchIndex := range indexes { + if reqBatchIndex > numOfTraces { + level.Warn(d.logger).Log("msg", fmt.Sprintf("batch index %d out of bound for length %d", reqBatchIndex, numOfTraces)) + continue + } + numSuccessByTraceIndex[reqBatchIndex]++ + } return } - desc := s.Message() - if strings.HasPrefix(desc, overrides.ErrorPrefixLiveTracesExceeded) { - overrides.RecordDiscardedSpans(spanCount, reasonLiveTracesExceeded, userID) - } else if strings.HasPrefix(desc, overrides.ErrorPrefixTraceTooLarge) { - overrides.RecordDiscardedSpans(spanCount, reasonTraceTooLarge, userID) - } else { - overrides.RecordDiscardedSpans(spanCount, reasonInternalError, userID) + for ringIndex, pushError := range pushResponse.ErrorsByTrace { + // translate index of ring batch and req batch + // since the request batch gets split up into smaller batches based on the indexes + // like [0,1] [1] [2] [0,2] + reqBatchIndex := indexes[ringIndex] + if reqBatchIndex > numOfTraces { + level.Warn(d.logger).Log("msg", fmt.Sprintf("batch index %d out of bound for length %d", reqBatchIndex, numOfTraces)) + continue + } + + // if no error, record number of success + if pushError == tempopb.PushErrorReason_NO_ERROR { + numSuccessByTraceIndex[reqBatchIndex]++ + continue + } + // else record last error + lastErrorReasonByTraceIndex[reqBatchIndex] = pushError } } diff --git a/modules/distributor/distributor_test.go b/modules/distributor/distributor_test.go index e4a04f15df7..f7d7470cd38 100644 --- a/modules/distributor/distributor_test.go +++ b/modules/distributor/distributor_test.go @@ -13,12 +13,10 @@ import ( "testing" "time" - "github.com/go-kit/log" kitlog "github.com/go-kit/log" "github.com/gogo/status" "github.com/golang/protobuf/proto" // nolint: all //ProtoReflect "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv" dslog "github.com/grafana/dskit/log" "github.com/grafana/dskit/ring" ring_client "github.com/grafana/dskit/ring/client" @@ -44,7 +42,10 @@ import ( ) const ( - numIngesters = 5 + numIngesters = 5 + noError = tempopb.PushErrorReason_NO_ERROR + maxLiveTraceError = tempopb.PushErrorReason_MAX_LIVE_TRACES + traceTooLargeError = tempopb.PushErrorReason_TRACE_TOO_LARGE ) var ctx = user.InjectOrgID(context.Background(), "test") @@ -770,7 +771,7 @@ func TestDistributor(t *testing.T) { limits.RegisterFlagsAndApplyDefaults(&flag.FlagSet{}) // todo: test limits - d := prepare(t, limits, nil, nil) + d := prepare(t, limits, nil) b := test.MakeBatch(tc.lines, []byte{}) traces := batchesToTraces(t, []*v1.ResourceSpans{b}) @@ -953,7 +954,7 @@ func TestLogSpans(t *testing.T) { buf := &bytes.Buffer{} logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf)) - d := prepare(t, limits, nil, logger) + d := prepare(t, limits, logger) d.cfg.LogReceivedSpans = LogReceivedSpansConfig{ Enabled: tc.LogReceivedSpansEnabled, FilterByStatusError: tc.filterByStatusError, @@ -994,7 +995,7 @@ func TestRateLimitRespected(t *testing.T) { } buf := &bytes.Buffer{} logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf)) - d := prepare(t, overridesConfig, nil, logger) + d := prepare(t, overridesConfig, logger) batches := []*v1.ResourceSpans{ makeResourceSpans("test-service", []*v1.ScopeSpans{ makeScope( @@ -1025,6 +1026,284 @@ func TestRateLimitRespected(t *testing.T) { assert.True(t, status.Code() == codes.ResourceExhausted, "Wrong status code") } +func TestDiscardCountReplicationFactor(t *testing.T) { + tt := []struct { + name string + pushErrorByTrace [][]tempopb.PushErrorReason + replicationFactor int + expectedLiveTracesDiscardedCount int + expectedTraceTooLargeDiscardedCount int + }{ + // trace sizes + // trace[0] = 5 spans + // trace[1] = 10 spans + // trace[2] = 15 spans + { + name: "no errors, minimum responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, noError, noError}, {noError, noError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 0, + }, + { + name: "no error, max responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, noError, noError}, {noError, noError, noError}, {noError, noError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 0, + }, + { + name: "one mlt error, minimum responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{maxLiveTraceError, noError, noError}, {noError, noError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 5, + expectedTraceTooLargeDiscardedCount: 0, + }, + { + name: "one mlt error, max responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{maxLiveTraceError, noError, noError}, {noError, noError, noError}, {noError, noError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 0, + }, + { + name: "one ttl error, minimum responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}, {noError, noError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 10, + }, + { + name: "one ttl error, max responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}, {noError, noError, noError}, {noError, noError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 0, + }, + { + name: "two mlt errors, minimum responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{maxLiveTraceError, noError, noError}, {maxLiveTraceError, noError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 5, + expectedTraceTooLargeDiscardedCount: 0, + }, + { + name: "two ttl errors, max responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}, {noError, traceTooLargeError, noError}, {noError, noError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 10, + }, + { + name: "three ttl errors, max responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}, {noError, traceTooLargeError, noError}, {noError, traceTooLargeError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 10, + }, + { + name: "three mix errors, max responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}, {noError, maxLiveTraceError, noError}, {noError, traceTooLargeError, noError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 10, + }, + { + name: "three mix trace errors, max responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}, {noError, noError, traceTooLargeError}, {noError, maxLiveTraceError, traceTooLargeError}}, + replicationFactor: 3, + expectedLiveTracesDiscardedCount: 10, + expectedTraceTooLargeDiscardedCount: 15, + }, + { + name: "one ttl error rep factor 5 min (3) response", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}, {noError, noError, noError}, {noError, noError, noError}}, + replicationFactor: 5, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 10, + }, + { + name: "one error rep factor 5 with 4 responses", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}, {noError, noError, noError}, {noError, noError, noError}, {noError, noError, noError}}, + replicationFactor: 5, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 0, + }, + { + name: "replication factor 1", + pushErrorByTrace: [][]tempopb.PushErrorReason{{noError, traceTooLargeError, noError}}, + replicationFactor: 1, + expectedLiveTracesDiscardedCount: 0, + expectedTraceTooLargeDiscardedCount: 10, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + traceByID := make([]*rebatchedTrace, 3) + // batch with 3 traces + traceByID[0] = &rebatchedTrace{ + spanCount: 5, + } + + traceByID[1] = &rebatchedTrace{ + spanCount: 15, + } + + traceByID[2] = &rebatchedTrace{ + spanCount: 10, + } + + keys := []int{0, 2, 1} + + numSuccessByTraceIndex := make([]int, len(traceByID)) + lastErrorReasonByTraceIndex := make([]tempopb.PushErrorReason, len(traceByID)) + + for _, ErrorByTrace := range tc.pushErrorByTrace { + for ringIndex, err := range ErrorByTrace { + // translate + traceIndex := keys[ringIndex] + + currentNumSuccess := numSuccessByTraceIndex[traceIndex] + if err == tempopb.PushErrorReason_NO_ERROR { + numSuccessByTraceIndex[traceIndex] = currentNumSuccess + 1 + } else { + lastErrorReasonByTraceIndex[traceIndex] = err + } + } + } + + liveTraceDiscardedCount, traceTooLongDiscardedCount, _ := countDiscaredSpans(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traceByID, tc.replicationFactor) + + require.Equal(t, tc.expectedLiveTracesDiscardedCount, liveTraceDiscardedCount) + require.Equal(t, tc.expectedTraceTooLargeDiscardedCount, traceTooLongDiscardedCount) + }) + } +} + +func TestProcessIngesterPushByteResponse(t *testing.T) { + // batch has 5 traces [0, 1, 2, 3, 4, 5] + numOfTraces := 5 + tt := []struct { + name string + pushErrorByTrace []tempopb.PushErrorReason + indexes []int + expectedSuccessIndex []int + expectedLastErrorIndex []tempopb.PushErrorReason + }{ + { + name: "explicit no errors, first three traces", + pushErrorByTrace: []tempopb.PushErrorReason{noError, noError, noError}, + indexes: []int{0, 1, 2}, + expectedSuccessIndex: []int{1, 1, 1, 0, 0}, + expectedLastErrorIndex: make([]tempopb.PushErrorReason, numOfTraces), + }, + { + name: "no errors, no ErrorsByTrace value", + pushErrorByTrace: []tempopb.PushErrorReason{}, + indexes: []int{1, 2, 3}, + expectedSuccessIndex: []int{0, 1, 1, 1, 0}, + expectedLastErrorIndex: make([]tempopb.PushErrorReason, numOfTraces), + }, + { + name: "all errors, first three traces", + pushErrorByTrace: []tempopb.PushErrorReason{traceTooLargeError, traceTooLargeError, traceTooLargeError}, + indexes: []int{0, 1, 2}, + expectedSuccessIndex: []int{0, 0, 0, 0, 0}, + expectedLastErrorIndex: []tempopb.PushErrorReason{traceTooLargeError, traceTooLargeError, traceTooLargeError, noError, noError}, + }, + { + name: "random errors, random three traces", + pushErrorByTrace: []tempopb.PushErrorReason{traceTooLargeError, maxLiveTraceError, noError}, + indexes: []int{0, 2, 4}, + expectedSuccessIndex: []int{0, 0, 0, 0, 1}, + expectedLastErrorIndex: []tempopb.PushErrorReason{traceTooLargeError, noError, maxLiveTraceError, noError, noError}, + }, + } + + // prepare test data + overridesConfig := overrides.Config{} + buf := &bytes.Buffer{} + logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf)) + d := prepare(t, overridesConfig, logger) + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + numSuccessByTraceIndex := make([]int, numOfTraces) + lastErrorReasonByTraceIndex := make([]tempopb.PushErrorReason, numOfTraces) + pushByteResponse := &tempopb.PushResponse{ + ErrorsByTrace: tc.pushErrorByTrace, + } + d.processPushResponse(pushByteResponse, numSuccessByTraceIndex, lastErrorReasonByTraceIndex, numOfTraces, tc.indexes) + assert.Equal(t, numSuccessByTraceIndex, tc.expectedSuccessIndex) + assert.Equal(t, lastErrorReasonByTraceIndex, tc.expectedLastErrorIndex) + }) + } +} + +func TestIngesterPushBytes(t *testing.T) { + // prepare test data + overridesConfig := overrides.Config{} + buf := &bytes.Buffer{} + logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf)) + d := prepare(t, overridesConfig, logger) + + traces := []*rebatchedTrace{ + { + spanCount: 1, + }, + { + spanCount: 5, + }, + { + spanCount: 10, + }, + { + spanCount: 15, + }, + { + spanCount: 20, + }, + } + numOfTraces := len(traces) + numSuccessByTraceIndex := make([]int, numOfTraces) + lastErrorReasonByTraceIndex := make([]tempopb.PushErrorReason, numOfTraces) + + // 0 = trace_too_large, trace_too_large || discard count: 1 + // 1 = no error, trace_too_large || discard count: 5 + // 2 = no error, no error || discard count: 0 + // 3 = max_live, max_live || discard count: 15 + // 4 = trace_too_large, max_live || discard count: 20 + // total ttl: 6, mlt: 35 + + batches := [][]int{ + {0, 1, 2}, + {1, 3}, + {0, 2}, + {3, 4}, + {4}, + } + + errorsByTraces := [][]tempopb.PushErrorReason{ + {traceTooLargeError, noError, noError}, + {traceTooLargeError, maxLiveTraceError}, + {traceTooLargeError, noError}, + {maxLiveTraceError, traceTooLargeError}, + {maxLiveTraceError}, + } + + for i, indexes := range batches { + pushResponse := &tempopb.PushResponse{ + ErrorsByTrace: errorsByTraces[i], + } + d.processPushResponse(pushResponse, numSuccessByTraceIndex, lastErrorReasonByTraceIndex, numOfTraces, indexes) + } + + maxLiveDiscardedCount, traceTooLargeDiscardedCount, _ := countDiscaredSpans(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traces, 3) + assert.Equal(t, traceTooLargeDiscardedCount, 6) + assert.Equal(t, maxLiveDiscardedCount, 35) +} + type testLogSpan struct { Msg string `json:"msg"` Level string `json:"level"` @@ -1103,9 +1382,9 @@ func makeResourceSpans(serviceName string, ils []*v1.ScopeSpans, attributes ...* return rs } -func prepare(t *testing.T, limits overrides.Config, kvStore kv.Client, logger log.Logger) *Distributor { +func prepare(t *testing.T, limits overrides.Config, logger kitlog.Logger) *Distributor { if logger == nil { - logger = log.NewNopLogger() + logger = kitlog.NewNopLogger() } var ( @@ -1134,7 +1413,7 @@ func prepare(t *testing.T, limits overrides.Config, kvStore kv.Client, logger lo distributorConfig.DistributorRing.HeartbeatPeriod = 100 * time.Millisecond distributorConfig.DistributorRing.InstanceID = strconv.Itoa(rand.Int()) - distributorConfig.DistributorRing.KVStore.Mock = kvStore + distributorConfig.DistributorRing.KVStore.Mock = nil distributorConfig.DistributorRing.InstanceInterfaceNames = []string{"eth0", "en0", "lo0"} distributorConfig.factory = func(addr string) (ring_client.PoolClient, error) { return ingesters[addr], nil @@ -1156,11 +1435,11 @@ type mockIngester struct { var _ tempopb.PusherClient = (*mockIngester)(nil) func (i *mockIngester) PushBytes(context.Context, *tempopb.PushBytesRequest, ...grpc.CallOption) (*tempopb.PushResponse, error) { - return nil, nil + return &tempopb.PushResponse{}, nil } func (i *mockIngester) PushBytesV2(context.Context, *tempopb.PushBytesRequest, ...grpc.CallOption) (*tempopb.PushResponse, error) { - return nil, nil + return &tempopb.PushResponse{}, nil } func (i *mockIngester) Close() error { diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go index d4c9f8156f0..7725a575f99 100644 --- a/modules/ingester/ingester.go +++ b/modules/ingester/ingester.go @@ -241,15 +241,11 @@ func (i *Ingester) PushBytesV2(ctx context.Context, req *tempopb.PushBytesReques instance, err := i.getOrCreateInstance(instanceID) if err != nil { + level.Warn(log.Logger).Log(err.Error()) return nil, err } - err = instance.PushBytesRequest(ctx, req) - if err != nil { - return nil, err - } - - return &tempopb.PushResponse{}, nil + return instance.PushBytesRequest(ctx, req), nil } // FindTraceByID implements tempopb.Querier.f diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go index 2e030e35dbd..37e1591783c 100644 --- a/modules/ingester/ingester_test.go +++ b/modules/ingester/ingester_test.go @@ -505,27 +505,8 @@ func defaultOverridesConfig() overrides.Config { func pushBatchV2(t testing.TB, i *Ingester, batch *v1.ResourceSpans, id []byte) { ctx := user.InjectOrgID(context.Background(), "test") - batchDecoder := model.MustNewSegmentDecoder(model_v2.Encoding) - pbTrace := &tempopb.Trace{ - Batches: []*v1.ResourceSpans{batch}, - } - - buffer, err := batchDecoder.PrepareForWrite(pbTrace, 0, 0) - require.NoError(t, err) - - _, err = i.PushBytesV2(ctx, &tempopb.PushBytesRequest{ - Traces: []tempopb.PreallocBytes{ - { - Slice: buffer, - }, - }, - Ids: []tempopb.PreallocBytes{ - { - Slice: id, - }, - }, - }) + _, err := i.PushBytesV2(ctx, makePushBytesRequest(id, batch)) require.NoError(t, err) } diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index dfddbb1fa0a..5db973aa11a 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -34,32 +34,22 @@ import ( "github.com/grafana/tempo/tempodb/encoding/common" ) -type traceTooLargeError struct { - traceID common.ID - instanceID string - maxBytes, reqSize int -} +var ( + errTraceTooLarge = errors.New(overrides.ErrorPrefixTraceTooLarge) + errMaxLiveTraces = errors.New(overrides.ErrorPrefixLiveTracesExceeded) +) -func newTraceTooLargeError(traceID common.ID, instanceID string, maxBytes, reqSize int) *traceTooLargeError { - return &traceTooLargeError{ - traceID: traceID, - instanceID: instanceID, - maxBytes: maxBytes, - reqSize: reqSize, - } +func newTraceTooLargeError(traceID common.ID, instanceID string, maxBytes, reqSize int) error { + level.Warn(log.Logger).Log("msg", fmt.Sprintf("%s: max size of trace (%d) exceeded while adding %d bytes to trace %s for tenant %s", + overrides.ErrorPrefixTraceTooLarge, maxBytes, reqSize, hex.EncodeToString(traceID), instanceID)) + return errTraceTooLarge } -func (e *traceTooLargeError) Error() string { - return fmt.Sprintf( - "%s max size of trace (%d) exceeded while adding %d bytes to trace %s for tenant %s", - overrides.ErrorPrefixTraceTooLarge, e.maxBytes, e.reqSize, hex.EncodeToString(e.traceID), e.instanceID) +func newMaxLiveTracesError(instanceID string, limit string) error { + level.Warn(log.Logger).Log("msg", fmt.Sprintf("%s: max live traces exceeded for tenant %s: %v", overrides.ErrorPrefixLiveTracesExceeded, instanceID, limit)) + return errMaxLiveTraces } -// Errors returned on Query. -var ( - ErrTraceMissing = errors.New("trace missing") -) - const ( traceDataType = "trace" ) @@ -154,14 +144,47 @@ func newInstance(instanceID string, limiter *Limiter, overrides ingesterOverride return i, nil } -func (i *instance) PushBytesRequest(ctx context.Context, req *tempopb.PushBytesRequest) error { +func (i *instance) PushBytesRequest(ctx context.Context, req *tempopb.PushBytesRequest) *tempopb.PushResponse { + pr := &tempopb.PushResponse{} + for j := range req.Traces { err := i.PushBytes(ctx, req.Ids[j].Slice, req.Traces[j].Slice) - if err != nil { - return err + pr.ErrorsByTrace = i.addTraceError(pr.ErrorsByTrace, err, len(req.Traces), j) + } + + return pr +} + +func (i *instance) addTraceError(errorsByTrace []tempopb.PushErrorReason, pushError error, numTraces int, traceIndex int) []tempopb.PushErrorReason { + if pushError != nil { + // only make list if there is at least one error + if len(errorsByTrace) == 0 { + errorsByTrace = make([]tempopb.PushErrorReason, 0, numTraces) + // because this is the first error, fill list with NO_ERROR for the traces before this one + for k := 0; k < traceIndex; k++ { + errorsByTrace = append(errorsByTrace, tempopb.PushErrorReason_NO_ERROR) + } + } + if errors.Is(pushError, errMaxLiveTraces) { + errorsByTrace = append(errorsByTrace, tempopb.PushErrorReason_MAX_LIVE_TRACES) + return errorsByTrace + } + + if errors.Is(pushError, errTraceTooLarge) { + errorsByTrace = append(errorsByTrace, tempopb.PushErrorReason_TRACE_TOO_LARGE) + return errorsByTrace } + + // error is not either MaxLiveTraces or TraceTooLarge + level.Error(log.Logger).Log("msg", "Unexpected error during PushBytes", "tenant", i.instanceID, "error", pushError) + errorsByTrace = append(errorsByTrace, tempopb.PushErrorReason_UNKNOWN_ERROR) + return errorsByTrace + + } else if pushError == nil && len(errorsByTrace) > 0 { + errorsByTrace = append(errorsByTrace, tempopb.PushErrorReason_NO_ERROR) } - return nil + + return errorsByTrace } // PushBytes is used to push an unmarshalled tempopb.Trace to the instance @@ -175,7 +198,7 @@ func (i *instance) PushBytes(ctx context.Context, id, traceBytes []byte) error { // check for max traces before grabbing the lock to better load shed err := i.limiter.AssertMaxTracesPerUser(i.instanceID, int(i.traceCount.Load())) if err != nil { - return status.Errorf(codes.FailedPrecondition, "%s max live traces exceeded for tenant %s: %v", overrides.ErrorPrefixLiveTracesExceeded, i.instanceID, err) + return newMaxLiveTracesError(i.instanceID, err.Error()) } return i.push(ctx, id, traceBytes) @@ -192,7 +215,7 @@ func (i *instance) push(ctx context.Context, id, traceBytes []byte) error { prevSize := int(i.traceSizes[tkn]) reqSize := len(traceBytes) if prevSize+reqSize > maxBytes { - return status.Errorf(codes.FailedPrecondition, newTraceTooLargeError(id, i.instanceID, maxBytes, reqSize).Error()) + return newTraceTooLargeError(id, i.instanceID, maxBytes, reqSize) } } @@ -200,10 +223,6 @@ func (i *instance) push(ctx context.Context, id, traceBytes []byte) error { err := trace.Push(ctx, i.instanceID, traceBytes) if err != nil { - var ttlErr *traceTooLargeError - if ok := errors.As(err, &ttlErr); ok { - return status.Errorf(codes.FailedPrecondition, err.Error()) - } return err } diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go index 55902d48991..0f5b30de8dd 100644 --- a/modules/ingester/instance_test.go +++ b/modules/ingester/instance_test.go @@ -37,11 +37,11 @@ func TestInstance(t *testing.T) { i, ingester := defaultInstance(t) - err := i.PushBytesRequest(context.Background(), request) - require.NoError(t, err) + response := i.PushBytesRequest(context.Background(), request) + require.NotNil(t, response) require.Equal(t, int(i.traceCount.Load()), len(i.traces)) - err = i.CutCompleteTraces(0, true) + err := i.CutCompleteTraces(0, true) require.NoError(t, err) require.Equal(t, int(i.traceCount.Load()), len(i.traces)) @@ -171,8 +171,9 @@ func TestInstanceDoesNotRace(t *testing.T) { } go concurrent(func() { request := makeRequest([]byte{}) - err := i.PushBytesRequest(context.Background(), request) - require.NoError(t, err, "error pushing traces") + response := i.PushBytesRequest(context.Background(), request) + errored, _, _ := CheckPushBytesError(response) + require.False(t, errored) }) go concurrent(func() { @@ -229,8 +230,12 @@ func TestInstanceLimits(t *testing.T) { type push struct { req *tempopb.PushBytesRequest expectsError bool + errorReason string } + traceTooLarge := "trace_too_large" + maxLiveTraces := "max_live_traces" + tests := []struct { name string pushes []push @@ -258,6 +263,7 @@ func TestInstanceLimits(t *testing.T) { { req: makeRequestWithByteLimit(1500, []byte{}), expectsError: true, + errorReason: traceTooLarge, }, { req: makeRequestWithByteLimit(600, []byte{}), @@ -273,6 +279,7 @@ func TestInstanceLimits(t *testing.T) { { req: makeRequestWithByteLimit(700, []byte{0x01}), expectsError: true, + errorReason: traceTooLarge, }, }, }, @@ -294,6 +301,7 @@ func TestInstanceLimits(t *testing.T) { { req: makeRequestWithByteLimit(100, []byte{}), expectsError: true, + errorReason: maxLiveTraces, }, }, }, @@ -306,9 +314,22 @@ func TestInstanceLimits(t *testing.T) { require.NoError(t, err, "unexpected error creating new instance") for j, push := range tt.pushes { - err = i.PushBytesRequest(context.Background(), push.req) + response := i.PushBytesRequest(context.Background(), push.req) + if push.expectsError && push.errorReason == traceTooLarge { + errored, maxLiveCount, traceTooLargeCount := CheckPushBytesError(response) + require.True(t, errored) + require.Zero(t, maxLiveCount, "push %d failed: %w", j, err) + require.NotZero(t, traceTooLargeCount, "push %d failed: %w", j, err) + } else if push.expectsError && push.errorReason == maxLiveTraces { + errored, maxLiveCount, traceTooLargeCount := CheckPushBytesError(response) + require.True(t, errored) + require.NotZero(t, maxLiveCount, "push %d failed: %w", j, err) + require.Zero(t, traceTooLargeCount, "push %d failed: %w", j, err) + } else { + errored, _, _ := CheckPushBytesError(response) + require.False(t, errored, "push %d failed: %w", j, err) + } - require.Equalf(t, push.expectsError, err != nil, "push %d failed: %w", j, err) } }) } @@ -497,8 +518,9 @@ func TestInstanceMetrics(t *testing.T) { count := 100 for j := 0; j < count; j++ { request := makeRequest([]byte{}) - err := i.PushBytesRequest(context.Background(), request) - require.NoError(t, err) + response := i.PushBytesRequest(context.Background(), request) + errored, _, _ := CheckPushBytesError(response) + require.False(t, errored, "push %d failed: %w", j, response.ErrorsByTrace) } cutAndVerify(count) cutAndVerify(0) @@ -524,31 +546,105 @@ func TestInstanceFailsLargeTracesEvenAfterFlushing(t *testing.T) { i, err := ingester.getOrCreateInstance(testTenantID) require.NoError(t, err) - req := makeRequestWithByteLimit(maxTraceBytes-200, id) + req := makeRequestWithByteLimit(maxTraceBytes-300, id) reqSize := 0 for _, b := range req.Traces { reqSize += len(b.Slice) } // Fill up trace to max - err = i.PushBytesRequest(ctx, req) - require.NoError(t, err) + response := i.PushBytesRequest(ctx, req) + errored, _, _ := CheckPushBytesError(response) + require.False(t, errored, "push failed: %w", response.ErrorsByTrace) // Pushing again fails - err = i.PushBytesRequest(ctx, req) - require.Contains(t, err.Error(), (newTraceTooLargeError(id, i.instanceID, maxTraceBytes, reqSize)).Error()) + response = i.PushBytesRequest(ctx, req) + _, _, traceTooLargeCount := CheckPushBytesError(response) + assert.Equal(t, true, traceTooLargeCount > 0) // Pushing still fails after flush err = i.CutCompleteTraces(0, true) require.NoError(t, err) - err = i.PushBytesRequest(ctx, req) - require.Contains(t, err.Error(), (newTraceTooLargeError(id, i.instanceID, maxTraceBytes, reqSize)).Error()) + response = i.PushBytesRequest(ctx, req) + _, _, traceTooLargeCount = CheckPushBytesError(response) + assert.Equal(t, true, traceTooLargeCount > 0) // Cut block and then pushing works again _, err = i.CutBlockIfReady(0, 0, true) require.NoError(t, err) - err = i.PushBytesRequest(ctx, req) + response = i.PushBytesRequest(ctx, req) + errored, _, _ = CheckPushBytesError(response) + require.False(t, errored, "push failed: %w", response.ErrorsByTrace) +} + +func TestInstancePartialSuccess(t *testing.T) { + ctx := context.Background() + maxTraceBytes := 1000 + + limits, err := overrides.NewOverrides(overrides.Config{ + Defaults: overrides.Overrides{ + Global: overrides.GlobalOverrides{ + MaxBytesPerTrace: maxTraceBytes, + }, + Ingestion: overrides.IngestionOverrides{ + MaxLocalTracesPerUser: 2, + }, + }, + }) require.NoError(t, err) + limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) + + ingester, _, _ := defaultIngester(t, t.TempDir()) + ingester.limiter = limiter + + delete(ingester.instances, testTenantID) // force recreate instance to reset limits + i, err := ingester.getOrCreateInstance(testTenantID) + require.NoError(t, err, "unexpected error creating new instance") + + count := 5 + ids := make([][]byte, 0, count) + for j := 0; j < count; j++ { + id := make([]byte, 16) + _, err := crand.Read(id) + require.NoError(t, err) + ids = append(ids, id) + } + + // one with no error [0], two with trace_too_large [1,2], one with no error [3], one should trigger live_traces_exceeded [4] + multiMaxBytes := []int{maxTraceBytes - 300, maxTraceBytes + 200, maxTraceBytes + 200, maxTraceBytes - 300, maxTraceBytes - 200} + req := makePushBytesRequestMultiTraces(ids, multiMaxBytes) + + // Pushing pass + // response should contain errors for both LIVE_TRACES_EXCEEDED and TRACE_TOO_LARGE + response := i.PushBytesRequest(ctx, req) + errored, maxLiveCount, traceTooLargeCount := CheckPushBytesError(response) + + assert.True(t, errored) + assert.Equal(t, true, maxLiveCount > 0) + assert.Equal(t, true, traceTooLargeCount > 0) + + // check that the two good ones actually made it + result, err := i.FindTraceByID(ctx, ids[0]) + require.NoError(t, err, "error finding trace by id") + assert.Equal(t, 1, len(result.Batches)) + + result, err = i.FindTraceByID(ctx, ids[3]) + require.NoError(t, err, "error finding trace by id") + assert.Equal(t, 1, len(result.Batches)) + + // check that the three traces that had errors did not actually make it + var expected *tempopb.Trace + result, err = i.FindTraceByID(ctx, ids[1]) + require.NoError(t, err, "error finding trace by id") + assert.Equal(t, expected, result) + + result, err = i.FindTraceByID(ctx, ids[2]) + require.NoError(t, err, "error finding trace by id") + assert.Equal(t, expected, result) + + result, err = i.FindTraceByID(ctx, ids[4]) + require.NoError(t, err, "error finding trace by id") + assert.Equal(t, expected, result) } func TestSortByteSlices(t *testing.T) { @@ -610,8 +706,9 @@ func BenchmarkInstancePush(b *testing.B) { for i := 0; i < b.N; i++ { // Rotate trace ID binary.LittleEndian.PutUint32(request.Ids[0].Slice, uint32(i)) - err := instance.PushBytesRequest(context.Background(), request) - require.NoError(b, err) + response := instance.PushBytesRequest(context.Background(), request) + errored, _, _ := CheckPushBytesError(response) + require.False(b, errored, "push failed: %w", response.ErrorsByTrace) } } @@ -621,8 +718,9 @@ func BenchmarkInstancePushExistingTrace(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - err := instance.PushBytesRequest(context.Background(), request) - require.NoError(b, err) + response := instance.PushBytesRequest(context.Background(), request) + errored, _, _ := CheckPushBytesError(response) + require.False(b, errored, "push failed: %w", response.ErrorsByTrace) } } @@ -630,11 +728,12 @@ func BenchmarkInstanceFindTraceByIDFromCompleteBlock(b *testing.B) { instance, _ := defaultInstance(b) traceID := test.ValidTraceID([]byte{1, 2, 3, 4, 5, 6, 7, 8}) request := makeRequest(traceID) - err := instance.PushBytesRequest(context.Background(), request) - require.NoError(b, err) + response := instance.PushBytesRequest(context.Background(), request) + errored, _, _ := CheckPushBytesError(response) + require.False(b, errored, "push failed: %w", response.ErrorsByTrace) // force the trace to be in a complete block - err = instance.CutCompleteTraces(0, true) + err := instance.CutCompleteTraces(0, true) require.NoError(b, err) id, err := instance.CutBlockIfReady(0, 0, true) require.NoError(b, err) @@ -663,8 +762,9 @@ func benchmarkInstanceSearch(b testing.TB) { instance, _ := defaultInstance(b) for i := 0; i < 1000; i++ { request := makeRequest(nil) - err := instance.PushBytesRequest(context.Background(), request) - require.NoError(b, err) + response := instance.PushBytesRequest(context.Background(), request) + errored, _, _ := CheckPushBytesError(response) + require.False(b, errored, "push failed: %w", response.ErrorsByTrace) if i%100 == 0 { err := instance.CutCompleteTraces(0, true) @@ -710,11 +810,7 @@ func makeRequest(traceID []byte) *tempopb.PushBytesRequest { // Note that this fn will generate a request with size **close to** maxBytes func makeRequestWithByteLimit(maxBytes int, traceID []byte) *tempopb.PushBytesRequest { traceID = test.ValidTraceID(traceID) - batch := test.MakeBatch(1, traceID) - - for batch.Size() < maxBytes { - batch.ScopeSpans[0].Spans = append(batch.ScopeSpans[0].Spans, test.MakeSpanWithAttributeCount(traceID, 0)) - } + batch := makeBatchWithMaxBytes(maxBytes, traceID) return makePushBytesRequest(traceID, batch) } @@ -768,8 +864,9 @@ func BenchmarkInstanceContention(t *testing.B) { } go concurrent(func() { request := makeRequestWithByteLimit(10_000, nil) - err := i.PushBytesRequest(ctx, request) - require.NoError(t, err, "error pushing traces") + response := i.PushBytesRequest(ctx, request) + errored, _, _ := CheckPushBytesError(response) + require.False(t, errored, "push failed: %w", response.ErrorsByTrace) pushes++ }) @@ -838,3 +935,71 @@ func BenchmarkInstanceContention(t *testing.B) { report(searchBytes, "searchedBytes") report(searchTags, "searchTags") } + +func makeBatchWithMaxBytes(maxBytes int, traceID []byte) *v1_trace.ResourceSpans { + traceID = test.ValidTraceID(traceID) + batch := test.MakeBatch(1, traceID) + + for batch.Size() < maxBytes { + batch.ScopeSpans[0].Spans = append(batch.ScopeSpans[0].Spans, test.MakeSpanWithAttributeCount(traceID, 0)) + } + + return batch +} + +func makeTraces(batches []*v1_trace.ResourceSpans) []*tempopb.Trace { + traces := make([]*tempopb.Trace, 0, len(batches)) + for _, batch := range batches { + traces = append(traces, &tempopb.Trace{Batches: []*v1_trace.ResourceSpans{batch}}) + } + + return traces +} + +func makePushBytesRequestMultiTraces(traceIDs [][]byte, maxBytes []int) *tempopb.PushBytesRequest { + batches := make([]*v1_trace.ResourceSpans, 0, len(traceIDs)) + for index, id := range traceIDs { + batch := makeBatchWithMaxBytes(maxBytes[index], id) + batches = append(batches, batch) + } + traces := makeTraces(batches) + + byteIDs := make([]tempopb.PreallocBytes, 0, len(traceIDs)) + byteTraces := make([]tempopb.PreallocBytes, 0, len(traceIDs)) + + for index, id := range traceIDs { + buffer, err := model.MustNewSegmentDecoder(model.CurrentEncoding).PrepareForWrite(traces[index], 0, 0) + if err != nil { + panic(err) + } + + byteIDs = append(byteIDs, tempopb.PreallocBytes{ + Slice: id, + }) + byteTraces = append(byteTraces, tempopb.PreallocBytes{ + Slice: buffer, + }) + } + + return &tempopb.PushBytesRequest{ + Ids: byteIDs, + Traces: byteTraces, + } +} + +func CheckPushBytesError(response *tempopb.PushResponse) (errored bool, maxLiveTracesCount int, traceTooLargeCount int) { + for _, result := range response.ErrorsByTrace { + switch result { + case tempopb.PushErrorReason_MAX_LIVE_TRACES: + maxLiveTracesCount++ + case tempopb.PushErrorReason_TRACE_TOO_LARGE: + traceTooLargeCount++ + } + } + + if (maxLiveTracesCount + traceTooLargeCount) > 0 { + errored = true + } + + return errored, maxLiveTracesCount, traceTooLargeCount +} diff --git a/modules/overrides/config.go b/modules/overrides/config.go index 47abba9cd43..8787536190d 100644 --- a/modules/overrides/config.go +++ b/modules/overrides/config.go @@ -30,11 +30,11 @@ const ( GlobalIngestionRateStrategy = "global" // ErrorPrefixLiveTracesExceeded is used to flag batches from the ingester that were rejected b/c they had too many traces - ErrorPrefixLiveTracesExceeded = "LIVE_TRACES_EXCEEDED:" + ErrorPrefixLiveTracesExceeded = "LIVE_TRACES_EXCEEDED" // ErrorPrefixTraceTooLarge is used to flag batches from the ingester that were rejected b/c they exceeded the single trace limit - ErrorPrefixTraceTooLarge = "TRACE_TOO_LARGE:" + ErrorPrefixTraceTooLarge = "TRACE_TOO_LARGE" // ErrorPrefixRateLimited is used to flag batches that have exceeded the spans/second of the tenant - ErrorPrefixRateLimited = "RATE_LIMITED:" + ErrorPrefixRateLimited = "RATE_LIMITED" // metrics MetricMaxLocalTracesPerUser = "max_local_traces_per_user" diff --git a/pkg/tempopb/tempo.pb.go b/pkg/tempopb/tempo.pb.go index 323fc8de640..fb84a680070 100644 --- a/pkg/tempopb/tempo.pb.go +++ b/pkg/tempopb/tempo.pb.go @@ -30,6 +30,37 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package +type PushErrorReason int32 + +const ( + PushErrorReason_NO_ERROR PushErrorReason = 0 + PushErrorReason_MAX_LIVE_TRACES PushErrorReason = 1 + PushErrorReason_TRACE_TOO_LARGE PushErrorReason = 2 + PushErrorReason_UNKNOWN_ERROR PushErrorReason = 3 +) + +var PushErrorReason_name = map[int32]string{ + 0: "NO_ERROR", + 1: "MAX_LIVE_TRACES", + 2: "TRACE_TOO_LARGE", + 3: "UNKNOWN_ERROR", +} + +var PushErrorReason_value = map[string]int32{ + "NO_ERROR": 0, + "MAX_LIVE_TRACES": 1, + "TRACE_TOO_LARGE": 2, + "UNKNOWN_ERROR": 3, +} + +func (x PushErrorReason) String() string { + return proto.EnumName(PushErrorReason_name, int32(x)) +} + +func (PushErrorReason) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_f22805646f4f62b6, []int{0} +} + type DedicatedColumn_Scope int32 const ( @@ -1318,6 +1349,7 @@ func (m *Trace) GetBatches() []*v11.ResourceSpans { // Write type PushResponse struct { + ErrorsByTrace []PushErrorReason `protobuf:"varint,1,rep,packed,name=errorsByTrace,proto3,enum=tempopb.PushErrorReason" json:"errorsByTrace,omitempty"` } func (m *PushResponse) Reset() { *m = PushResponse{} } @@ -1353,6 +1385,13 @@ func (m *PushResponse) XXX_DiscardUnknown() { var xxx_messageInfo_PushResponse proto.InternalMessageInfo +func (m *PushResponse) GetErrorsByTrace() []PushErrorReason { + if m != nil { + return m.ErrorsByTrace + } + return nil +} + // PushBytesRequest pushes slices of traces, ids and searchdata. Traces are encoded using the // current BatchDecoder in ./pkg/model type PushBytesRequest struct { @@ -2155,6 +2194,7 @@ func (m *TraceQLStatic) GetKind() int32 { } func init() { + proto.RegisterEnum("tempopb.PushErrorReason", PushErrorReason_name, PushErrorReason_value) proto.RegisterEnum("tempopb.DedicatedColumn_Scope", DedicatedColumn_Scope_name, DedicatedColumn_Scope_value) proto.RegisterEnum("tempopb.DedicatedColumn_Type", DedicatedColumn_Type_name, DedicatedColumn_Type_value) proto.RegisterType((*TraceByIDRequest)(nil), "tempopb.TraceByIDRequest") @@ -2197,132 +2237,138 @@ func init() { func init() { proto.RegisterFile("pkg/tempopb/tempo.proto", fileDescriptor_f22805646f4f62b6) } var fileDescriptor_f22805646f4f62b6 = []byte{ - // 1996 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x58, 0x4f, 0x6f, 0x1b, 0xc7, - 0x15, 0xd7, 0x8a, 0x7f, 0x44, 0x3e, 0x51, 0x32, 0x3d, 0x71, 0x64, 0x9a, 0x76, 0x24, 0x61, 0x63, - 0xb4, 0x4a, 0xe1, 0x50, 0x32, 0x63, 0x21, 0x75, 0x5c, 0xb4, 0x88, 0x2c, 0xd5, 0x96, 0x63, 0x39, - 0xf2, 0x50, 0x71, 0x81, 0x5e, 0x82, 0xe5, 0xee, 0x98, 0xde, 0x8a, 0xdc, 0x65, 0x76, 0x87, 0xaa, - 0xd9, 0x63, 0x81, 0x16, 0x28, 0xd0, 0x43, 0x0f, 0xed, 0xb5, 0x40, 0x4f, 0x41, 0x3f, 0x47, 0x81, - 0x22, 0x97, 0x16, 0x39, 0x16, 0x3d, 0x04, 0x85, 0xfd, 0x09, 0xfa, 0x0d, 0x8a, 0xf7, 0x66, 0x66, - 0xff, 0x90, 0x2b, 0x19, 0x69, 0x0f, 0x3d, 0x71, 0xde, 0x6f, 0x7e, 0xf3, 0xe6, 0xcd, 0x9b, 0x37, - 0xef, 0x3d, 0x2e, 0x5c, 0x1d, 0x9f, 0x0e, 0xb6, 0xa5, 0x18, 0x8d, 0xc3, 0x71, 0x5f, 0xfd, 0x76, - 0xc6, 0x51, 0x28, 0x43, 0xb6, 0xa4, 0xc1, 0xf6, 0x15, 0x19, 0x39, 0xae, 0xd8, 0x3e, 0xbb, 0xbd, - 0x4d, 0x03, 0x35, 0xdd, 0x5e, 0x73, 0xc3, 0xd1, 0x28, 0x0c, 0x10, 0x56, 0x23, 0x8d, 0xbf, 0x3f, - 0xf0, 0xe5, 0x8b, 0x49, 0xbf, 0xe3, 0x86, 0xa3, 0xed, 0x41, 0x38, 0x08, 0xb7, 0x09, 0xee, 0x4f, - 0x9e, 0x93, 0x44, 0x02, 0x8d, 0x14, 0xdd, 0xfe, 0xb5, 0x05, 0xcd, 0x13, 0x54, 0xbb, 0x37, 0x3d, - 0xdc, 0xe7, 0xe2, 0x8b, 0x89, 0x88, 0x25, 0x6b, 0xc1, 0x12, 0x6d, 0x75, 0xb8, 0xdf, 0xb2, 0x36, - 0xad, 0xad, 0x06, 0x37, 0x22, 0x5b, 0x07, 0xe8, 0x0f, 0x43, 0xf7, 0xb4, 0x27, 0x9d, 0x48, 0xb6, - 0x16, 0x37, 0xad, 0xad, 0x3a, 0xcf, 0x20, 0xac, 0x0d, 0x35, 0x92, 0x0e, 0x02, 0xaf, 0x55, 0xa2, - 0xd9, 0x44, 0x66, 0x37, 0xa0, 0xfe, 0xc5, 0x44, 0x44, 0xd3, 0xa3, 0xd0, 0x13, 0xad, 0x0a, 0x4d, - 0xa6, 0x80, 0x1d, 0xc0, 0xe5, 0x8c, 0x1d, 0xf1, 0x38, 0x0c, 0x62, 0xc1, 0x6e, 0x42, 0x85, 0x76, - 0x26, 0x33, 0x96, 0xbb, 0xab, 0x1d, 0xed, 0x93, 0x0e, 0x51, 0xb9, 0x9a, 0x64, 0x1f, 0xc0, 0xd2, - 0x48, 0xc8, 0xc8, 0x77, 0x63, 0xb2, 0x68, 0xb9, 0x7b, 0x2d, 0xcf, 0x43, 0x95, 0x47, 0x8a, 0xc0, - 0x0d, 0xd3, 0x66, 0x99, 0x73, 0xeb, 0x49, 0xfb, 0xef, 0x8b, 0xb0, 0xd2, 0x13, 0x4e, 0xe4, 0xbe, - 0x30, 0x9e, 0xf8, 0x08, 0xca, 0x27, 0xce, 0x20, 0x6e, 0x59, 0x9b, 0xa5, 0xad, 0xe5, 0xee, 0x66, - 0xa2, 0x37, 0xc7, 0xea, 0x20, 0xe5, 0x20, 0x90, 0xd1, 0x74, 0xaf, 0xfc, 0xd5, 0x37, 0x1b, 0x0b, - 0x9c, 0xd6, 0xb0, 0x9b, 0xb0, 0x72, 0xe4, 0x07, 0xfb, 0x93, 0xc8, 0x91, 0x7e, 0x18, 0x1c, 0x29, - 0xe3, 0x56, 0x78, 0x1e, 0x24, 0x96, 0xf3, 0x32, 0xc3, 0x2a, 0x69, 0x56, 0x16, 0x64, 0x57, 0xa0, - 0xf2, 0xd8, 0x1f, 0xf9, 0xb2, 0x55, 0xa6, 0x59, 0x25, 0x20, 0x1a, 0xd3, 0x45, 0x54, 0x14, 0x4a, - 0x02, 0x6b, 0x42, 0x49, 0x04, 0x5e, 0xab, 0x4a, 0x18, 0x0e, 0x91, 0xf7, 0x14, 0x1d, 0xdd, 0xaa, - 0x91, 0xd7, 0x95, 0xc0, 0xb6, 0xe0, 0x52, 0x6f, 0xec, 0x04, 0xf1, 0xb1, 0x88, 0xf0, 0xb7, 0x27, - 0x64, 0xab, 0x4e, 0x6b, 0x66, 0xe1, 0xf6, 0x87, 0x50, 0x4f, 0x8e, 0x88, 0xea, 0x4f, 0xc5, 0x94, - 0x6e, 0xa4, 0xce, 0x71, 0x88, 0xea, 0xcf, 0x9c, 0xe1, 0x44, 0xe8, 0x78, 0x50, 0xc2, 0x47, 0x8b, - 0xdf, 0xb7, 0xec, 0xbf, 0x96, 0x80, 0x29, 0x57, 0xed, 0x61, 0x14, 0x18, 0xaf, 0xde, 0x81, 0x7a, - 0x6c, 0x1c, 0xa8, 0xaf, 0x76, 0xad, 0xd8, 0xb5, 0x3c, 0x25, 0x62, 0x54, 0x52, 0x2c, 0x1d, 0xee, - 0xeb, 0x8d, 0x8c, 0x88, 0x91, 0x45, 0x47, 0x3f, 0x76, 0x06, 0x42, 0xfb, 0x2f, 0x05, 0xd0, 0xc3, - 0x63, 0x67, 0x20, 0xe2, 0x93, 0x50, 0xa9, 0xd6, 0x3e, 0xcc, 0x83, 0x18, 0xb9, 0x22, 0x70, 0x43, - 0xcf, 0x0f, 0x06, 0x3a, 0x38, 0x13, 0x19, 0x35, 0xf8, 0x81, 0x27, 0x5e, 0xa2, 0xba, 0x9e, 0xff, - 0x0b, 0xa1, 0x7d, 0x9b, 0x07, 0x99, 0x0d, 0x0d, 0x19, 0x4a, 0x67, 0xc8, 0x85, 0x1b, 0x46, 0x5e, - 0xdc, 0x5a, 0x22, 0x52, 0x0e, 0x43, 0x8e, 0xe7, 0x48, 0xe7, 0xc0, 0xec, 0xa4, 0x2e, 0x24, 0x87, - 0xe1, 0x39, 0xcf, 0x44, 0x14, 0xfb, 0x61, 0x40, 0xf7, 0x51, 0xe7, 0x46, 0x64, 0x0c, 0xca, 0x31, - 0x6e, 0x0f, 0x9b, 0xd6, 0x56, 0x99, 0xd3, 0x18, 0x5f, 0xe4, 0xf3, 0x30, 0x94, 0x22, 0x22, 0xc3, - 0x96, 0x69, 0xcf, 0x0c, 0xc2, 0xf6, 0xa1, 0xe9, 0x09, 0xcf, 0x77, 0x1d, 0x29, 0xbc, 0xfb, 0xe1, - 0x70, 0x32, 0x0a, 0xe2, 0x56, 0x83, 0xa2, 0xb9, 0x95, 0xb8, 0x7c, 0x3f, 0x4f, 0xe0, 0x73, 0x2b, - 0xec, 0xbf, 0x58, 0x70, 0x69, 0x86, 0xc5, 0xee, 0x40, 0x25, 0x76, 0xc3, 0xb1, 0xf2, 0xf8, 0x6a, - 0x77, 0xfd, 0x3c, 0x75, 0x9d, 0x1e, 0xb2, 0xb8, 0x22, 0xe3, 0x19, 0x02, 0x67, 0x64, 0x62, 0x85, - 0xc6, 0xec, 0x36, 0x94, 0xe5, 0x74, 0xac, 0x5e, 0xf9, 0x6a, 0xf7, 0x9d, 0x73, 0x15, 0x9d, 0x4c, - 0xc7, 0x82, 0x13, 0xd5, 0xde, 0x80, 0x0a, 0xa9, 0x65, 0x35, 0x28, 0xf7, 0x8e, 0x3f, 0x7e, 0xd2, - 0x5c, 0x60, 0x0d, 0xa8, 0xf1, 0x83, 0xde, 0xa7, 0x9f, 0xf1, 0xfb, 0x07, 0x4d, 0xcb, 0x66, 0x50, - 0x46, 0x3a, 0x03, 0xa8, 0xf6, 0x4e, 0xf8, 0xe1, 0x93, 0x07, 0xcd, 0x05, 0xfb, 0x25, 0xac, 0x9a, - 0xe8, 0xd2, 0x09, 0xe6, 0x0e, 0x54, 0x29, 0x87, 0x98, 0x17, 0x7e, 0x23, 0x9f, 0x39, 0x14, 0xfb, - 0x48, 0x48, 0x07, 0x6f, 0x88, 0x6b, 0x2e, 0xdb, 0x99, 0x4d, 0x38, 0xb3, 0xd1, 0x3b, 0x97, 0x6d, - 0xbe, 0x5c, 0x84, 0xb7, 0x0a, 0x34, 0xce, 0x66, 0xda, 0x7a, 0x9a, 0x69, 0xb7, 0xe0, 0x52, 0x14, - 0x86, 0xb2, 0x27, 0xa2, 0x33, 0xdf, 0x15, 0x4f, 0x52, 0x97, 0xcd, 0xc2, 0x18, 0x9d, 0x08, 0x91, - 0x7a, 0xe2, 0xa9, 0xc4, 0x9b, 0x07, 0xd9, 0x2d, 0xb8, 0x4c, 0x4f, 0xe2, 0xc4, 0x1f, 0x89, 0xcf, - 0x02, 0xff, 0xe5, 0x13, 0x27, 0x08, 0xe9, 0x25, 0x94, 0xf9, 0xfc, 0x04, 0x46, 0x95, 0x97, 0xa6, - 0x24, 0x95, 0x5e, 0x32, 0x08, 0xfb, 0x1e, 0x2c, 0xc5, 0x3a, 0x67, 0x54, 0xc9, 0x03, 0xcd, 0xd4, - 0x03, 0x0a, 0xe7, 0x86, 0xc0, 0x6e, 0x41, 0x4d, 0x0f, 0xf1, 0x4d, 0x94, 0x0a, 0xc9, 0x09, 0xc3, - 0xfe, 0x95, 0x05, 0x4b, 0x1a, 0x65, 0xef, 0x42, 0x05, 0x71, 0x73, 0x39, 0x2b, 0xb9, 0x65, 0x5c, - 0xcd, 0xa1, 0x0b, 0x47, 0x8e, 0x74, 0x5f, 0x08, 0x4f, 0x27, 0x58, 0x23, 0xb2, 0x7b, 0x00, 0x8e, - 0x94, 0x91, 0xdf, 0x9f, 0x48, 0x81, 0x79, 0x15, 0x75, 0x5c, 0x4f, 0x74, 0xe8, 0xaa, 0x79, 0x76, - 0xbb, 0xf3, 0x89, 0x98, 0x3e, 0xc3, 0x94, 0xc5, 0x33, 0x74, 0x8c, 0xf8, 0x32, 0x6e, 0xc3, 0xd6, - 0xa0, 0x8a, 0x1b, 0x25, 0x37, 0xa4, 0xa5, 0xc2, 0x40, 0x2e, 0x74, 0x72, 0xe9, 0x3c, 0x27, 0xdf, - 0x84, 0x15, 0xe3, 0x52, 0x94, 0x63, 0x7d, 0x1d, 0x79, 0x70, 0xe6, 0x14, 0x95, 0x6f, 0x77, 0x8a, - 0x7f, 0x5b, 0xa6, 0xa2, 0xe9, 0x90, 0xc4, 0xb8, 0xf2, 0x83, 0x78, 0x2c, 0x5c, 0x29, 0xbc, 0x13, - 0x13, 0xfa, 0x94, 0xf5, 0x67, 0x60, 0xf6, 0x1d, 0x58, 0x4d, 0xa0, 0xbd, 0x29, 0x6e, 0xbe, 0x48, - 0xf6, 0xcd, 0xa0, 0x6c, 0x13, 0x96, 0x29, 0xc7, 0x51, 0x8a, 0x37, 0xf5, 0x2b, 0x0b, 0xe1, 0x41, - 0xdd, 0x70, 0x34, 0x1e, 0x0a, 0x29, 0xbc, 0x47, 0x61, 0x3f, 0x36, 0x19, 0x38, 0x07, 0x62, 0x16, - 0xa7, 0x45, 0xc4, 0x50, 0x21, 0x97, 0x02, 0x68, 0x77, 0xaa, 0x52, 0x99, 0x53, 0x25, 0x73, 0x66, - 0x61, 0xfb, 0x3d, 0xb8, 0xac, 0x8e, 0x8c, 0x35, 0xcb, 0x94, 0x9c, 0x2b, 0x26, 0x59, 0xa9, 0x4b, - 0x54, 0x82, 0xbd, 0x63, 0xca, 0x93, 0xa2, 0xea, 0xa4, 0xd0, 0x86, 0x9a, 0x74, 0x06, 0xf8, 0x6a, - 0x54, 0xe4, 0xd5, 0x79, 0x22, 0xdb, 0x8f, 0xe0, 0x4a, 0xba, 0xe2, 0x59, 0x37, 0x59, 0xd3, 0x85, - 0x2a, 0xa9, 0x34, 0xb1, 0xda, 0x9e, 0xc9, 0x08, 0x8a, 0xae, 0x32, 0xa1, 0x66, 0xda, 0xf7, 0xb2, - 0x86, 0xea, 0xc9, 0x24, 0xac, 0xac, 0x4c, 0x58, 0x31, 0x28, 0x4b, 0xec, 0x42, 0x16, 0xc9, 0x18, - 0x1a, 0xdb, 0x0f, 0x61, 0x2d, 0x59, 0x4c, 0xf7, 0x1e, 0x67, 0xbb, 0x37, 0x65, 0x6e, 0x92, 0x53, - 0x94, 0x88, 0x4e, 0xa0, 0x86, 0xcb, 0x14, 0x6a, 0x12, 0xec, 0x0f, 0xe1, 0xea, 0x9c, 0x26, 0x7d, - 0x2a, 0xbc, 0x12, 0x03, 0x6a, 0x57, 0xa4, 0x80, 0x7d, 0x07, 0x6a, 0x66, 0x09, 0x99, 0x38, 0x4d, - 0xdc, 0x4b, 0xe3, 0xe2, 0xbe, 0xc0, 0x7e, 0x0c, 0xd7, 0x66, 0xb6, 0xcb, 0xb8, 0x71, 0x7b, 0x76, - 0xc3, 0xe5, 0xee, 0xe5, 0x34, 0x25, 0xeb, 0x99, 0xac, 0x0d, 0x7b, 0x50, 0xa1, 0x70, 0x65, 0x77, - 0x61, 0xa9, 0x4f, 0xef, 0xde, 0xac, 0xdb, 0x48, 0xd6, 0xa9, 0xb6, 0xf9, 0xec, 0x76, 0x87, 0x8b, - 0x38, 0x9c, 0x44, 0xae, 0xa0, 0xfe, 0x86, 0x1b, 0xbe, 0xbd, 0x0a, 0x8d, 0xe3, 0x49, 0x9c, 0x14, - 0x05, 0xfb, 0x4f, 0x16, 0x34, 0x11, 0xa0, 0x70, 0x32, 0x5e, 0x7d, 0x3f, 0xa9, 0x14, 0x78, 0x0b, - 0x8d, 0xbd, 0xb7, 0xb1, 0xd3, 0xfb, 0xe7, 0x37, 0x1b, 0x2b, 0xc7, 0x91, 0x70, 0x86, 0xc3, 0xd0, - 0x55, 0x6c, 0x53, 0x22, 0xbe, 0x0b, 0x25, 0xdf, 0x53, 0x49, 0xe7, 0x5c, 0x2e, 0x32, 0xd8, 0x2e, - 0x80, 0x6a, 0x71, 0xf6, 0x1d, 0xe9, 0xb4, 0xca, 0x17, 0xf1, 0x33, 0x44, 0xfb, 0x48, 0x99, 0xa8, - 0x4e, 0xa2, 0x4d, 0xfc, 0x1f, 0x5c, 0x70, 0x13, 0x40, 0x77, 0xc3, 0xf8, 0xa2, 0xd7, 0x72, 0x55, - 0xb1, 0x61, 0x0e, 0x65, 0xff, 0x10, 0xea, 0x8f, 0xfd, 0xe0, 0xb4, 0x37, 0xf4, 0x5d, 0x2c, 0xda, - 0x95, 0xa1, 0x1f, 0x9c, 0x9a, 0xbd, 0xae, 0xcf, 0xef, 0x85, 0x7b, 0x74, 0x70, 0x01, 0x57, 0x4c, - 0xfb, 0x97, 0x16, 0x30, 0x04, 0x4d, 0x79, 0x4c, 0xdf, 0xa6, 0x0a, 0x4b, 0x2b, 0x13, 0x96, 0x18, - 0xc6, 0x83, 0x28, 0x9c, 0x8c, 0xf7, 0x4c, 0xb8, 0x1a, 0x11, 0xf9, 0x43, 0x6a, 0x86, 0x55, 0x66, - 0x55, 0x42, 0xda, 0x0c, 0x97, 0x0b, 0x9a, 0xe1, 0x4a, 0xd2, 0x0c, 0xdb, 0xbf, 0xb1, 0xe0, 0x5a, - 0xc6, 0x88, 0xde, 0x64, 0x34, 0x72, 0xa2, 0xe9, 0xff, 0xc7, 0x96, 0x3f, 0x5b, 0xf0, 0x56, 0xce, - 0x21, 0xe9, 0xbb, 0x13, 0xb1, 0xf4, 0x47, 0xd8, 0xfb, 0x90, 0x25, 0x35, 0x9e, 0x02, 0xd4, 0xee, - 0x8e, 0x9d, 0xe0, 0x7e, 0x38, 0x09, 0xa4, 0xce, 0xc9, 0x29, 0x80, 0x69, 0x5b, 0x44, 0x51, 0x48, - 0xcd, 0xbb, 0xa2, 0x28, 0xd3, 0x66, 0x50, 0xd6, 0x49, 0x9b, 0x98, 0x32, 0xdd, 0xe0, 0x95, 0x5c, - 0x79, 0x9d, 0x6b, 0x61, 0x7e, 0x00, 0x0d, 0xee, 0xfc, 0xfc, 0xa1, 0x1f, 0xcb, 0x70, 0x10, 0x39, - 0x23, 0x0c, 0x92, 0xfe, 0xc4, 0x3d, 0x15, 0x92, 0x0c, 0x2c, 0x73, 0x2d, 0xe1, 0xd9, 0xdd, 0x8c, - 0x65, 0x4a, 0xb0, 0x1f, 0x41, 0xcd, 0x14, 0xa8, 0x82, 0x7f, 0x10, 0xb7, 0xb2, 0x99, 0x22, 0xdb, - 0x4e, 0x51, 0x50, 0x3e, 0x7d, 0xdc, 0x93, 0x8e, 0xf4, 0x5d, 0x93, 0x41, 0x7e, 0x6f, 0xc1, 0x72, - 0xc6, 0x44, 0xb6, 0x07, 0x97, 0x87, 0x8e, 0x14, 0x81, 0x3b, 0xfd, 0xfc, 0x85, 0x31, 0x4f, 0x47, - 0xe5, 0xdb, 0x89, 0xa6, 0xac, 0xed, 0xbc, 0xa9, 0xf9, 0xe9, 0x69, 0xde, 0x83, 0x6a, 0x2c, 0x22, - 0x5f, 0x3f, 0xef, 0x6c, 0xd6, 0x49, 0xea, 0xaa, 0x26, 0xe0, 0xc1, 0xc9, 0x95, 0xb1, 0x76, 0xac, - 0x96, 0xec, 0xbf, 0xe5, 0xa3, 0x5b, 0x07, 0x56, 0xfe, 0xb6, 0xac, 0x37, 0xdf, 0xd6, 0x62, 0xe1, - 0x6d, 0xa5, 0xf6, 0x95, 0xde, 0x64, 0x5f, 0x13, 0x4a, 0xe3, 0xbb, 0x77, 0x75, 0x33, 0x81, 0x43, - 0x85, 0xec, 0x52, 0xe0, 0x11, 0xb2, 0xab, 0x90, 0x1d, 0x5d, 0x41, 0x71, 0x48, 0xc8, 0xee, 0x0e, - 0xfd, 0x69, 0x41, 0x64, 0x77, 0xc7, 0xfe, 0x09, 0xb4, 0x8b, 0xde, 0x89, 0x0e, 0xd1, 0xbb, 0x50, - 0x8f, 0x09, 0xf2, 0xc5, 0x7c, 0x0a, 0x28, 0x58, 0x97, 0xb2, 0xed, 0x3f, 0x58, 0xb0, 0x92, 0xbb, - 0xd8, 0x5c, 0xf5, 0xa8, 0xe8, 0xea, 0xd1, 0x00, 0x2b, 0x20, 0x67, 0x94, 0xb8, 0x15, 0xa0, 0xf4, - 0x9c, 0xfc, 0x6d, 0x71, 0xeb, 0x39, 0x4a, 0xaa, 0x89, 0xa8, 0x73, 0x2b, 0x46, 0xa9, 0x4f, 0x87, - 0xab, 0x71, 0xab, 0x8f, 0x92, 0xa7, 0x0f, 0x66, 0x79, 0xd4, 0xbd, 0x49, 0x47, 0x4e, 0xd4, 0xdf, - 0xb1, 0x0a, 0xd7, 0x12, 0xee, 0x78, 0xea, 0x07, 0x1e, 0xfd, 0x01, 0xab, 0x70, 0x1a, 0x77, 0x7f, - 0x6b, 0x41, 0x15, 0x93, 0xaa, 0x88, 0xd8, 0x8f, 0xa0, 0x9e, 0x54, 0x00, 0x96, 0x7e, 0x4e, 0x98, - 0xad, 0x0a, 0xed, 0xb7, 0x73, 0x53, 0x49, 0x05, 0x59, 0x60, 0x1f, 0xc3, 0x72, 0x42, 0x7e, 0xd6, - 0xfd, 0x6f, 0x54, 0x74, 0xff, 0x68, 0x41, 0x53, 0x3b, 0xf1, 0x81, 0x08, 0x44, 0xe4, 0xc8, 0x30, - 0x31, 0x8c, 0xd2, 0xf7, 0x8c, 0xd6, 0x6c, 0x2d, 0x38, 0xdf, 0xb0, 0x43, 0x80, 0x07, 0x42, 0x9a, - 0xa7, 0x53, 0x78, 0x65, 0x46, 0xc7, 0x8d, 0xe2, 0xc9, 0xc4, 0xc0, 0x2f, 0xcb, 0xb0, 0xf4, 0x74, - 0x82, 0x41, 0x17, 0xb1, 0x87, 0xb0, 0xf2, 0x63, 0x3f, 0xf0, 0x92, 0x4f, 0x2a, 0xac, 0xe0, 0x1b, - 0x8c, 0xd1, 0xdb, 0x2e, 0x9a, 0xca, 0x78, 0xae, 0x61, 0xfe, 0xa4, 0xb9, 0x22, 0x90, 0xec, 0x9c, - 0x2f, 0x03, 0xed, 0xab, 0x73, 0x78, 0xa2, 0xe2, 0x00, 0x96, 0x33, 0x5f, 0x1d, 0xb2, 0x87, 0x9c, - 0xfb, 0x16, 0x71, 0x91, 0x9a, 0x07, 0x00, 0x69, 0x7f, 0xc6, 0x8a, 0x3a, 0x3a, 0xa3, 0xe4, 0x7a, - 0xe1, 0x5c, 0xa2, 0xe8, 0x13, 0x73, 0x24, 0xd5, 0xe8, 0x5d, 0xa8, 0xea, 0x9d, 0xc2, 0xc6, 0x31, - 0xa3, 0xec, 0x19, 0x5c, 0x9a, 0xe9, 0x9f, 0xd8, 0xc6, 0xfc, 0x9a, 0x5c, 0x4b, 0xd8, 0xde, 0x3c, - 0x9f, 0x90, 0xe8, 0xfd, 0x69, 0xa6, 0x1b, 0x35, 0x7d, 0xd9, 0x9b, 0x35, 0xdb, 0xe7, 0x11, 0xb2, - 0x36, 0x77, 0x3f, 0x85, 0x66, 0x4f, 0x46, 0xc2, 0x19, 0xf9, 0xc1, 0xc0, 0x44, 0xcc, 0x3d, 0xa8, - 0xea, 0x4f, 0x2f, 0xdf, 0xf6, 0x86, 0x77, 0xac, 0xee, 0xcf, 0x60, 0xc9, 0x84, 0xf0, 0xe7, 0x85, - 0x59, 0xd7, 0xbe, 0x28, 0x17, 0x69, 0xfd, 0xef, 0x5e, 0xc8, 0x31, 0xc6, 0xef, 0xb5, 0xbe, 0x7a, - 0xb5, 0x6e, 0x7d, 0xfd, 0x6a, 0xdd, 0xfa, 0xd7, 0xab, 0x75, 0xeb, 0x77, 0xaf, 0xd7, 0x17, 0xbe, - 0x7e, 0xbd, 0xbe, 0xf0, 0x8f, 0xd7, 0xeb, 0x0b, 0xfd, 0x2a, 0x7d, 0x43, 0xfd, 0xe0, 0x3f, 0x01, - 0x00, 0x00, 0xff, 0xff, 0x1d, 0xf6, 0x35, 0x13, 0xc4, 0x15, 0x00, 0x00, + // 2089 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x58, 0x4b, 0x6f, 0x1b, 0xc9, + 0x11, 0xd6, 0x88, 0x0f, 0x89, 0xa5, 0x17, 0xd5, 0x7e, 0xd1, 0xb4, 0x57, 0x16, 0x66, 0x8d, 0x44, + 0xbb, 0xf0, 0x52, 0x32, 0xd7, 0xc6, 0xc6, 0xeb, 0x60, 0x03, 0xd3, 0x62, 0x6c, 0xd9, 0x7a, 0xd8, + 0x4d, 0xda, 0x1b, 0x04, 0x01, 0x84, 0xe1, 0xb0, 0x4d, 0x4f, 0x44, 0xce, 0x70, 0x67, 0x9a, 0x8a, + 0x99, 0x63, 0x80, 0x04, 0x08, 0x90, 0x43, 0x0e, 0xc9, 0x35, 0x40, 0x4e, 0x8b, 0xfc, 0x8e, 0x00, + 0xc1, 0x5e, 0x12, 0xec, 0x31, 0xc8, 0x61, 0x11, 0xd8, 0xbf, 0x20, 0xff, 0x20, 0xa8, 0xea, 0xee, + 0x79, 0x90, 0x94, 0x8d, 0x4d, 0x0e, 0x7b, 0x62, 0xd7, 0xd7, 0x5f, 0x57, 0x57, 0x57, 0x57, 0x57, + 0x15, 0x07, 0x2e, 0x0d, 0x4f, 0x7a, 0xdb, 0x52, 0x0c, 0x86, 0xc1, 0xb0, 0xa3, 0x7e, 0x6b, 0xc3, + 0x30, 0x90, 0x01, 0x5b, 0xd0, 0x60, 0xf5, 0xbc, 0x0c, 0x1d, 0x57, 0x6c, 0x9f, 0xde, 0xdc, 0xa6, + 0x81, 0x9a, 0xae, 0x5e, 0x74, 0x83, 0xc1, 0x20, 0xf0, 0x11, 0x56, 0x23, 0x8d, 0x7f, 0xd4, 0xf3, + 0xe4, 0xcb, 0x51, 0xa7, 0xe6, 0x06, 0x83, 0xed, 0x5e, 0xd0, 0x0b, 0xb6, 0x09, 0xee, 0x8c, 0x5e, + 0x90, 0x44, 0x02, 0x8d, 0x14, 0xdd, 0xfe, 0x8d, 0x05, 0xe5, 0x36, 0xaa, 0x6d, 0x8c, 0xf7, 0x76, + 0xb9, 0xf8, 0x62, 0x24, 0x22, 0xc9, 0x2a, 0xb0, 0x40, 0x5b, 0xed, 0xed, 0x56, 0xac, 0x4d, 0x6b, + 0x6b, 0x99, 0x1b, 0x91, 0x6d, 0x00, 0x74, 0xfa, 0x81, 0x7b, 0xd2, 0x92, 0x4e, 0x28, 0x2b, 0xf3, + 0x9b, 0xd6, 0x56, 0x89, 0xa7, 0x10, 0x56, 0x85, 0x45, 0x92, 0x9a, 0x7e, 0xb7, 0x92, 0xa3, 0xd9, + 0x58, 0x66, 0x57, 0xa1, 0xf4, 0xc5, 0x48, 0x84, 0xe3, 0x83, 0xa0, 0x2b, 0x2a, 0x05, 0x9a, 0x4c, + 0x00, 0xdb, 0x87, 0xf5, 0x94, 0x1d, 0xd1, 0x30, 0xf0, 0x23, 0xc1, 0xae, 0x43, 0x81, 0x76, 0x26, + 0x33, 0x96, 0xea, 0xab, 0x35, 0xed, 0x93, 0x1a, 0x51, 0xb9, 0x9a, 0x64, 0x1f, 0xc3, 0xc2, 0x40, + 0xc8, 0xd0, 0x73, 0x23, 0xb2, 0x68, 0xa9, 0x7e, 0x39, 0xcb, 0x43, 0x95, 0x07, 0x8a, 0xc0, 0x0d, + 0xd3, 0x66, 0xa9, 0x73, 0xeb, 0x49, 0xfb, 0x1f, 0xf3, 0xb0, 0xd2, 0x12, 0x4e, 0xe8, 0xbe, 0x34, + 0x9e, 0xf8, 0x14, 0xf2, 0x6d, 0xa7, 0x17, 0x55, 0xac, 0xcd, 0xdc, 0xd6, 0x52, 0x7d, 0x33, 0xd6, + 0x9b, 0x61, 0xd5, 0x90, 0xd2, 0xf4, 0x65, 0x38, 0x6e, 0xe4, 0xbf, 0xfa, 0xe6, 0xda, 0x1c, 0xa7, + 0x35, 0xec, 0x3a, 0xac, 0x1c, 0x78, 0xfe, 0xee, 0x28, 0x74, 0xa4, 0x17, 0xf8, 0x07, 0xca, 0xb8, + 0x15, 0x9e, 0x05, 0x89, 0xe5, 0xbc, 0x4a, 0xb1, 0x72, 0x9a, 0x95, 0x06, 0xd9, 0x79, 0x28, 0xec, + 0x7b, 0x03, 0x4f, 0x56, 0xf2, 0x34, 0xab, 0x04, 0x44, 0x23, 0xba, 0x88, 0x82, 0x42, 0x49, 0x60, + 0x65, 0xc8, 0x09, 0xbf, 0x5b, 0x29, 0x12, 0x86, 0x43, 0xe4, 0x3d, 0x45, 0x47, 0x57, 0x16, 0xc9, + 0xeb, 0x4a, 0x60, 0x5b, 0xb0, 0xd6, 0x1a, 0x3a, 0x7e, 0xf4, 0x44, 0x84, 0xf8, 0xdb, 0x12, 0xb2, + 0x52, 0xa2, 0x35, 0x93, 0x70, 0xf5, 0x13, 0x28, 0xc5, 0x47, 0x44, 0xf5, 0x27, 0x62, 0x4c, 0x37, + 0x52, 0xe2, 0x38, 0x44, 0xf5, 0xa7, 0x4e, 0x7f, 0x24, 0x74, 0x3c, 0x28, 0xe1, 0xd3, 0xf9, 0x1f, + 0x58, 0xf6, 0xdf, 0x72, 0xc0, 0x94, 0xab, 0x1a, 0x18, 0x05, 0xc6, 0xab, 0xb7, 0xa0, 0x14, 0x19, + 0x07, 0xea, 0xab, 0xbd, 0x38, 0xdb, 0xb5, 0x3c, 0x21, 0x62, 0x54, 0x52, 0x2c, 0xed, 0xed, 0xea, + 0x8d, 0x8c, 0x88, 0x91, 0x45, 0x47, 0x7f, 0xe2, 0xf4, 0x84, 0xf6, 0x5f, 0x02, 0xa0, 0x87, 0x87, + 0x4e, 0x4f, 0x44, 0xed, 0x40, 0xa9, 0xd6, 0x3e, 0xcc, 0x82, 0x18, 0xb9, 0xc2, 0x77, 0x83, 0xae, + 0xe7, 0xf7, 0x74, 0x70, 0xc6, 0x32, 0x6a, 0xf0, 0xfc, 0xae, 0x78, 0x85, 0xea, 0x5a, 0xde, 0x2f, + 0x85, 0xf6, 0x6d, 0x16, 0x64, 0x36, 0x2c, 0xcb, 0x40, 0x3a, 0x7d, 0x2e, 0xdc, 0x20, 0xec, 0x46, + 0x95, 0x05, 0x22, 0x65, 0x30, 0xe4, 0x74, 0x1d, 0xe9, 0x34, 0xcd, 0x4e, 0xea, 0x42, 0x32, 0x18, + 0x9e, 0xf3, 0x54, 0x84, 0x91, 0x17, 0xf8, 0x74, 0x1f, 0x25, 0x6e, 0x44, 0xc6, 0x20, 0x1f, 0xe1, + 0xf6, 0xb0, 0x69, 0x6d, 0xe5, 0x39, 0x8d, 0xf1, 0x45, 0xbe, 0x08, 0x02, 0x29, 0x42, 0x32, 0x6c, + 0x89, 0xf6, 0x4c, 0x21, 0x6c, 0x17, 0xca, 0x5d, 0xd1, 0xf5, 0x5c, 0x47, 0x8a, 0xee, 0xfd, 0xa0, + 0x3f, 0x1a, 0xf8, 0x51, 0x65, 0x99, 0xa2, 0xb9, 0x12, 0xbb, 0x7c, 0x37, 0x4b, 0xe0, 0x53, 0x2b, + 0xec, 0xbf, 0x5a, 0xb0, 0x36, 0xc1, 0x62, 0xb7, 0xa0, 0x10, 0xb9, 0xc1, 0x50, 0x79, 0x7c, 0xb5, + 0xbe, 0x71, 0x96, 0xba, 0x5a, 0x0b, 0x59, 0x5c, 0x91, 0xf1, 0x0c, 0xbe, 0x33, 0x30, 0xb1, 0x42, + 0x63, 0x76, 0x13, 0xf2, 0x72, 0x3c, 0x54, 0xaf, 0x7c, 0xb5, 0xfe, 0xde, 0x99, 0x8a, 0xda, 0xe3, + 0xa1, 0xe0, 0x44, 0xb5, 0xaf, 0x41, 0x81, 0xd4, 0xb2, 0x45, 0xc8, 0xb7, 0x9e, 0xdc, 0x3b, 0x2c, + 0xcf, 0xb1, 0x65, 0x58, 0xe4, 0xcd, 0xd6, 0xd1, 0x33, 0x7e, 0xbf, 0x59, 0xb6, 0x6c, 0x06, 0x79, + 0xa4, 0x33, 0x80, 0x62, 0xab, 0xcd, 0xf7, 0x0e, 0x1f, 0x94, 0xe7, 0xec, 0x57, 0xb0, 0x6a, 0xa2, + 0x4b, 0x27, 0x98, 0x5b, 0x50, 0xa4, 0x1c, 0x62, 0x5e, 0xf8, 0xd5, 0x6c, 0xe6, 0x50, 0xec, 0x03, + 0x21, 0x1d, 0xbc, 0x21, 0xae, 0xb9, 0x6c, 0x67, 0x32, 0xe1, 0x4c, 0x46, 0xef, 0x54, 0xb6, 0xf9, + 0x72, 0x1e, 0xce, 0xcd, 0xd0, 0x38, 0x99, 0x69, 0x4b, 0x49, 0xa6, 0xdd, 0x82, 0xb5, 0x30, 0x08, + 0x64, 0x4b, 0x84, 0xa7, 0x9e, 0x2b, 0x0e, 0x13, 0x97, 0x4d, 0xc2, 0x18, 0x9d, 0x08, 0x91, 0x7a, + 0xe2, 0xa9, 0xc4, 0x9b, 0x05, 0xd9, 0x0d, 0x58, 0xa7, 0x27, 0xd1, 0xf6, 0x06, 0xe2, 0x99, 0xef, + 0xbd, 0x3a, 0x74, 0xfc, 0x80, 0x5e, 0x42, 0x9e, 0x4f, 0x4f, 0x60, 0x54, 0x75, 0x93, 0x94, 0xa4, + 0xd2, 0x4b, 0x0a, 0x61, 0x1f, 0xc2, 0x42, 0xa4, 0x73, 0x46, 0x91, 0x3c, 0x50, 0x4e, 0x3c, 0xa0, + 0x70, 0x6e, 0x08, 0xec, 0x06, 0x2c, 0xea, 0x21, 0xbe, 0x89, 0xdc, 0x4c, 0x72, 0xcc, 0xb0, 0x7f, + 0x6d, 0xc1, 0x82, 0x46, 0xd9, 0xfb, 0x50, 0x40, 0xdc, 0x5c, 0xce, 0x4a, 0x66, 0x19, 0x57, 0x73, + 0xe8, 0xc2, 0x81, 0x23, 0xdd, 0x97, 0xa2, 0xab, 0x13, 0xac, 0x11, 0xd9, 0x5d, 0x00, 0x47, 0xca, + 0xd0, 0xeb, 0x8c, 0xa4, 0xc0, 0xbc, 0x8a, 0x3a, 0xae, 0xc4, 0x3a, 0x74, 0xd5, 0x3c, 0xbd, 0x59, + 0x7b, 0x2c, 0xc6, 0xcf, 0x31, 0x65, 0xf1, 0x14, 0x1d, 0x23, 0x3e, 0x8f, 0xdb, 0xb0, 0x8b, 0x50, + 0xc4, 0x8d, 0xe2, 0x1b, 0xd2, 0xd2, 0xcc, 0x40, 0x9e, 0xe9, 0xe4, 0xdc, 0x59, 0x4e, 0xbe, 0x0e, + 0x2b, 0xc6, 0xa5, 0x28, 0x47, 0xfa, 0x3a, 0xb2, 0xe0, 0xc4, 0x29, 0x0a, 0xdf, 0xee, 0x14, 0xff, + 0xb1, 0x4c, 0x45, 0xd3, 0x21, 0x89, 0x71, 0xe5, 0xf9, 0xd1, 0x50, 0xb8, 0x52, 0x74, 0xdb, 0x26, + 0xf4, 0x29, 0xeb, 0x4f, 0xc0, 0xec, 0x7b, 0xb0, 0x1a, 0x43, 0x8d, 0x31, 0x6e, 0x3e, 0x4f, 0xf6, + 0x4d, 0xa0, 0x6c, 0x13, 0x96, 0x28, 0xc7, 0x51, 0x8a, 0x37, 0xf5, 0x2b, 0x0d, 0xe1, 0x41, 0xdd, + 0x60, 0x30, 0xec, 0x0b, 0x29, 0xba, 0x8f, 0x82, 0x4e, 0x64, 0x32, 0x70, 0x06, 0xc4, 0x2c, 0x4e, + 0x8b, 0x88, 0xa1, 0x42, 0x2e, 0x01, 0xd0, 0xee, 0x44, 0xa5, 0x32, 0xa7, 0x48, 0xe6, 0x4c, 0xc2, + 0xf6, 0x07, 0xb0, 0xae, 0x8e, 0x8c, 0x35, 0xcb, 0x94, 0x9c, 0xf3, 0x26, 0x59, 0xa9, 0x4b, 0x54, + 0x82, 0xbd, 0x63, 0xca, 0x93, 0xa2, 0xea, 0xa4, 0x50, 0x85, 0x45, 0xe9, 0xf4, 0xf0, 0xd5, 0xa8, + 0xc8, 0x2b, 0xf1, 0x58, 0xb6, 0x1f, 0xc1, 0xf9, 0x64, 0xc5, 0xf3, 0x7a, 0xbc, 0xa6, 0x0e, 0x45, + 0x52, 0x69, 0x62, 0xb5, 0x3a, 0x91, 0x11, 0x14, 0x5d, 0x65, 0x42, 0xcd, 0xb4, 0xef, 0xa6, 0x0d, + 0xd5, 0x93, 0x71, 0x58, 0x59, 0xa9, 0xb0, 0x62, 0x90, 0x97, 0xd8, 0x85, 0xcc, 0x93, 0x31, 0x34, + 0xb6, 0x1f, 0xc2, 0xc5, 0x78, 0x31, 0xdd, 0x7b, 0x94, 0xee, 0xde, 0x94, 0xb9, 0x71, 0x4e, 0x51, + 0x22, 0x3a, 0x81, 0x1a, 0x2e, 0x53, 0xa8, 0x49, 0xb0, 0x3f, 0x81, 0x4b, 0x53, 0x9a, 0xf4, 0xa9, + 0xf0, 0x4a, 0x0c, 0xa8, 0x5d, 0x91, 0x00, 0xf6, 0x2d, 0x58, 0x34, 0x4b, 0xc8, 0xc4, 0x71, 0xec, + 0x5e, 0x1a, 0xcf, 0xee, 0x0b, 0xec, 0x7d, 0xb8, 0x3c, 0xb1, 0x5d, 0xca, 0x8d, 0xdb, 0x93, 0x1b, + 0x2e, 0xd5, 0xd7, 0x93, 0x94, 0xac, 0x67, 0xd2, 0x36, 0x34, 0xa0, 0x40, 0xe1, 0xca, 0xee, 0xc0, + 0x42, 0x87, 0xde, 0xbd, 0x59, 0x77, 0x2d, 0x5e, 0xa7, 0xda, 0xe6, 0xd3, 0x9b, 0x35, 0x2e, 0xa2, + 0x60, 0x14, 0xba, 0x82, 0xfa, 0x1b, 0x6e, 0xf8, 0xf6, 0x21, 0x2c, 0x3f, 0x19, 0x45, 0x49, 0x51, + 0xf8, 0x0c, 0x56, 0x44, 0x18, 0x06, 0x61, 0xd4, 0x18, 0xb7, 0x75, 0xf7, 0x99, 0xdb, 0x5a, 0x4d, + 0xd5, 0x4b, 0x64, 0x37, 0x91, 0xc1, 0x85, 0x13, 0x05, 0x3e, 0xcf, 0xd2, 0xed, 0x3f, 0x5b, 0x50, + 0x46, 0x0a, 0x85, 0xa3, 0xb9, 0x95, 0x8f, 0xe2, 0x4a, 0x83, 0xb7, 0xb8, 0xdc, 0xb8, 0x80, 0x9d, + 0xe2, 0xbf, 0xbe, 0xb9, 0xb6, 0xf2, 0x24, 0x14, 0x4e, 0xbf, 0x1f, 0xb8, 0x8a, 0x6d, 0x4a, 0xcc, + 0xf7, 0x21, 0xe7, 0x75, 0x55, 0xd2, 0x3a, 0x93, 0x8b, 0x0c, 0x76, 0x1b, 0x40, 0xb5, 0x48, 0xbb, + 0x8e, 0x74, 0x2a, 0xf9, 0xb7, 0xf1, 0x53, 0x44, 0xfb, 0x40, 0x99, 0xa8, 0x3c, 0xa1, 0x4d, 0xfc, + 0x3f, 0x5c, 0x78, 0x1d, 0x40, 0x77, 0xd3, 0x98, 0x11, 0x2e, 0x66, 0xaa, 0xea, 0xb2, 0x39, 0x94, + 0xfd, 0x19, 0x94, 0xf6, 0x3d, 0xff, 0xa4, 0xd5, 0xf7, 0x5c, 0x2c, 0xfa, 0x85, 0xbe, 0xe7, 0x9f, + 0x98, 0xbd, 0xae, 0x4c, 0xef, 0x85, 0x7b, 0xd4, 0x70, 0x01, 0x57, 0x4c, 0xfb, 0x57, 0x16, 0x30, + 0x04, 0x4d, 0x79, 0x4d, 0xde, 0xb6, 0x0a, 0x6b, 0x2b, 0x15, 0xd6, 0xf8, 0x0c, 0x7a, 0x61, 0x30, + 0x1a, 0x36, 0x4c, 0xb8, 0x1b, 0x11, 0xf9, 0x7d, 0x6a, 0xa6, 0x55, 0x66, 0x56, 0x42, 0xd2, 0x4c, + 0xe7, 0x67, 0x34, 0xd3, 0x85, 0xb8, 0x99, 0xb6, 0x7f, 0x6b, 0xc1, 0xe5, 0x94, 0x11, 0xad, 0xd1, + 0x60, 0xe0, 0x84, 0xe3, 0xef, 0xc6, 0x96, 0xbf, 0x58, 0x70, 0x2e, 0xe3, 0x90, 0xe4, 0xdd, 0x8a, + 0x48, 0x7a, 0x03, 0xec, 0x9d, 0xc8, 0x92, 0x45, 0x9e, 0x00, 0xd4, 0x2e, 0x0f, 0x1d, 0xff, 0x7e, + 0x30, 0xf2, 0xa5, 0xce, 0xe9, 0x09, 0x80, 0x69, 0x9f, 0xc2, 0xb9, 0x15, 0x53, 0x94, 0x69, 0x13, + 0x28, 0xab, 0x25, 0x4d, 0x50, 0x9e, 0x6e, 0xf0, 0x7c, 0xa6, 0x3c, 0x4f, 0xb5, 0x40, 0x3f, 0x84, + 0x65, 0xee, 0xfc, 0xe2, 0xa1, 0x17, 0xc9, 0xa0, 0x17, 0x3a, 0x03, 0x0c, 0x92, 0xce, 0xc8, 0x3d, + 0x11, 0x92, 0x0c, 0xcc, 0x73, 0x2d, 0xe1, 0xd9, 0xdd, 0x94, 0x65, 0x4a, 0xb0, 0x1f, 0xc1, 0xa2, + 0x29, 0x70, 0x33, 0xfe, 0x81, 0xdc, 0x48, 0x67, 0x9a, 0x74, 0x3b, 0x46, 0x41, 0xf9, 0x74, 0xbf, + 0x25, 0x1d, 0xe9, 0xb9, 0x26, 0x03, 0xfd, 0xc1, 0x82, 0xa5, 0x94, 0x89, 0xac, 0x01, 0xeb, 0x7d, + 0x47, 0x0a, 0xdf, 0x1d, 0x1f, 0xbf, 0x34, 0xe6, 0xe9, 0xa8, 0xbc, 0x10, 0x6b, 0x4a, 0xdb, 0xce, + 0xcb, 0x9a, 0x9f, 0x9c, 0xe6, 0x03, 0x28, 0x46, 0x22, 0xf4, 0xf4, 0xf3, 0x4e, 0x67, 0xad, 0xb8, + 0x2e, 0x6b, 0x02, 0x1e, 0x5c, 0xe5, 0x0b, 0xed, 0x58, 0x2d, 0xd9, 0x7f, 0xcf, 0x46, 0xb7, 0x0e, + 0xac, 0xec, 0x6d, 0x59, 0xef, 0xbe, 0xad, 0xf9, 0x99, 0xb7, 0x95, 0xd8, 0x97, 0x7b, 0x97, 0x7d, + 0x65, 0xc8, 0x0d, 0xef, 0xdc, 0xd1, 0xcd, 0x08, 0x0e, 0x15, 0x72, 0x9b, 0x02, 0x8f, 0x90, 0xdb, + 0x0a, 0xd9, 0xd1, 0x15, 0x18, 0x87, 0x84, 0xdc, 0xde, 0xa1, 0x3f, 0x3d, 0x88, 0xdc, 0xde, 0xb1, + 0x3f, 0x87, 0xea, 0xac, 0x77, 0xa2, 0x43, 0xf4, 0x0e, 0x94, 0x22, 0x82, 0x3c, 0x31, 0x9d, 0x02, + 0x66, 0xac, 0x4b, 0xd8, 0xf6, 0x1f, 0x2d, 0x58, 0xc9, 0x5c, 0x6c, 0xa6, 0xfa, 0x14, 0x74, 0xf5, + 0x59, 0x06, 0xcb, 0x27, 0x67, 0xe4, 0xb8, 0xe5, 0xa3, 0xf4, 0x82, 0xfc, 0x6d, 0x71, 0xeb, 0x05, + 0x4a, 0xaa, 0x09, 0x29, 0x71, 0x2b, 0x42, 0xa9, 0x43, 0x87, 0x5b, 0xe4, 0x56, 0x07, 0xa5, 0xae, + 0x3e, 0x98, 0xd5, 0xa5, 0xee, 0x4f, 0x3a, 0x72, 0xa4, 0xfe, 0xce, 0x15, 0xb8, 0x96, 0x70, 0xc7, + 0x13, 0xcf, 0xef, 0xd2, 0x1f, 0xb8, 0x02, 0xa7, 0xf1, 0x87, 0x3f, 0x83, 0xb5, 0x89, 0xca, 0x80, + 0xff, 0x49, 0x0e, 0x8f, 0x8e, 0x9b, 0x9c, 0x1f, 0xf1, 0xf2, 0x1c, 0x3b, 0x07, 0x6b, 0x07, 0xf7, + 0x7e, 0x72, 0xbc, 0xbf, 0xf7, 0xbc, 0x79, 0xdc, 0xe6, 0xf7, 0xee, 0x37, 0x5b, 0x65, 0x0b, 0x41, + 0x1a, 0x1f, 0xb7, 0x8f, 0x8e, 0x8e, 0xf7, 0xef, 0xf1, 0x07, 0xcd, 0xf2, 0x3c, 0x5b, 0x87, 0x95, + 0x67, 0x87, 0x8f, 0x0f, 0x8f, 0x3e, 0x3f, 0xd4, 0x8b, 0x73, 0xf5, 0xdf, 0x59, 0x50, 0x44, 0xf5, + 0x22, 0x64, 0x3f, 0x82, 0x52, 0x5c, 0x5f, 0xd8, 0xe5, 0x4c, 0x59, 0x4a, 0xd7, 0x9c, 0xea, 0x85, + 0xcc, 0x94, 0x71, 0xbd, 0x3d, 0xc7, 0xee, 0xc1, 0x52, 0x4c, 0x7e, 0x5e, 0xff, 0x5f, 0x54, 0xd4, + 0xff, 0x64, 0x41, 0x59, 0x5f, 0xd1, 0x03, 0xe1, 0x8b, 0xd0, 0x91, 0x41, 0x6c, 0x18, 0x15, 0x87, + 0x09, 0xad, 0xe9, 0x4a, 0x73, 0xb6, 0x61, 0x7b, 0x00, 0x0f, 0x84, 0x34, 0x0f, 0x73, 0x66, 0x40, + 0x18, 0x1d, 0x57, 0x67, 0x4f, 0xc6, 0x06, 0x7e, 0x99, 0x87, 0x85, 0xa7, 0x23, 0x0c, 0xe9, 0x90, + 0x3d, 0x84, 0x95, 0x1f, 0x7b, 0x7e, 0x37, 0xfe, 0xe0, 0xc3, 0x66, 0x7c, 0x21, 0x32, 0x7a, 0xab, + 0xb3, 0xa6, 0x52, 0x9e, 0x5b, 0x36, 0x7f, 0x21, 0x5d, 0xe1, 0x4b, 0x76, 0xc6, 0x77, 0x8b, 0xea, + 0xa5, 0x29, 0x3c, 0x56, 0xd1, 0x84, 0xa5, 0xd4, 0x37, 0x91, 0xf4, 0x21, 0xa7, 0xbe, 0x94, 0xbc, + 0x4d, 0xcd, 0x03, 0x80, 0xa4, 0x7b, 0x64, 0xb3, 0xfa, 0x4d, 0xa3, 0xe4, 0xca, 0xcc, 0xb9, 0x58, + 0xd1, 0x63, 0x73, 0x24, 0xd5, 0x86, 0xbe, 0x55, 0xd5, 0x7b, 0x33, 0xdb, 0xda, 0x94, 0xb2, 0xe7, + 0xb0, 0x36, 0xd1, 0xdd, 0xb1, 0x6b, 0xd3, 0x6b, 0x32, 0x0d, 0x6b, 0x75, 0xf3, 0x6c, 0x42, 0xac, + 0xf7, 0xa7, 0xa9, 0x5e, 0xd9, 0x74, 0x8d, 0xef, 0xd6, 0x6c, 0x9f, 0x45, 0x48, 0xdb, 0x5c, 0x3f, + 0x82, 0x72, 0x4b, 0x86, 0xc2, 0x19, 0x78, 0x7e, 0xcf, 0x44, 0xcc, 0x5d, 0x28, 0xea, 0x0f, 0x43, + 0xdf, 0xf6, 0x86, 0x77, 0xac, 0xfa, 0xcf, 0x61, 0xc1, 0x84, 0xf0, 0xf1, 0xcc, 0x9c, 0x6e, 0xbf, + 0x2d, 0xd3, 0x69, 0xfd, 0xef, 0xbf, 0x95, 0x63, 0x8c, 0x6f, 0x54, 0xbe, 0x7a, 0xbd, 0x61, 0x7d, + 0xfd, 0x7a, 0xc3, 0xfa, 0xf7, 0xeb, 0x0d, 0xeb, 0xf7, 0x6f, 0x36, 0xe6, 0xbe, 0x7e, 0xb3, 0x31, + 0xf7, 0xcf, 0x37, 0x1b, 0x73, 0x9d, 0x22, 0x7d, 0xe1, 0xfd, 0xf8, 0xbf, 0x01, 0x00, 0x00, 0xff, + 0xff, 0x17, 0x51, 0x57, 0x33, 0x62, 0x16, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -3994,6 +4040,24 @@ func (m *PushResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.ErrorsByTrace) > 0 { + dAtA7 := make([]byte, len(m.ErrorsByTrace)*10) + var j6 int + for _, num := range m.ErrorsByTrace { + for num >= 1<<7 { + dAtA7[j6] = uint8(uint64(num)&0x7f | 0x80) + num >>= 7 + j6++ + } + dAtA7[j6] = uint8(num) + j6++ + } + i -= j6 + copy(dAtA[i:], dAtA7[:j6]) + i = encodeVarintTempo(dAtA, i, uint64(j6)) + i-- + dAtA[i] = 0xa + } return len(dAtA) - i, nil } @@ -5091,6 +5155,13 @@ func (m *PushResponse) Size() (n int) { } var l int _ = l + if len(m.ErrorsByTrace) > 0 { + l = 0 + for _, e := range m.ErrorsByTrace { + l += sovTempo(uint64(e)) + } + n += 1 + sovTempo(uint64(l)) + l + } return n } @@ -8273,6 +8344,75 @@ func (m *PushResponse) Unmarshal(dAtA []byte) error { return fmt.Errorf("proto: PushResponse: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { + case 1: + if wireType == 0 { + var v PushErrorReason + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTempo + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= PushErrorReason(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.ErrorsByTrace = append(m.ErrorsByTrace, v) + } else if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTempo + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + packedLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return ErrInvalidLengthTempo + } + postIndex := iNdEx + packedLen + if postIndex < 0 { + return ErrInvalidLengthTempo + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + var elementCount int + if elementCount != 0 && len(m.ErrorsByTrace) == 0 { + m.ErrorsByTrace = make([]PushErrorReason, 0, elementCount) + } + for iNdEx < postIndex { + var v PushErrorReason + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTempo + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= PushErrorReason(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.ErrorsByTrace = append(m.ErrorsByTrace, v) + } + } else { + return fmt.Errorf("proto: wrong wireType = %d for field ErrorsByTrace", wireType) + } default: iNdEx = preIndex skippy, err := skipTempo(dAtA[iNdEx:]) diff --git a/pkg/tempopb/tempo.proto b/pkg/tempopb/tempo.proto index ebe7dad66c0..c20e6695d7b 100644 --- a/pkg/tempopb/tempo.proto +++ b/pkg/tempopb/tempo.proto @@ -176,6 +176,14 @@ message Trace { // Write message PushResponse { + repeated PushErrorReason errorsByTrace = 1; +} + +enum PushErrorReason { + NO_ERROR = 0; + MAX_LIVE_TRACES = 1; + TRACE_TOO_LARGE = 2; + UNKNOWN_ERROR = 3; } // PushBytesRequest pushes slices of traces, ids and searchdata. Traces are encoded using the diff --git a/pkg/util/test/req.go b/pkg/util/test/req.go index 24b3f0253b5..acbcf64d4e9 100644 --- a/pkg/util/test/req.go +++ b/pkg/util/test/req.go @@ -228,6 +228,22 @@ func AddDedicatedAttributes(trace *tempopb.Trace) *tempopb.Trace { return trace } +func MakeReqWithMultipleTraceWithSpanCount(spanCounts []int, traceIDs [][]byte) *tempopb.Trace { + if len(spanCounts) != len(traceIDs) { + panic("spanCounts and traceIDs lengths do not match") + } + trace := &tempopb.Trace{ + Batches: make([]*v1_trace.ResourceSpans, 0), + } + + for index, traceID := range traceIDs { + traceID = ValidTraceID(traceID) + trace.Batches = append(trace.Batches, MakeBatch(spanCounts[index], traceID)) + } + + return trace +} + // MakeDedicatedColumns creates a dedicated column assignment that matches the attributes // generated by AddDedicatedAttributes. func MakeDedicatedColumns() backend.DedicatedColumns {