Skip to content

Commit

Permalink
Handle Partial Success in ConsumeTraces (#2571)
Browse files Browse the repository at this point in the history
* testing

* undo logs

* add target_info_excluded_dimensions to userconfig

* allow multi error for pushbytes

* i love rebasing so much

* no longer returning error instance.pushbytes

* lint

* test stable

* squash

* remove error in return

* lint

* fix tests

* comments

* rebase

* fix

* test log commit

* refactor

* no list

* lint

* changed proto

* handle old proto

* rebase

* using two lists instead of nested list

* clean up tests

* make ingester more efficent

* lint

* lint

* lint

* remove pkg logger

* moar lint

* refactor response handling code

* refactor

* refactored instance

* add unknown error as reason

* lint

* more lint
  • Loading branch information
ie-pham authored Jan 4, 2024
1 parent 387479e commit 306fdd7
Show file tree
Hide file tree
Showing 14 changed files with 1,090 additions and 260 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
* [FEATURE] Introduce list_blocks_concurrency on GCS and S3 backends to control backend load and performance. [#2652](https://github.com/grafana/tempo/pull/2652) (@zalegrala)
* [FEATURE] Add per-tenant compaction window [#3129](https://github.com/grafana/tempo/pull/3129) (@zalegrala)
* [ENHANCEMENT] Make the trace ID label name configurable for remote written exemplars [#3074](https://github.com/grafana/tempo/pull/3074)
* [CHANGE] **Breaking Change** Fix issue where tempo drops the entire batch if one trace triggers an error [#2571](https://github.com/grafana/tempo/pull/2571) (@ie-pham)
Distributor now returns 200 for any batch containing only trace_too_large and max_live_traces errors
The number of discarded spans are still reflected in the tempo_discarded_spans_total metrics
* [ENHANCEMENT] Update poller to make use of previous results and reduce backend load. [#2652](https://github.com/grafana/tempo/pull/2652) (@zalegrala)
* [ENHANCEMENT] Improve TraceQL regex performance in certain queries. [#3139](https://github.com/grafana/tempo/pull/3139) (@joe-elliott)
* [ENHANCEMENT] Improve TraceQL performance in complex queries. [#3113](https://github.com/grafana/tempo/pull/3113) (@joe-elliott)
Expand Down Expand Up @@ -153,6 +156,7 @@ defaults:

## v2.2.0 / 2023-07-31


* [CHANGE] Make vParquet2 the default block format [#2526](https://github.com/grafana/tempo/pull/2526) (@stoewer)
* [CHANGE] Disable tempo-query by default in Jsonnet libs. [#2462](https://github.com/grafana/tempo/pull/2462) (@electron0zero)
* [CHANGE] Integrate `gofumpt` into CI for formatting requirements [2584](https://github.com/grafana/tempo/pull/2584) (@zalegrala)
Expand Down
35 changes: 35 additions & 0 deletions integration/e2e/config-limits-partial-success.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
target: all

server:
http_listen_port: 3200

distributor:
receivers:
otlp:
protocols:
grpc:

overrides:
max_bytes_per_trace: 600
max_traces_per_user: 6
ingestion_rate_limit_bytes: 6000
ingestion_burst_size_bytes: 6000

ingester:
lifecycler:
address: 127.0.0.1
ring:
kvstore:
store: inmemory
replication_factor: 1
final_sleep: 0s
trace_idle_period: 3600s

storage:
trace:
backend: local
local:
path: /var/tempo
pool:
max_workers: 10
queue_depth: 100
90 changes: 86 additions & 4 deletions integration/e2e/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package e2e

import (
"context"
crand "crypto/rand"
"encoding/binary"
"testing"
"time"

"github.com/grafana/dskit/user"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/status"
Expand All @@ -15,11 +18,16 @@ import (
util "github.com/grafana/tempo/integration"
"github.com/grafana/tempo/pkg/httpclient"
tempoUtil "github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/test"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/ptrace"
)

const (
configLimits = "config-limits.yaml"
configLimitsQuery = "config-limits-query.yaml"
configLimits = "config-limits.yaml"
configLimitsQuery = "config-limits-query.yaml"
configLimitsPartialError = "config-limits-partial-success.yaml"
)

func TestLimits(t *testing.T) {
Expand All @@ -38,14 +46,14 @@ func TestLimits(t *testing.T) {

// should fail b/c the trace is too large. each batch should be ~70 bytes
batch := makeThriftBatchWithSpanCount(2)
require.Error(t, c.EmitBatch(context.Background(), batch), "max trace size")
require.NoError(t, c.EmitBatch(context.Background(), batch), "max trace size")

// push a trace
require.NoError(t, c.EmitBatch(context.Background(), makeThriftBatchWithSpanCount(1)))

// should fail b/c this will be too many traces
batch = makeThriftBatch()
require.Error(t, c.EmitBatch(context.Background(), batch), "too many traces")
require.NoError(t, c.EmitBatch(context.Background(), batch), "too many traces")

// should fail b/c due to ingestion rate limit
batch = makeThriftBatchWithSpanCount(10)
Expand Down Expand Up @@ -133,3 +141,77 @@ func TestQueryLimits(t *testing.T) {
require.ErrorContains(t, err, "trace exceeds max size")
require.ErrorContains(t, err, "failed with response: 400") // confirm querier returns 400
}

func TestLimitsPartialSuccess(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
defer s.Close()
require.NoError(t, util.CopyFileToSharedDir(s, configLimitsPartialError, "config.yaml"))
tempo := util.NewTempoAllInOne()
require.NoError(t, s.StartAndWaitReady(tempo))

// otel grpc exporter
exporter, err := util.NewOtelGRPCExporter(tempo.Endpoint(4317))
require.NoError(t, err)

err = exporter.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)

// make request
traceIDs := make([][]byte, 6)
for index := range traceIDs {
traceID := make([]byte, 16)
_, err = crand.Read(traceID)
require.NoError(t, err)
traceIDs[index] = traceID
}

// 3 traces with trace_too_large and 3 with no error
spanCountsByTrace := []int{1, 4, 1, 5, 6, 1}
req := test.MakeReqWithMultipleTraceWithSpanCount(spanCountsByTrace, traceIDs)

b, err := req.Marshal()
require.NoError(t, err)

// unmarshal into otlp proto
traces, err := (&ptrace.ProtoUnmarshaler{}).UnmarshalTraces(b)
require.NoError(t, err)
require.NotNil(t, traces)

ctx := user.InjectOrgID(context.Background(), tempoUtil.FakeTenantID)
ctx, err = user.InjectIntoGRPCRequest(ctx)
require.NoError(t, err)

// send traces to tempo
// partial success = no error
err = exporter.ConsumeTraces(ctx, traces)
require.NoError(t, err)

// shutdown to ensure traces are flushed
require.NoError(t, exporter.Shutdown(context.Background()))

// query for the one trace that didn't trigger an error
client := httpclient.New("http://"+tempo.Endpoint(3200), tempoUtil.FakeTenantID)
for i, count := range spanCountsByTrace {
if count == 1 {
result, err := client.QueryTrace(tempoUtil.TraceIDToHexString(traceIDs[i]))
require.NoError(t, err)
assert.Equal(t, 1, len(result.Batches))
}
}

// test metrics
// 3 traces with trace_too_large each with 4+5+6 spans
err = tempo.WaitSumMetricsWithOptions(e2e.Equals(15),
[]string{"tempo_discarded_spans_total"},
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "reason", "trace_too_large")),
)
require.NoError(t, err)

// this metric should never exist
err = tempo.WaitSumMetricsWithOptions(e2e.Equals(0),
[]string{"tempo_discarded_spans_total"},
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "reason", "unknown_error")),
)
require.NoError(t, err)
}
32 changes: 32 additions & 0 deletions integration/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ import (
"github.com/grafana/e2e"
jaeger_grpc "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/otlpexporter"
mnoop "go.opentelemetry.io/otel/metric/noop"
tnoop "go.opentelemetry.io/otel/trace/noop"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -267,6 +274,31 @@ func TempoBackoff() backoff.Config {
}
}

func NewOtelGRPCExporter(endpoint string) (exporter.Traces, error) {
factory := otlpexporter.NewFactory()
exporterCfg := factory.CreateDefaultConfig()
otlpCfg := exporterCfg.(*otlpexporter.Config)
otlpCfg.GRPCClientSettings = configgrpc.GRPCClientSettings{
Endpoint: endpoint,
TLSSetting: configtls.TLSClientSetting{
Insecure: true,
},
}
logger, _ := zap.NewDevelopment()
return factory.CreateTracesExporter(
context.Background(),
exporter.CreateSettings{
TelemetrySettings: component.TelemetrySettings{
Logger: logger,
TracerProvider: tnoop.NewTracerProvider(),
MeterProvider: mnoop.NewMeterProvider(),
},
BuildInfo: component.NewDefaultBuildInfo(),
},
otlpCfg,
)
}

func NewJaegerGRPCClient(endpoint string) (*jaeger_grpc.Reporter, error) {
// new jaeger grpc exporter
conn, err := grpc.Dial(endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand Down
Loading

0 comments on commit 306fdd7

Please sign in to comment.