diff --git a/CHANGELOG.md b/CHANGELOG.md
index 80e3cbee22b..5a8fdb38e24 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,15 @@
## main / unreleased
* [BUGFIX] Fix some instances where spanmetrics histograms could be inconsistent [#3412](https://github.com/grafana/tempo/pull/3412) (@mdisibio)
+* [ENHANCEMENT] Add string interning to TraceQL queries [#3411](https://github.com/grafana/tempo/pull/3411) (@mapno)
+* [ENHANCEMENT] Add new (unsafe) query hints for metrics queries [#3396](https://github.com/grafana/tempo/pull/3396) (@mdisibio)
+* [BUGFIX] Fix metrics query results when filtering and rating on the same attribute [#3428](https://github.com/grafana/tempo/issues/3428) (@mdisibio)
+* [BUGFIX] Fix metrics query results when series contain empty strings or nil values [#3429](https://github.com/grafana/tempo/issues/3429) (@mdisibio)
+* [BUGFIX] Fix metrics query duration check, add per-tenant override for max metrics query duration [#3479](https://github.com/grafana/tempo/issues/3479) (@mdisibio)
+* [BUGFIX] Return unfiltered results when a bad TraceQL query is provided in autocomplete. [#3426](https://github.com/grafana/tempo/pull/3426) (@mapno)
+* [BUGFIX] Correctly handle 429s in GRPC search streaming. [#3469](https://github.com/grafana/tempo/pull/3469) (@joe-elliott)
+* [BUGFIX] Correctly cancel GRPC and HTTP contexts in the frontend to prevent having to rely on http write timeout. [#3443](https://github.com/grafana/tempo/pull/3443) (@joe-elliott)
+* [BUGFIX] Fix compaction/retention in AWS S3 and GCS when a prefix is configured. [#3465](https://github.com/grafana/tempo/issues/3465) (@bpfoster)
## v2.4.0-rc.0
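
For orientation: every change below revolves around the storage `prefix` option, which nests all of Tempo's objects under a sub-path of the bucket. A minimal YAML sketch of a prefixed S3 storage block, with illustrative bucket/endpoint values (the `prefix` key itself appears verbatim in the e2e configs below):

```yaml
storage:
  trace:
    backend: s3
    s3:
      bucket: tempo            # objects land under s3://tempo/a/b/c/...
      endpoint: s3.example.com # illustrative
      prefix: a/b/c/           # every <tenantID>/<blockID>/... path is nested below this
```
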
diff --git a/integration/e2e/config-all-in-one-azurite.yaml b/integration/e2e/config-all-in-one-azurite.yaml
index 0eadfcd8866..3def0ff6f1b 100644
--- a/integration/e2e/config-all-in-one-azurite.yaml
+++ b/integration/e2e/config-all-in-one-azurite.yaml
@@ -37,6 +37,7 @@ storage:
endpoint_suffix: tempo_e2e-azurite:10000
storage_account_name: "devstoreaccount1"
storage_account_key: "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
+ prefix: {{ .Prefix }}
pool:
max_workers: 10
queue_depth: 100
diff --git a/integration/e2e/config-all-in-one-gcs.yaml b/integration/e2e/config-all-in-one-gcs.yaml
index 06970879de3..efffae085c9 100644
--- a/integration/e2e/config-all-in-one-gcs.yaml
+++ b/integration/e2e/config-all-in-one-gcs.yaml
@@ -36,6 +36,7 @@ storage:
bucket_name: tempo
endpoint: https://tempo_e2e-gcs:4443/storage/v1/
insecure: true
+ prefix: {{ .Prefix }}
pool:
max_workers: 10
queue_depth: 1000
diff --git a/integration/e2e/config-all-in-one-s3.yaml b/integration/e2e/config-all-in-one-s3.yaml
index ab54c060987..a429b183628 100644
--- a/integration/e2e/config-all-in-one-s3.yaml
+++ b/integration/e2e/config-all-in-one-s3.yaml
@@ -38,6 +38,7 @@ storage:
access_key: Cheescake # TODO: use cortex_e2e.MinioAccessKey
secret_key: supersecret # TODO: use cortex_e2e.MinioSecretKey
insecure: true
+ prefix: {{ .Prefix }}
pool:
max_workers: 10
queue_depth: 100
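
The `{{ .Prefix }}` placeholders above are Go text/template actions; the new `util.CopyTemplateToSharedDir` helper used in the tests below presumably renders them before writing `config.yaml`. A minimal standalone sketch of that expansion step (the helper's exact behavior in Tempo's integration util package is an assumption here):

```go
package main

import (
	"os"
	"text/template"
)

// expandConfigTemplate renders a config template such as
// config-all-in-one-s3.yaml, substituting {{ .Prefix }} with the given
// value. A hypothetical stand-in for util.CopyTemplateToSharedDir.
func expandConfigTemplate(src, dst string, data map[string]any) error {
	tmpl, err := template.ParseFiles(src)
	if err != nil {
		return err
	}
	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer out.Close()
	return tmpl.Execute(out, data)
}

func main() {
	// An empty Prefix renders `prefix: `, which YAML reads as an empty
	// string, matching the "no-prefix" permutation in the tests below.
	_ = expandConfigTemplate("config-all-in-one-s3.yaml", "config.yaml",
		map[string]any{"Prefix": "a/b/c/"})
}
```
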
diff --git a/integration/e2e/e2e_test.go b/integration/e2e/e2e_test.go
index 5fe1bff8349..81ecb2832f9 100644
--- a/integration/e2e/e2e_test.go
+++ b/integration/e2e/e2e_test.go
@@ -58,83 +58,103 @@ func TestAllInOne(t *testing.T) {
},
}
+ storageBackendTestPermutations := []struct {
+ name string
+ prefix string
+ }{
+ {
+ name: "no-prefix",
+ },
+ {
+ name: "prefix",
+ prefix: "a/b/c/",
+ },
+ }
+
for _, tc := range testBackends {
- t.Run(tc.name, func(t *testing.T) {
- s, err := e2e.NewScenario("tempo_e2e")
- require.NoError(t, err)
- defer s.Close()
+ for _, pc := range storageBackendTestPermutations {
+ t.Run(tc.name+"-"+pc.name, func(t *testing.T) {
+ s, err := e2e.NewScenario("tempo_e2e")
+ require.NoError(t, err)
+ defer s.Close()
- // set up the backend
- cfg := app.Config{}
- buff, err := os.ReadFile(tc.configFile)
- require.NoError(t, err)
- err = yaml.UnmarshalStrict(buff, &cfg)
- require.NoError(t, err)
- _, err = backend.New(s, cfg)
- require.NoError(t, err)
+ // copy config template to shared directory and expand template variables
+ tmplConfig := map[string]any{"Prefix": pc.prefix}
+ configFile, err := util.CopyTemplateToSharedDir(s, tc.configFile, "config.yaml", tmplConfig)
+ require.NoError(t, err)
- require.NoError(t, util.CopyFileToSharedDir(s, tc.configFile, "config.yaml"))
- tempo := util.NewTempoAllInOne()
- require.NoError(t, s.StartAndWaitReady(tempo))
+ // set up the backend
+ cfg := app.Config{}
+ buff, err := os.ReadFile(configFile)
+ require.NoError(t, err)
+ err = yaml.UnmarshalStrict(buff, &cfg)
+ require.NoError(t, err)
+ _, err = backend.New(s, cfg)
+ require.NoError(t, err)
- // Get port for the Jaeger gRPC receiver endpoint
- c, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250))
- require.NoError(t, err)
- require.NotNil(t, c)
+ tempo := util.NewTempoAllInOne()
+ require.NoError(t, s.StartAndWaitReady(tempo))
- info := tempoUtil.NewTraceInfo(time.Now(), "")
- require.NoError(t, info.EmitAllBatches(c))
+ // Get port for the Jaeger gRPC receiver endpoint
+ c, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250))
+ require.NoError(t, err)
+ require.NotNil(t, c)
- expected, err := info.ConstructTraceFromEpoch()
- require.NoError(t, err)
+ info := tempoUtil.NewTraceInfo(time.Now(), "")
+ require.NoError(t, info.EmitAllBatches(c))
- // test metrics
- require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(spanCount(expected)), "tempo_distributor_spans_received_total"))
+ expected, err := info.ConstructTraceFromEpoch()
+ require.NoError(t, err)
- // test echo
- assertEcho(t, "http://"+tempo.Endpoint(3200)+"/api/echo")
+ // test metrics
+ require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(spanCount(expected)), "tempo_distributor_spans_received_total"))
- apiClient := httpclient.New("http://"+tempo.Endpoint(3200), "")
+ // test echo
+ // nolint:goconst
+ assertEcho(t, "http://"+tempo.Endpoint(3200)+"/api/echo")
- // query an in-memory trace
- queryAndAssertTrace(t, apiClient, info)
+ apiClient := httpclient.New("http://"+tempo.Endpoint(3200), "")
- // wait trace_idle_time and ensure trace is created in ingester
- require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics))
+ // query an in-memory trace
+ queryAndAssertTrace(t, apiClient, info)
- // flush trace to backend
- callFlush(t, tempo)
+ // wait trace_idle_time and ensure trace is created in ingester
+ require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics))
- // search for trace in backend
- util.SearchAndAssertTrace(t, apiClient, info)
- util.SearchTraceQLAndAssertTrace(t, apiClient, info)
+ // flush trace to backend
+ callFlush(t, tempo)
- // sleep
- time.Sleep(10 * time.Second)
+ // search for trace in backend
+ util.SearchAndAssertTrace(t, apiClient, info)
+ util.SearchTraceQLAndAssertTrace(t, apiClient, info)
- // force clear completed block
- callFlush(t, tempo)
+ // sleep
+ time.Sleep(10 * time.Second)
- // test metrics
- require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total"))
- require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempodb_blocklist_length"}, e2e.WaitMissingMetrics))
- require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(3), "tempo_query_frontend_queries_total"))
+ // force clear completed block
+ callFlush(t, tempo)
- // query trace - should fetch from backend
- queryAndAssertTrace(t, apiClient, info)
+ // test metrics
+ require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total"))
+ require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempodb_blocklist_length"}, e2e.WaitMissingMetrics))
+ require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(3), "tempo_query_frontend_queries_total"))
- // search the backend. this works b/c we're passing a start/end AND setting query ingesters within min/max to 0
- now := time.Now()
- util.SearchAndAssertTraceBackend(t, apiClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())
+ // query trace - should fetch from backend
+ queryAndAssertTrace(t, apiClient, info)
- util.SearchAndAsserTagsBackend(t, apiClient, now.Add(-20*time.Minute).Unix(), now.Unix())
+ // search the backend. this works b/c we're passing a start/end AND setting query ingesters within min/max to 0
+ now := time.Now()
+ util.SearchAndAssertTraceBackend(t, apiClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())
- // find the trace with streaming. using the http server b/c that's what Grafana will do
- grpcClient, err := util.NewSearchGRPCClient(context.Background(), tempo.Endpoint(3200))
- require.NoError(t, err)
+ util.SearchAndAsserTagsBackend(t, apiClient, now.Add(-20*time.Minute).Unix(), now.Unix())
- util.SearchStreamAndAssertTrace(t, context.Background(), grpcClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())
- })
+ // find the trace with streaming. using the http server b/c that's what Grafana will do
+ grpcClient, err := util.NewSearchGRPCClient(context.Background(), tempo.Endpoint(3200))
+ require.NoError(t, err)
+
+ util.SearchStreamAndAssertTrace(t, context.Background(), grpcClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())
+ })
+ }
}
}
@@ -317,16 +337,20 @@ func TestShutdownDelay(t *testing.T) {
require.NoError(t, err)
defer s.Close()
+ // copy config template to shared directory and expand template variables
+ tmplConfig := map[string]any{"Prefix": ""}
+ configFile, err := util.CopyTemplateToSharedDir(s, configAllInOneS3, "config.yaml", tmplConfig)
+ require.NoError(t, err)
+
// set up the backend
cfg := app.Config{}
- buff, err := os.ReadFile(configAllInOneS3)
+ buff, err := os.ReadFile(configFile)
require.NoError(t, err)
err = yaml.UnmarshalStrict(buff, &cfg)
require.NoError(t, err)
_, err = backend.New(s, cfg)
require.NoError(t, err)
- require.NoError(t, util.CopyFileToSharedDir(s, configAllInOneS3, "config.yaml"))
tempo := util.NewTempoAllInOne("-shutdown-delay=5s")
 	// this line confirms that the readiness flag is up
diff --git a/integration/e2e/overrides_test.go b/integration/e2e/overrides_test.go
index 98cf0b7fbfa..2e49dbbf476 100644
--- a/integration/e2e/overrides_test.go
+++ b/integration/e2e/overrides_test.go
@@ -49,16 +49,20 @@ func TestOverrides(t *testing.T) {
require.NoError(t, err)
defer s.Close()
+ // copy config template to shared directory and expand template variables
+ tmplConfig := map[string]any{"Prefix": ""}
+ configFile, err := util.CopyTemplateToSharedDir(s, tc.configFile, "config.yaml", tmplConfig)
+ require.NoError(t, err)
+
// set up the backend
cfg := app.Config{}
- buff, err := os.ReadFile(tc.configFile)
+ buff, err := os.ReadFile(configFile)
require.NoError(t, err)
err = yaml.UnmarshalStrict(buff, &cfg)
require.NoError(t, err)
_, err = backend.New(s, cfg)
require.NoError(t, err)
- require.NoError(t, util.CopyFileToSharedDir(s, tc.configFile, "config.yaml"))
tempo := util.NewTempoAllInOne()
require.NoError(t, s.StartAndWaitReady(tempo))
diff --git a/integration/poller/poller_test.go b/integration/poller/poller_test.go
index 65af8b2541e..5d013455dae 100644
--- a/integration/poller/poller_test.go
+++ b/integration/poller/poller_test.go
@@ -61,132 +61,158 @@ func TestPollerOwnership(t *testing.T) {
},
}
+ storageBackendTestPermutations := []struct {
+ name string
+ prefix string
+ }{
+ {
+ name: "empty-string-prefix",
+ prefix: "",
+ },
+ {
+ name: "no-prefix",
+ },
+ {
+ name: "prefix",
+ prefix: "a/b/c/",
+ },
+ {
+ name: "prefix-no-trailing-slash",
+ prefix: "a/b/c",
+ },
+ }
+
logger := log.NewLogfmtLogger(os.Stdout)
var hhh *e2e.HTTPService
t.Parallel()
for _, tc := range testCompactorOwnershipBackends {
- t.Run(tc.name, func(t *testing.T) {
- s, err := e2e.NewScenario("tempo-integration")
- require.NoError(t, err)
- defer s.Close()
-
- // set up the backend
- cfg := app.Config{}
- buff, err := os.ReadFile(tc.configFile)
- require.NoError(t, err)
- err = yaml.UnmarshalStrict(buff, &cfg)
- require.NoError(t, err)
- hhh, err = e2eBackend.New(s, cfg)
- require.NoError(t, err)
-
- err = hhh.WaitReady()
- require.NoError(t, err)
-
- err = hhh.Ready()
- require.NoError(t, err)
-
- // Give some time for startup
- time.Sleep(1 * time.Second)
-
- t.Logf("backend: %s", hhh.Endpoint(hhh.HTTPPort()))
-
- require.NoError(t, util.CopyFileToSharedDir(s, tc.configFile, "config.yaml"))
-
- var rr backend.RawReader
- var ww backend.RawWriter
- var cc backend.Compactor
-
- concurrency := 3
-
- e := hhh.Endpoint(hhh.HTTPPort())
- switch tc.name {
- case "s3":
- cfg.StorageConfig.Trace.S3.ListBlocksConcurrency = concurrency
- cfg.StorageConfig.Trace.S3.Endpoint = e
- cfg.Overrides.UserConfigurableOverridesConfig.Client.S3.Endpoint = e
- rr, ww, cc, err = s3.New(cfg.StorageConfig.Trace.S3)
- case "gcs":
- cfg.StorageConfig.Trace.GCS.ListBlocksConcurrency = concurrency
- cfg.StorageConfig.Trace.GCS.Endpoint = e
- cfg.Overrides.UserConfigurableOverridesConfig.Client.GCS.Endpoint = e
- rr, ww, cc, err = gcs.New(cfg.StorageConfig.Trace.GCS)
- case "azure":
- cfg.StorageConfig.Trace.Azure.Endpoint = e
- cfg.Overrides.UserConfigurableOverridesConfig.Client.Azure.Endpoint = e
- rr, ww, cc, err = azure.New(cfg.StorageConfig.Trace.Azure)
- }
- require.NoError(t, err)
-
- r := backend.NewReader(rr)
- w := backend.NewWriter(ww)
-
- blocklistPoller := blocklist.NewPoller(&blocklist.PollerConfig{
- PollConcurrency: 3,
- TenantIndexBuilders: 1,
- }, OwnsEverythingSharder, r, cc, w, logger)
-
- // Use the block boundaries in the GCS and S3 implementation
- bb := blockboundary.CreateBlockBoundaries(concurrency)
- // Pick a boundary to use for this test
- base := bb[1]
- expected := []uuid.UUID{}
-
- expected = append(expected, uuid.MustParse("00000000-0000-0000-0000-000000000000"))
- expected = append(expected, uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))
-
- // Grab the one before the boundary
- decrementUUIDBytes(base)
- expected = append(expected, uuid.UUID(base))
-
- incrementUUIDBytes(base)
- expected = append(expected, uuid.UUID(base))
-
- incrementUUIDBytes(base)
- expected = append(expected, uuid.UUID(base))
-
- incrementUUIDBytes(base)
- expected = append(expected, uuid.UUID(base))
-
- writeTenantBlocks(t, w, tenant, expected)
-
- sort.Slice(expected, func(i, j int) bool { return expected[i].String() < expected[j].String() })
- t.Logf("expected: %v", expected)
-
- mmResults, cmResults, err := rr.ListBlocks(context.Background(), tenant)
- require.NoError(t, err)
- sort.Slice(mmResults, func(i, j int) bool { return mmResults[i].String() < mmResults[j].String() })
- t.Logf("mmResults: %s", mmResults)
- t.Logf("cmResults: %s", cmResults)
-
- assert.Equal(t, expected, mmResults)
- assert.Equal(t, len(expected), len(mmResults))
- assert.Equal(t, 0, len(cmResults))
-
- l := blocklist.New()
- mm, cm, err := blocklistPoller.Do(l)
- require.NoError(t, err)
- t.Logf("mm: %v", mm)
- t.Logf("cm: %v", cm)
-
- l.ApplyPollResults(mm, cm)
-
- metas := l.Metas(tenant)
-
- actual := []uuid.UUID{}
- for _, m := range metas {
- actual = append(actual, m.BlockID)
- }
-
- sort.Slice(actual, func(i, j int) bool { return actual[i].String() < actual[j].String() })
-
- assert.Equal(t, expected, actual)
- assert.Equal(t, len(expected), len(metas))
- t.Logf("actual: %v", actual)
-
- for _, e := range expected {
- assert.True(t, found(e, metas))
- }
- })
+ for _, pc := range storageBackendTestPermutations {
+ t.Run(tc.name+"-"+pc.name, func(t *testing.T) {
+ s, err := e2e.NewScenario("tempo-integration")
+ require.NoError(t, err)
+ defer s.Close()
+
+ // set up the backend
+ cfg := app.Config{}
+ buff, err := os.ReadFile(tc.configFile)
+ require.NoError(t, err)
+ err = yaml.UnmarshalStrict(buff, &cfg)
+ require.NoError(t, err)
+ hhh, err = e2eBackend.New(s, cfg)
+ require.NoError(t, err)
+
+ err = hhh.WaitReady()
+ require.NoError(t, err)
+
+ err = hhh.Ready()
+ require.NoError(t, err)
+
+ // Give some time for startup
+ time.Sleep(1 * time.Second)
+
+ t.Logf("backend: %s", hhh.Endpoint(hhh.HTTPPort()))
+
+ require.NoError(t, util.CopyFileToSharedDir(s, tc.configFile, "config.yaml"))
+
+ var rr backend.RawReader
+ var ww backend.RawWriter
+ var cc backend.Compactor
+
+ concurrency := 3
+
+ e := hhh.Endpoint(hhh.HTTPPort())
+ switch tc.name {
+ case "s3":
+ cfg.StorageConfig.Trace.S3.ListBlocksConcurrency = concurrency
+ cfg.StorageConfig.Trace.S3.Endpoint = e
+ cfg.StorageConfig.Trace.S3.Prefix = pc.prefix
+ cfg.Overrides.UserConfigurableOverridesConfig.Client.S3.Endpoint = e
+ rr, ww, cc, err = s3.New(cfg.StorageConfig.Trace.S3)
+ case "gcs":
+ cfg.StorageConfig.Trace.GCS.ListBlocksConcurrency = concurrency
+ cfg.StorageConfig.Trace.GCS.Endpoint = e
+ cfg.StorageConfig.Trace.GCS.Prefix = pc.prefix
+ cfg.Overrides.UserConfigurableOverridesConfig.Client.GCS.Endpoint = e
+ rr, ww, cc, err = gcs.New(cfg.StorageConfig.Trace.GCS)
+ case "azure":
+ cfg.StorageConfig.Trace.Azure.Endpoint = e
+ cfg.StorageConfig.Trace.Azure.Prefix = pc.prefix
+ cfg.Overrides.UserConfigurableOverridesConfig.Client.Azure.Endpoint = e
+ rr, ww, cc, err = azure.New(cfg.StorageConfig.Trace.Azure)
+ }
+ require.NoError(t, err)
+
+ r := backend.NewReader(rr)
+ w := backend.NewWriter(ww)
+
+ blocklistPoller := blocklist.NewPoller(&blocklist.PollerConfig{
+ PollConcurrency: 3,
+ TenantIndexBuilders: 1,
+ }, OwnsEverythingSharder, r, cc, w, logger)
+
+ // Use the block boundaries in the GCS and S3 implementation
+ bb := blockboundary.CreateBlockBoundaries(concurrency)
+ // Pick a boundary to use for this test
+ base := bb[1]
+ expected := []uuid.UUID{}
+
+ expected = append(expected, uuid.MustParse("00000000-0000-0000-0000-000000000000"))
+ expected = append(expected, uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))
+
+ // Grab the one before the boundary
+ decrementUUIDBytes(base)
+ expected = append(expected, uuid.UUID(base))
+
+ incrementUUIDBytes(base)
+ expected = append(expected, uuid.UUID(base))
+
+ incrementUUIDBytes(base)
+ expected = append(expected, uuid.UUID(base))
+
+ incrementUUIDBytes(base)
+ expected = append(expected, uuid.UUID(base))
+
+ writeTenantBlocks(t, w, tenant, expected)
+
+ sort.Slice(expected, func(i, j int) bool { return expected[i].String() < expected[j].String() })
+ t.Logf("expected: %v", expected)
+
+ mmResults, cmResults, err := rr.ListBlocks(context.Background(), tenant)
+ require.NoError(t, err)
+ sort.Slice(mmResults, func(i, j int) bool { return mmResults[i].String() < mmResults[j].String() })
+ t.Logf("mmResults: %s", mmResults)
+ t.Logf("cmResults: %s", cmResults)
+
+ assert.Equal(t, expected, mmResults)
+ assert.Equal(t, len(expected), len(mmResults))
+ assert.Equal(t, 0, len(cmResults))
+
+ l := blocklist.New()
+ mm, cm, err := blocklistPoller.Do(l)
+ require.NoError(t, err)
+ t.Logf("mm: %v", mm)
+ t.Logf("cm: %v", cm)
+
+ l.ApplyPollResults(mm, cm)
+
+ metas := l.Metas(tenant)
+
+ actual := []uuid.UUID{}
+ for _, m := range metas {
+ actual = append(actual, m.BlockID)
+ }
+
+ sort.Slice(actual, func(i, j int) bool { return actual[i].String() < actual[j].String() })
+
+ assert.Equal(t, expected, actual)
+ assert.Equal(t, len(expected), len(metas))
+ t.Logf("actual: %v", actual)
+
+ for _, e := range expected {
+ assert.True(t, found(e, metas))
+ }
+ })
+ }
}
}
diff --git a/tempodb/backend/azure/azure_test.go b/tempodb/backend/azure/azure_test.go
index 8e028de00af..c923259a03e 100644
--- a/tempodb/backend/azure/azure_test.go
+++ b/tempodb/backend/azure/azure_test.go
@@ -252,6 +252,159 @@ func TestObjectWithPrefix(t *testing.T) {
}
}
+func TestListBlocksWithPrefix(t *testing.T) {
+ tests := []struct {
+ name string
+ prefix string
+ liveBlockIDs []uuid.UUID
+ compactedBlockIDs []uuid.UUID
+ tenant string
+ httpHandler func(t *testing.T) http.HandlerFunc
+ }{
+ {
+ name: "with prefix",
+ prefix: "a/b/c/",
+ tenant: "single-tenant",
+ liveBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000000")},
+ compactedBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000001")},
+ httpHandler: func(t *testing.T) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ if r.Method == "GET" {
+ assert.Equal(t, "a/b/c/single-tenant/", r.URL.Query().Get("prefix"))
+
+ _, _ = w.Write([]byte(`<?xml version="1.0" encoding="utf-8"?>
+ <EnumerationResults ServiceEndpoint="https://myaccount.blob.core.windows.net/" ContainerName="mycontainer">
+   <Prefix>a/b/c/</Prefix>
+   <MaxResults>100</MaxResults>
+   <Blobs>
+     <Blob>
+       <Name>a/b/c/single-tenant/00000000-0000-0000-0000-000000000000/meta.json</Name>
+       <Url>https://myaccount.blob.core.windows.net/mycontainer/a/b/c/single-tenant/00000000-0000-0000-0000-000000000000/meta.json</Url>
+       <Properties>
+         <Last-Modified>Fri, 01 Mar 2024 00:00:00 GMT</Last-Modified>
+         <Etag>0x8CBFF45D8A29A19</Etag>
+         <Content-Length>100</Content-Length>
+         <Content-Type>text/html</Content-Type>
+         <Content-Encoding />
+         <Content-Language>en-US</Content-Language>
+         <Content-MD5 />
+         <Cache-Control>no-cache</Cache-Control>
+         <BlobType>BlockBlob</BlobType>
+         <LeaseStatus>unlocked</LeaseStatus>
+       </Properties>
+     </Blob>
+     <Blob>
+       <Name>a/b/c/single-tenant/00000000-0000-0000-0000-000000000001/meta.compacted.json</Name>
+       <Url>https://myaccount.blob.core.windows.net/mycontainer/a/b/c/single-tenant/00000000-0000-0000-0000-000000000001/meta.compacted.json</Url>
+       <Properties>
+         <Last-Modified>Fri, 01 Mar 2024 00:00:00 GMT</Last-Modified>
+         <Etag>0x8CBFF45D8A29A19</Etag>
+         <Content-Length>100</Content-Length>
+         <Content-Type>text/html</Content-Type>
+         <Content-Encoding />
+         <Content-Language>en-US</Content-Language>
+         <Content-MD5 />
+         <Cache-Control>no-cache</Cache-Control>
+         <BlobType>BlockBlob</BlobType>
+         <LeaseStatus>unlocked</LeaseStatus>
+       </Properties>
+     </Blob>
+   </Blobs>
+   <NextMarker />
+ </EnumerationResults>
+ `))
+ return
+ }
+ }
+ },
+ },
+ {
+ name: "without prefix",
+ prefix: "",
+ tenant: "single-tenant",
+ liveBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000000")},
+ compactedBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000001")},
+ httpHandler: func(t *testing.T) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ if r.Method == "GET" {
+ assert.Equal(t, "single-tenant/", r.URL.Query().Get("prefix"))
+
+ _, _ = w.Write([]byte(`<?xml version="1.0" encoding="utf-8"?>
+ <EnumerationResults ServiceEndpoint="https://myaccount.blob.core.windows.net/" ContainerName="mycontainer">
+   <Prefix />
+   <MaxResults>100</MaxResults>
+   <Blobs>
+     <Blob>
+       <Name>single-tenant/00000000-0000-0000-0000-000000000000/meta.json</Name>
+       <Url>https://myaccount.blob.core.windows.net/mycontainer/single-tenant/00000000-0000-0000-0000-000000000000/meta.json</Url>
+       <Properties>
+         <Last-Modified>Fri, 01 Mar 2024 00:00:00 GMT</Last-Modified>
+         <Etag>0x8CBFF45D8A29A19</Etag>
+         <Content-Length>100</Content-Length>
+         <Content-Type>text/html</Content-Type>
+         <Content-Encoding />
+         <Content-Language>en-US</Content-Language>
+         <Content-MD5 />
+         <Cache-Control>no-cache</Cache-Control>
+         <BlobType>BlockBlob</BlobType>
+         <LeaseStatus>unlocked</LeaseStatus>
+       </Properties>
+     </Blob>
+     <Blob>
+       <Name>single-tenant/00000000-0000-0000-0000-000000000001/meta.compacted.json</Name>
+       <Url>https://myaccount.blob.core.windows.net/mycontainer/single-tenant/00000000-0000-0000-0000-000000000001/meta.compacted.json</Url>
+       <Properties>
+         <Last-Modified>Fri, 01 Mar 2024 00:00:00 GMT</Last-Modified>
+         <Etag>0x8CBFF45D8A29A19</Etag>
+         <Content-Length>100</Content-Length>
+         <Content-Type>text/html</Content-Type>
+         <Content-Encoding />
+         <Content-Language>en-US</Content-Language>
+         <Content-MD5 />
+         <Cache-Control>no-cache</Cache-Control>
+         <BlobType>BlockBlob</BlobType>
+         <LeaseStatus>unlocked</LeaseStatus>
+       </Properties>
+     </Blob>
+   </Blobs>
+   <NextMarker />
+ </EnumerationResults>
+ `))
+ return
+ }
+ }
+ },
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ server := testServer(t, tc.httpHandler(t))
+ r, _, _, err := NewNoConfirm(&config.Config{
+ StorageAccountName: "testing_account",
+ StorageAccountKey: flagext.SecretWithValue("YQo="),
+ MaxBuffers: 3,
+ BufferSize: 1000,
+ ContainerName: "blerg",
+ Prefix: tc.prefix,
+ Endpoint: server.URL[7:], // [7:] -> strip http://
+ })
+ require.NoError(t, err)
+
+ ctx := context.Background()
+ blockIDs, compactedBlockIDs, err2 := r.ListBlocks(ctx, tc.tenant)
+ assert.NoError(t, err2)
+
+ assert.ElementsMatchf(t, tc.liveBlockIDs, blockIDs, "Block IDs did not match")
+ assert.ElementsMatchf(t, tc.compactedBlockIDs, compactedBlockIDs, "Compacted block IDs did not match")
+ })
+ }
+}
+
func testServer(t *testing.T, httpHandler http.HandlerFunc) *httptest.Server {
t.Helper()
assert.NotNil(t, httpHandler)
diff --git a/tempodb/backend/gcs/gcs.go b/tempodb/backend/gcs/gcs.go
index c864daae3be..1d4b346e68b 100644
--- a/tempodb/backend/gcs/gcs.go
+++ b/tempodb/backend/gcs/gcs.go
@@ -253,20 +253,20 @@ func (rw *readerWriter) ListBlocks(ctx context.Context, tenant string) ([]uuid.U
return
}
- parts = strings.Split(attrs.Name, "/")
- // ie: <tenantID>/<blockID>/meta.json
- if len(parts) != 3 {
+ parts = strings.Split(strings.TrimPrefix(attrs.Name, prefix), "/")
+ // ie: <blockID>/meta.json
+ if len(parts) != 2 {
continue
}
- switch parts[2] {
+ switch parts[1] {
case backend.MetaName:
case backend.CompactedMetaName:
default:
continue
}
- id, err = uuid.Parse(parts[1])
+ id, err = uuid.Parse(parts[0])
if err != nil {
continue
}
@@ -283,7 +283,7 @@ func (rw *readerWriter) ListBlocks(ctx context.Context, tenant string) ([]uuid.U
}
mtx.Lock()
- switch parts[2] {
+ switch parts[1] {
case backend.MetaName:
blockIDs = append(blockIDs, id)
case backend.CompactedMetaName:
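
This hunk is the heart of the bugfix: `ListBlocks` used to split the full object name and require exactly three segments (`<tenantID>/<blockID>/meta.json`), so a configured prefix added extra segments and every block was silently skipped, which is what broke compaction and retention. Trimming the listing prefix first restores the expected two-segment shape. A self-contained sketch of the fixed parsing rule (the `parseBlockKey` helper is illustrative, not Tempo's API):

```go
package main

import (
	"fmt"
	"strings"

	"github.com/google/uuid"
)

// parseBlockKey mirrors the fixed logic: strip the listing prefix (which
// already ends with the tenant, e.g. "a/b/c/single-tenant/"), then expect
// exactly <blockID>/meta.json or <blockID>/meta.compacted.json.
func parseBlockKey(key, prefix string) (uuid.UUID, string, bool) {
	parts := strings.Split(strings.TrimPrefix(key, prefix), "/")
	if len(parts) != 2 {
		return uuid.Nil, "", false
	}
	id, err := uuid.Parse(parts[0])
	if err != nil {
		return uuid.Nil, "", false
	}
	return id, parts[1], true
}

func main() {
	key := "a/b/c/single-tenant/00000000-0000-0000-0000-000000000000/meta.json"

	id, meta, ok := parseBlockKey(key, "a/b/c/single-tenant/")
	fmt.Println(id, meta, ok) // 00000000-... meta.json true

	// Without trimming (the pre-fix behavior under a prefix), the same key
	// splits into five segments and the block is skipped.
	_, _, ok = parseBlockKey(key, "")
	fmt.Println(ok) // false
}
```
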
diff --git a/tempodb/backend/gcs/gcs_test.go b/tempodb/backend/gcs/gcs_test.go
index 96face1846a..e0153eced1e 100644
--- a/tempodb/backend/gcs/gcs_test.go
+++ b/tempodb/backend/gcs/gcs_test.go
@@ -299,6 +299,119 @@ func TestObjectWithPrefix(t *testing.T) {
}
}
+func TestListBlocksWithPrefix(t *testing.T) {
+ tests := []struct {
+ name string
+ prefix string
+ tenant string
+ liveBlockIDs []uuid.UUID
+ compactedBlockIDs []uuid.UUID
+ httpHandler func(t *testing.T) http.HandlerFunc
+ }{
+ {
+ name: "with prefix",
+ prefix: "a/b/c/",
+ tenant: "single-tenant",
+ liveBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000000")},
+ compactedBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000001")},
+ httpHandler: func(t *testing.T) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ if r.Method == "GET" {
+ assert.Equal(t, "a/b/c/single-tenant/", r.URL.Query().Get("prefix"))
+
+ _, _ = w.Write([]byte(`
+ {
+ "kind": "storage#objects",
+ "items": [{
+ "kind": "storage#object",
+ "id": "1",
+ "name": "a/b/c/single-tenant/00000000-0000-0000-0000-000000000000/meta.json",
+ "bucket": "blerg",
+ "storageClass": "STANDARD",
+ "size": "1024",
+ "timeCreated": "2024-03-01T00:00:00.000Z",
+ "updated": "2024-03-01T00:00:00.000Z"
+ }, {
+ "kind": "storage#object",
+ "id": "2",
+ "name": "a/b/c/single-tenant/00000000-0000-0000-0000-000000000001/meta.compacted.json",
+ "bucket": "blerg",
+ "storageClass": "STANDARD",
+ "size": "1024",
+ "timeCreated": "2024-03-01T00:00:00.000Z",
+ "updated": "2024-03-01T00:00:00.000Z"
+ }]
+ }
+ `))
+ return
+ }
+ }
+ },
+ },
+ {
+ name: "without prefix",
+ prefix: "",
+ tenant: "single-tenant",
+ liveBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000000")},
+ compactedBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000001")},
+ httpHandler: func(t *testing.T) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ if r.Method == "GET" {
+ assert.Equal(t, "single-tenant/", r.URL.Query().Get("prefix"))
+
+ _, _ = w.Write([]byte(`
+ {
+ "kind": "storage#objects",
+ "items": [{
+ "kind": "storage#object",
+ "id": "1",
+ "name": "single-tenant/00000000-0000-0000-0000-000000000000/meta.json",
+ "bucket": "blerg",
+ "storageClass": "STANDARD",
+ "size": "1024",
+ "timeCreated": "2024-03-01T00:00:00.000Z",
+ "updated": "2024-03-01T00:00:00.000Z"
+ }, {
+ "kind": "storage#object",
+ "id": "2",
+ "name": "single-tenant/00000000-0000-0000-0000-000000000001/meta.compacted.json",
+ "bucket": "blerg",
+ "storageClass": "STANDARD",
+ "size": "1024",
+ "timeCreated": "2024-03-01T00:00:00.000Z",
+ "updated": "2024-03-01T00:00:00.000Z"
+ }]
+ }
+ `))
+ return
+ }
+ }
+ },
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ server := testServer(t, tc.httpHandler(t))
+ r, _, _, err := NewNoConfirm(&Config{
+ BucketName: "blerg",
+ Endpoint: server.URL,
+ Insecure: true,
+ Prefix: tc.prefix,
+ ListBlocksConcurrency: 1,
+ })
+ require.NoError(t, err)
+
+ ctx := context.Background()
+ blockIDs, compactedBlockIDs, err := r.ListBlocks(ctx, tc.tenant)
+ assert.NoError(t, err)
+
+ assert.ElementsMatchf(t, tc.liveBlockIDs, blockIDs, "Block IDs did not match")
+ assert.ElementsMatchf(t, tc.compactedBlockIDs, compactedBlockIDs, "Compacted block IDs did not match")
+ })
+ }
+}
+
func testServer(t *testing.T, httpHandler http.HandlerFunc) *httptest.Server {
t.Helper()
assert.NotNil(t, httpHandler)
diff --git a/tempodb/backend/s3/s3.go b/tempodb/backend/s3/s3.go
index 709a844660c..dd037112e68 100644
--- a/tempodb/backend/s3/s3.go
+++ b/tempodb/backend/s3/s3.go
@@ -333,20 +333,20 @@ func (rw *readerWriter) ListBlocks(
}
for _, c := range res.Contents {
- // i.e: <tenantID>/<blockID>/meta.json
- parts := strings.Split(c.Key, "/")
- if len(parts) != 3 {
+ // i.e: <blockID>/meta.json
+ parts := strings.Split(strings.TrimPrefix(c.Key, prefix), "/")
+ if len(parts) != 2 {
continue
}
- switch parts[2] {
+ switch parts[1] {
case backend.MetaName:
case backend.CompactedMetaName:
default:
continue
}
- id, err := uuid.Parse(parts[1])
+ id, err := uuid.Parse(parts[0])
if err != nil {
continue
}
@@ -363,7 +363,7 @@ func (rw *readerWriter) ListBlocks(
}
mtx.Lock()
- switch parts[2] {
+ switch parts[1] {
case backend.MetaName:
blockIDs = append(blockIDs, id)
case backend.CompactedMetaName:
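
The backend tests that follow assert that the list request carries `prefix=a/b/c/single-tenant/`, i.e. the configured prefix joined with the tenant plus a trailing slash; the poller permutations above also exercise a prefix without a trailing slash ("a/b/c"). A hedged sketch of that normalization (the exact helper inside the backends may differ):

```go
package main

import (
	"fmt"
	"path"
)

// listPrefix joins the configured storage prefix with the tenant and
// guarantees a trailing slash, so "a/b/c", "a/b/c/", and "" all yield a
// well-formed object-listing prefix. Illustrative only.
func listPrefix(configuredPrefix, tenant string) string {
	p := path.Join(configuredPrefix, tenant) // also collapses duplicate slashes
	if p == "" {
		return ""
	}
	return p + "/"
}

func main() {
	fmt.Println(listPrefix("a/b/c/", "single-tenant")) // a/b/c/single-tenant/
	fmt.Println(listPrefix("a/b/c", "single-tenant"))  // a/b/c/single-tenant/
	fmt.Println(listPrefix("", "single-tenant"))       // single-tenant/
}
```
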
diff --git a/tempodb/backend/s3/s3_test.go b/tempodb/backend/s3/s3_test.go
index f4f460972c0..bcccda6f39e 100644
--- a/tempodb/backend/s3/s3_test.go
+++ b/tempodb/backend/s3/s3_test.go
@@ -18,6 +18,8 @@ import (
"testing"
"time"
+ "github.com/google/uuid"
+
"github.com/aws/aws-sdk-go/service/s3"
"github.com/grafana/dskit/flagext"
"github.com/minio/minio-go/v7"
@@ -449,6 +451,124 @@ func TestObjectWithPrefix(t *testing.T) {
}
}
+func TestListBlocksWithPrefix(t *testing.T) {
+ tests := []struct {
+ name string
+ prefix string
+ tenant string
+ liveBlockIDs []uuid.UUID
+ compactedBlockIDs []uuid.UUID
+ httpHandler func(t *testing.T) http.HandlerFunc
+ }{
+ {
+ name: "with prefix",
+ prefix: "a/b/c/",
+ tenant: "single-tenant",
+ liveBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000000")},
+ compactedBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000001")},
+ httpHandler: func(t *testing.T) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ if r.Method == getMethod {
+ assert.Equal(t, "a/b/c/single-tenant/", r.URL.Query().Get("prefix"))
+
+ _, _ = w.Write([]byte(`<?xml version="1.0" encoding="UTF-8"?>
+ <ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+   <Name>blerg</Name>
+   <Prefix>a/b/c</Prefix>
+   <Marker />
+   <KeyCount>2</KeyCount>
+   <MaxKeys>100</MaxKeys>
+   <EncodingType>url</EncodingType>
+   <IsTruncated>false</IsTruncated>
+   <Contents>
+     <Key>a/b/c/single-tenant/00000000-0000-0000-0000-000000000000/meta.json</Key>
+     <LastModified>2024-03-01T00:00:00.000Z</LastModified>
+     <ETag>"d42a22ddd183f61924c661b1c026c1ef"</ETag>
+     <Size>398</Size>
+     <StorageClass>STANDARD</StorageClass>
+   </Contents>
+   <Contents>
+     <Key>a/b/c/single-tenant/00000000-0000-0000-0000-000000000001/meta.compacted.json</Key>
+     <LastModified>2024-03-01T00:00:00.000Z</LastModified>
+     <ETag>"d42a22ddd183f61924c661b1c026c1ef"</ETag>
+     <Size>398</Size>
+     <StorageClass>STANDARD</StorageClass>
+   </Contents>
+ </ListBucketResult>
+ `))
+ return
+ }
+ }
+ },
+ },
+ {
+ name: "without prefix",
+ prefix: "",
+ liveBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000000")},
+ compactedBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000001")},
+ tenant: "single-tenant",
+ httpHandler: func(t *testing.T) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ if r.Method == getMethod {
+ assert.Equal(t, "single-tenant/", r.URL.Query().Get("prefix"))
+
+ _, _ = w.Write([]byte(`<?xml version="1.0" encoding="UTF-8"?>
+ <ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+   <Name>blerg</Name>
+   <Prefix />
+   <Marker />
+   <KeyCount>2</KeyCount>
+   <MaxKeys>100</MaxKeys>
+   <EncodingType>url</EncodingType>
+   <IsTruncated>false</IsTruncated>
+   <Contents>
+     <Key>single-tenant/00000000-0000-0000-0000-000000000000/meta.json</Key>
+     <LastModified>2024-03-01T00:00:00.000Z</LastModified>
+     <ETag>"d42a22ddd183f61924c661b1c026c1ef"</ETag>
+     <Size>398</Size>
+     <StorageClass>STANDARD</StorageClass>
+   </Contents>
+   <Contents>
+     <Key>single-tenant/00000000-0000-0000-0000-000000000001/meta.compacted.json</Key>
+     <LastModified>2024-03-01T00:00:00.000Z</LastModified>
+     <ETag>"d42a22ddd183f61924c661b1c026c1ef"</ETag>
+     <Size>398</Size>
+     <StorageClass>STANDARD</StorageClass>
+   </Contents>
+ </ListBucketResult>
+ `))
+ return
+ }
+ }
+ },
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ server := testServer(t, tc.httpHandler(t))
+ r, _, _, err := NewNoConfirm(&Config{
+ Region: "blerg",
+ AccessKey: "test",
+ SecretKey: flagext.SecretWithValue("test"),
+ Bucket: "blerg",
+ Prefix: tc.prefix,
+ Insecure: true,
+ Endpoint: server.URL[7:],
+ ListBlocksConcurrency: 1,
+ })
+ require.NoError(t, err)
+
+ ctx := context.Background()
+ blockIDs, compactedBlockIDs, err := r.ListBlocks(ctx, tc.tenant)
+ assert.NoError(t, err)
+
+ assert.ElementsMatchf(t, tc.liveBlockIDs, blockIDs, "Block IDs did not match")
+ assert.ElementsMatchf(t, tc.compactedBlockIDs, compactedBlockIDs, "Compacted block IDs did not match")
+ })
+ }
+}
+
func TestObjectStorageClass(t *testing.T) {
tests := []struct {
name string