From f5ab738f0e095be32a25effda0d7f1c9f32b416d Mon Sep 17 00:00:00 2001
From: Joe Elliott
Date: Tue, 19 Mar 2024 13:04:47 -0400
Subject: [PATCH] Handle prefixes when listing blocks from S3 and GCS (#3466) (#3502)

* Handle prefixes when listing blocks from S3

fixes #3465

* Handle prefixes when listing blocks from GCS

* Add test for prefixes when listing blocks from Azure

* Update unit tests to check for actual block IDs instead of just length of the slices

Cleanup unit tests

* Further refine S3/GCS backend for ListBlocks

Brings logic more in line with Azure object parsing. Also has the benefit
of handling prefixes without a trailing slash.

* Update poller integration test to exercise prefixes

* Update e2e test to exercise prefixes

* Fix format check error

* Fix failing e2e tests

* Remove unnecessary prefix permutations from e2e test

* Remove unnecessary test config file copy

* Ignore lint

---------

Co-authored-by: Zach Leslie
(cherry picked from commit 8e6e7fe86f79d1e02972d0d01b35174d25237f81)

Co-authored-by: Ben Foster
---
 CHANGELOG.md                               |   9 +
 .../e2e/config-all-in-one-azurite.yaml     |   1 +
 integration/e2e/config-all-in-one-gcs.yaml |   1 +
 integration/e2e/config-all-in-one-s3.yaml  |   1 +
 integration/e2e/e2e_test.go                | 140 +++++----
 integration/e2e/overrides_test.go          |   8 +-
 integration/poller/poller_test.go          | 270 ++++++++++--------
 tempodb/backend/azure/azure_test.go        | 153 ++++++++++
 tempodb/backend/gcs/gcs.go                 |  12 +-
 tempodb/backend/gcs/gcs_test.go            | 113 ++++++++
 tempodb/backend/s3/s3.go                   |  12 +-
 tempodb/backend/s3/s3_test.go              | 120 ++++++++
 12 files changed, 646 insertions(+), 194 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 80e3cbee22b..5a8fdb38e24 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,15 @@
 ## main / unreleased

 * [BUGFIX] Fix some instances where spanmetrics histograms could be inconsistent [#3412](https://github.com/grafana/tempo/pull/3412) (@mdisibio)
+* [ENHANCEMENT] Add string interning to TraceQL queries [#3411](https://github.com/grafana/tempo/pull/3411) (@mapno)
+* [ENHANCEMENT] Add new (unsafe) query hints for metrics queries [#3396](https://github.com/grafana/tempo/pull/3396) (@mdisibio)
+* [BUGFIX] Fix metrics query results when filtering and rating on the same attribute [#3428](https://github.com/grafana/tempo/issues/3428) (@mdisibio)
+* [BUGFIX] Fix metrics query results when series contain empty strings or nil values [#3429](https://github.com/grafana/tempo/issues/3429) (@mdisibio)
+* [BUGFIX] Fix metrics query duration check, add per-tenant override for max metrics query duration [#3479](https://github.com/grafana/tempo/issues/3479) (@mdisibio)
+* [BUGFIX] Return unfiltered results when a bad TraceQL query is provided in autocomplete. [#3426](https://github.com/grafana/tempo/pull/3426) (@mapno)
+* [BUGFIX] Correctly handle 429s in GRPC search streaming. [#3469](https://github.com/grafana/tempo/pull/3469) (@joe-elliott)
+* [BUGFIX] Correctly cancel GRPC and HTTP contexts in the frontend to prevent having to rely on http write timeout. [#3443](https://github.com/grafana/tempo/pull/3443) (@joe-elliott)
+* [BUGFIX] Fix compaction/retention in AWS S3 and GCS when a prefix is configured.
[#3465](https://github.com/grafana/tempo/issues/3465) (@bpfoster) ## v2.4.0-rc.0 diff --git a/integration/e2e/config-all-in-one-azurite.yaml b/integration/e2e/config-all-in-one-azurite.yaml index 0eadfcd8866..3def0ff6f1b 100644 --- a/integration/e2e/config-all-in-one-azurite.yaml +++ b/integration/e2e/config-all-in-one-azurite.yaml @@ -37,6 +37,7 @@ storage: endpoint_suffix: tempo_e2e-azurite:10000 storage_account_name: "devstoreaccount1" storage_account_key: "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" + prefix: {{ .Prefix }} pool: max_workers: 10 queue_depth: 100 diff --git a/integration/e2e/config-all-in-one-gcs.yaml b/integration/e2e/config-all-in-one-gcs.yaml index 06970879de3..efffae085c9 100644 --- a/integration/e2e/config-all-in-one-gcs.yaml +++ b/integration/e2e/config-all-in-one-gcs.yaml @@ -36,6 +36,7 @@ storage: bucket_name: tempo endpoint: https://tempo_e2e-gcs:4443/storage/v1/ insecure: true + prefix: {{ .Prefix }} pool: max_workers: 10 queue_depth: 1000 diff --git a/integration/e2e/config-all-in-one-s3.yaml b/integration/e2e/config-all-in-one-s3.yaml index ab54c060987..a429b183628 100644 --- a/integration/e2e/config-all-in-one-s3.yaml +++ b/integration/e2e/config-all-in-one-s3.yaml @@ -38,6 +38,7 @@ storage: access_key: Cheescake # TODO: use cortex_e2e.MinioAccessKey secret_key: supersecret # TODO: use cortex_e2e.MinioSecretKey insecure: true + prefix: {{ .Prefix }} pool: max_workers: 10 queue_depth: 100 diff --git a/integration/e2e/e2e_test.go b/integration/e2e/e2e_test.go index 5fe1bff8349..81ecb2832f9 100644 --- a/integration/e2e/e2e_test.go +++ b/integration/e2e/e2e_test.go @@ -58,83 +58,103 @@ func TestAllInOne(t *testing.T) { }, } + storageBackendTestPermutations := []struct { + name string + prefix string + }{ + { + name: "no-prefix", + }, + { + name: "prefix", + prefix: "a/b/c/", + }, + } + for _, tc := range testBackends { - t.Run(tc.name, func(t *testing.T) { - s, err := e2e.NewScenario("tempo_e2e") - require.NoError(t, err) - defer s.Close() + for _, pc := range storageBackendTestPermutations { + t.Run(tc.name+"-"+pc.name, func(t *testing.T) { + s, err := e2e.NewScenario("tempo_e2e") + require.NoError(t, err) + defer s.Close() - // set up the backend - cfg := app.Config{} - buff, err := os.ReadFile(tc.configFile) - require.NoError(t, err) - err = yaml.UnmarshalStrict(buff, &cfg) - require.NoError(t, err) - _, err = backend.New(s, cfg) - require.NoError(t, err) + // copy config template to shared directory and expand template variables + tmplConfig := map[string]any{"Prefix": pc.prefix} + configFile, err := util.CopyTemplateToSharedDir(s, tc.configFile, "config.yaml", tmplConfig) + require.NoError(t, err) - require.NoError(t, util.CopyFileToSharedDir(s, tc.configFile, "config.yaml")) - tempo := util.NewTempoAllInOne() - require.NoError(t, s.StartAndWaitReady(tempo)) + // set up the backend + cfg := app.Config{} + buff, err := os.ReadFile(configFile) + require.NoError(t, err) + err = yaml.UnmarshalStrict(buff, &cfg) + require.NoError(t, err) + _, err = backend.New(s, cfg) + require.NoError(t, err) - // Get port for the Jaeger gRPC receiver endpoint - c, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250)) - require.NoError(t, err) - require.NotNil(t, c) + tempo := util.NewTempoAllInOne() + require.NoError(t, s.StartAndWaitReady(tempo)) - info := tempoUtil.NewTraceInfo(time.Now(), "") - require.NoError(t, info.EmitAllBatches(c)) + // Get port for the Jaeger gRPC receiver endpoint + c, err := 
util.NewJaegerGRPCClient(tempo.Endpoint(14250)) + require.NoError(t, err) + require.NotNil(t, c) - expected, err := info.ConstructTraceFromEpoch() - require.NoError(t, err) + info := tempoUtil.NewTraceInfo(time.Now(), "") + require.NoError(t, info.EmitAllBatches(c)) - // test metrics - require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(spanCount(expected)), "tempo_distributor_spans_received_total")) + expected, err := info.ConstructTraceFromEpoch() + require.NoError(t, err) - // test echo - assertEcho(t, "http://"+tempo.Endpoint(3200)+"/api/echo") + // test metrics + require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(spanCount(expected)), "tempo_distributor_spans_received_total")) - apiClient := httpclient.New("http://"+tempo.Endpoint(3200), "") + // test echo + // nolint:goconst + assertEcho(t, "http://"+tempo.Endpoint(3200)+"/api/echo") - // query an in-memory trace - queryAndAssertTrace(t, apiClient, info) + apiClient := httpclient.New("http://"+tempo.Endpoint(3200), "") - // wait trace_idle_time and ensure trace is created in ingester - require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics)) + // query an in-memory trace + queryAndAssertTrace(t, apiClient, info) - // flush trace to backend - callFlush(t, tempo) + // wait trace_idle_time and ensure trace is created in ingester + require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Less(3), []string{"tempo_ingester_traces_created_total"}, e2e.WaitMissingMetrics)) - // search for trace in backend - util.SearchAndAssertTrace(t, apiClient, info) - util.SearchTraceQLAndAssertTrace(t, apiClient, info) + // flush trace to backend + callFlush(t, tempo) - // sleep - time.Sleep(10 * time.Second) + // search for trace in backend + util.SearchAndAssertTrace(t, apiClient, info) + util.SearchTraceQLAndAssertTrace(t, apiClient, info) - // force clear completed block - callFlush(t, tempo) + // sleep + time.Sleep(10 * time.Second) - // test metrics - require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total")) - require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempodb_blocklist_length"}, e2e.WaitMissingMetrics)) - require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(3), "tempo_query_frontend_queries_total")) + // force clear completed block + callFlush(t, tempo) - // query trace - should fetch from backend - queryAndAssertTrace(t, apiClient, info) + // test metrics + require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total")) + require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempodb_blocklist_length"}, e2e.WaitMissingMetrics)) + require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(3), "tempo_query_frontend_queries_total")) - // search the backend. this works b/c we're passing a start/end AND setting query ingesters within min/max to 0 - now := time.Now() - util.SearchAndAssertTraceBackend(t, apiClient, info, now.Add(-20*time.Minute).Unix(), now.Unix()) + // query trace - should fetch from backend + queryAndAssertTrace(t, apiClient, info) - util.SearchAndAsserTagsBackend(t, apiClient, now.Add(-20*time.Minute).Unix(), now.Unix()) + // search the backend. this works b/c we're passing a start/end AND setting query ingesters within min/max to 0 + now := time.Now() + util.SearchAndAssertTraceBackend(t, apiClient, info, now.Add(-20*time.Minute).Unix(), now.Unix()) - // find the trace with streaming. 
using the http server b/c that's what Grafana will do - grpcClient, err := util.NewSearchGRPCClient(context.Background(), tempo.Endpoint(3200)) - require.NoError(t, err) + util.SearchAndAsserTagsBackend(t, apiClient, now.Add(-20*time.Minute).Unix(), now.Unix()) - util.SearchStreamAndAssertTrace(t, context.Background(), grpcClient, info, now.Add(-20*time.Minute).Unix(), now.Unix()) - }) + // find the trace with streaming. using the http server b/c that's what Grafana will do + grpcClient, err := util.NewSearchGRPCClient(context.Background(), tempo.Endpoint(3200)) + require.NoError(t, err) + + util.SearchStreamAndAssertTrace(t, context.Background(), grpcClient, info, now.Add(-20*time.Minute).Unix(), now.Unix()) + }) + } } } @@ -317,16 +337,20 @@ func TestShutdownDelay(t *testing.T) { require.NoError(t, err) defer s.Close() + // copy config template to shared directory and expand template variables + tmplConfig := map[string]any{"Prefix": ""} + configFile, err := util.CopyTemplateToSharedDir(s, configAllInOneS3, "config.yaml", tmplConfig) + require.NoError(t, err) + // set up the backend cfg := app.Config{} - buff, err := os.ReadFile(configAllInOneS3) + buff, err := os.ReadFile(configFile) require.NoError(t, err) err = yaml.UnmarshalStrict(buff, &cfg) require.NoError(t, err) _, err = backend.New(s, cfg) require.NoError(t, err) - require.NoError(t, util.CopyFileToSharedDir(s, configAllInOneS3, "config.yaml")) tempo := util.NewTempoAllInOne("-shutdown-delay=5s") // this line tests confirms that the readiness flag is up diff --git a/integration/e2e/overrides_test.go b/integration/e2e/overrides_test.go index 98cf0b7fbfa..2e49dbbf476 100644 --- a/integration/e2e/overrides_test.go +++ b/integration/e2e/overrides_test.go @@ -49,16 +49,20 @@ func TestOverrides(t *testing.T) { require.NoError(t, err) defer s.Close() + // copy config template to shared directory and expand template variables + tmplConfig := map[string]any{"Prefix": ""} + configFile, err := util.CopyTemplateToSharedDir(s, tc.configFile, "config.yaml", tmplConfig) + require.NoError(t, err) + // set up the backend cfg := app.Config{} - buff, err := os.ReadFile(tc.configFile) + buff, err := os.ReadFile(configFile) require.NoError(t, err) err = yaml.UnmarshalStrict(buff, &cfg) require.NoError(t, err) _, err = backend.New(s, cfg) require.NoError(t, err) - require.NoError(t, util.CopyFileToSharedDir(s, tc.configFile, "config.yaml")) tempo := util.NewTempoAllInOne() require.NoError(t, s.StartAndWaitReady(tempo)) diff --git a/integration/poller/poller_test.go b/integration/poller/poller_test.go index 65af8b2541e..5d013455dae 100644 --- a/integration/poller/poller_test.go +++ b/integration/poller/poller_test.go @@ -61,132 +61,158 @@ func TestPollerOwnership(t *testing.T) { }, } + storageBackendTestPermutations := []struct { + name string + prefix string + }{ + { + name: "empty-string-prefix", + prefix: "", + }, + { + name: "no-prefix", + }, + { + name: "prefix", + prefix: "a/b/c/", + }, + { + name: "prefix-no-trailing-slash", + prefix: "a/b/c", + }, + } + logger := log.NewLogfmtLogger(os.Stdout) var hhh *e2e.HTTPService t.Parallel() for _, tc := range testCompactorOwnershipBackends { - t.Run(tc.name, func(t *testing.T) { - s, err := e2e.NewScenario("tempo-integration") - require.NoError(t, err) - defer s.Close() - - // set up the backend - cfg := app.Config{} - buff, err := os.ReadFile(tc.configFile) - require.NoError(t, err) - err = yaml.UnmarshalStrict(buff, &cfg) - require.NoError(t, err) - hhh, err = e2eBackend.New(s, cfg) - 
require.NoError(t, err) - - err = hhh.WaitReady() - require.NoError(t, err) - - err = hhh.Ready() - require.NoError(t, err) - - // Give some time for startup - time.Sleep(1 * time.Second) - - t.Logf("backend: %s", hhh.Endpoint(hhh.HTTPPort())) - - require.NoError(t, util.CopyFileToSharedDir(s, tc.configFile, "config.yaml")) - - var rr backend.RawReader - var ww backend.RawWriter - var cc backend.Compactor - - concurrency := 3 - - e := hhh.Endpoint(hhh.HTTPPort()) - switch tc.name { - case "s3": - cfg.StorageConfig.Trace.S3.ListBlocksConcurrency = concurrency - cfg.StorageConfig.Trace.S3.Endpoint = e - cfg.Overrides.UserConfigurableOverridesConfig.Client.S3.Endpoint = e - rr, ww, cc, err = s3.New(cfg.StorageConfig.Trace.S3) - case "gcs": - cfg.StorageConfig.Trace.GCS.ListBlocksConcurrency = concurrency - cfg.StorageConfig.Trace.GCS.Endpoint = e - cfg.Overrides.UserConfigurableOverridesConfig.Client.GCS.Endpoint = e - rr, ww, cc, err = gcs.New(cfg.StorageConfig.Trace.GCS) - case "azure": - cfg.StorageConfig.Trace.Azure.Endpoint = e - cfg.Overrides.UserConfigurableOverridesConfig.Client.Azure.Endpoint = e - rr, ww, cc, err = azure.New(cfg.StorageConfig.Trace.Azure) - } - require.NoError(t, err) - - r := backend.NewReader(rr) - w := backend.NewWriter(ww) - - blocklistPoller := blocklist.NewPoller(&blocklist.PollerConfig{ - PollConcurrency: 3, - TenantIndexBuilders: 1, - }, OwnsEverythingSharder, r, cc, w, logger) - - // Use the block boundaries in the GCS and S3 implementation - bb := blockboundary.CreateBlockBoundaries(concurrency) - // Pick a boundary to use for this test - base := bb[1] - expected := []uuid.UUID{} - - expected = append(expected, uuid.MustParse("00000000-0000-0000-0000-000000000000")) - expected = append(expected, uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff")) - - // Grab the one before the boundary - decrementUUIDBytes(base) - expected = append(expected, uuid.UUID(base)) - - incrementUUIDBytes(base) - expected = append(expected, uuid.UUID(base)) - - incrementUUIDBytes(base) - expected = append(expected, uuid.UUID(base)) - - incrementUUIDBytes(base) - expected = append(expected, uuid.UUID(base)) - - writeTenantBlocks(t, w, tenant, expected) - - sort.Slice(expected, func(i, j int) bool { return expected[i].String() < expected[j].String() }) - t.Logf("expected: %v", expected) - - mmResults, cmResults, err := rr.ListBlocks(context.Background(), tenant) - require.NoError(t, err) - sort.Slice(mmResults, func(i, j int) bool { return mmResults[i].String() < mmResults[j].String() }) - t.Logf("mmResults: %s", mmResults) - t.Logf("cmResults: %s", cmResults) - - assert.Equal(t, expected, mmResults) - assert.Equal(t, len(expected), len(mmResults)) - assert.Equal(t, 0, len(cmResults)) - - l := blocklist.New() - mm, cm, err := blocklistPoller.Do(l) - require.NoError(t, err) - t.Logf("mm: %v", mm) - t.Logf("cm: %v", cm) - - l.ApplyPollResults(mm, cm) - - metas := l.Metas(tenant) - - actual := []uuid.UUID{} - for _, m := range metas { - actual = append(actual, m.BlockID) - } - - sort.Slice(actual, func(i, j int) bool { return actual[i].String() < actual[j].String() }) - - assert.Equal(t, expected, actual) - assert.Equal(t, len(expected), len(metas)) - t.Logf("actual: %v", actual) - - for _, e := range expected { - assert.True(t, found(e, metas)) - } - }) + for _, pc := range storageBackendTestPermutations { + t.Run(tc.name+"-"+pc.name, func(t *testing.T) { + s, err := e2e.NewScenario("tempo-integration") + require.NoError(t, err) + defer s.Close() + + // set up the backend + cfg := 
app.Config{}
+				buff, err := os.ReadFile(tc.configFile)
+				require.NoError(t, err)
+				err = yaml.UnmarshalStrict(buff, &cfg)
+				require.NoError(t, err)
+				hhh, err = e2eBackend.New(s, cfg)
+				require.NoError(t, err)
+
+				err = hhh.WaitReady()
+				require.NoError(t, err)
+
+				err = hhh.Ready()
+				require.NoError(t, err)
+
+				// Give some time for startup
+				time.Sleep(1 * time.Second)
+
+				t.Logf("backend: %s", hhh.Endpoint(hhh.HTTPPort()))
+
+				require.NoError(t, util.CopyFileToSharedDir(s, tc.configFile, "config.yaml"))
+
+				var rr backend.RawReader
+				var ww backend.RawWriter
+				var cc backend.Compactor
+
+				concurrency := 3
+
+				e := hhh.Endpoint(hhh.HTTPPort())
+				switch tc.name {
+				case "s3":
+					cfg.StorageConfig.Trace.S3.ListBlocksConcurrency = concurrency
+					cfg.StorageConfig.Trace.S3.Endpoint = e
+					cfg.StorageConfig.Trace.S3.Prefix = pc.prefix
+					cfg.Overrides.UserConfigurableOverridesConfig.Client.S3.Endpoint = e
+					rr, ww, cc, err = s3.New(cfg.StorageConfig.Trace.S3)
+				case "gcs":
+					cfg.StorageConfig.Trace.GCS.ListBlocksConcurrency = concurrency
+					cfg.StorageConfig.Trace.GCS.Endpoint = e
+					cfg.StorageConfig.Trace.GCS.Prefix = pc.prefix
+					cfg.Overrides.UserConfigurableOverridesConfig.Client.GCS.Endpoint = e
+					rr, ww, cc, err = gcs.New(cfg.StorageConfig.Trace.GCS)
+				case "azure":
+					cfg.StorageConfig.Trace.Azure.Endpoint = e
+					cfg.StorageConfig.Trace.Azure.Prefix = pc.prefix
+					cfg.Overrides.UserConfigurableOverridesConfig.Client.Azure.Endpoint = e
+					rr, ww, cc, err = azure.New(cfg.StorageConfig.Trace.Azure)
+				}
+				require.NoError(t, err)
+
+				r := backend.NewReader(rr)
+				w := backend.NewWriter(ww)
+
+				blocklistPoller := blocklist.NewPoller(&blocklist.PollerConfig{
+					PollConcurrency:     3,
+					TenantIndexBuilders: 1,
+				}, OwnsEverythingSharder, r, cc, w, logger)
+
+				// Use the block boundaries in the GCS and S3 implementation
+				bb := blockboundary.CreateBlockBoundaries(concurrency)
+				// Pick a boundary to use for this test
+				base := bb[1]
+				expected := []uuid.UUID{}
+
+				expected = append(expected, uuid.MustParse("00000000-0000-0000-0000-000000000000"))
+				expected = append(expected, uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))
+
+				// Grab the one before the boundary
+				decrementUUIDBytes(base)
+				expected = append(expected, uuid.UUID(base))
+
+				incrementUUIDBytes(base)
+				expected = append(expected, uuid.UUID(base))
+
+				incrementUUIDBytes(base)
+				expected = append(expected, uuid.UUID(base))
+
+				incrementUUIDBytes(base)
+				expected = append(expected, uuid.UUID(base))
+
+				writeTenantBlocks(t, w, tenant, expected)
+
+				sort.Slice(expected, func(i, j int) bool { return expected[i].String() < expected[j].String() })
+				t.Logf("expected: %v", expected)
+
+				mmResults, cmResults, err := rr.ListBlocks(context.Background(), tenant)
+				require.NoError(t, err)
+				sort.Slice(mmResults, func(i, j int) bool { return mmResults[i].String() < mmResults[j].String() })
+				t.Logf("mmResults: %s", mmResults)
+				t.Logf("cmResults: %s", cmResults)
+
+				assert.Equal(t, expected, mmResults)
+				assert.Equal(t, len(expected), len(mmResults))
+				assert.Equal(t, 0, len(cmResults))
+
+				l := blocklist.New()
+				mm, cm, err := blocklistPoller.Do(l)
+				require.NoError(t, err)
+				t.Logf("mm: %v", mm)
+				t.Logf("cm: %v", cm)
+
+				l.ApplyPollResults(mm, cm)
+
+				metas := l.Metas(tenant)
+
+				actual := []uuid.UUID{}
+				for _, m := range metas {
+					actual = append(actual, m.BlockID)
+				}
+
+				sort.Slice(actual, func(i, j int) bool { return actual[i].String() < actual[j].String() })
+
+				assert.Equal(t, expected, actual)
+				assert.Equal(t, len(expected), len(metas))
+				t.Logf("actual: %v", actual)
+
+				for _, e := range expected {
+					assert.True(t, found(e, metas))
+				}
+			})
+		}
 	}
 }
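The poller test above steps block IDs across a shard boundary with decrementUUIDBytes/incrementUUIDBytes, whose definitions live elsewhere in poller_test.go and are not part of this diff. A minimal sketch of what such helpers plausibly do, treating the 16-byte UUID as a 128-bit big-endian counter with carry and borrow (the bodies below are an illustration, not the file's actual code):

package main

import (
	"fmt"

	"github.com/google/uuid"
)

// incrementUUIDBytes adds one to the UUID viewed as a big-endian integer,
// carrying into higher bytes when a byte wraps from 0xff to 0x00.
func incrementUUIDBytes(b []byte) {
	for i := len(b) - 1; i >= 0; i-- {
		b[i]++
		if b[i] != 0 { // no carry needed, done
			return
		}
	}
}

// decrementUUIDBytes subtracts one, borrowing from higher bytes when a byte
// wraps from 0x00 to 0xff.
func decrementUUIDBytes(b []byte) {
	for i := len(b) - 1; i >= 0; i-- {
		b[i]--
		if b[i] != 0xff { // no borrow needed, done
			return
		}
	}
}

func main() {
	u := uuid.MustParse("00000000-0000-0000-0000-0000000000ff")
	incrementUUIDBytes(u[:])
	fmt.Println(u) // 00000000-0000-0000-0000-000000000100
	decrementUUIDBytes(u[:])
	fmt.Println(u) // 00000000-0000-0000-0000-0000000000ff
}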
t.Logf("actual: %v", actual) + + for _, e := range expected { + assert.True(t, found(e, metas)) + } + }) + } } } diff --git a/tempodb/backend/azure/azure_test.go b/tempodb/backend/azure/azure_test.go index 8e028de00af..c923259a03e 100644 --- a/tempodb/backend/azure/azure_test.go +++ b/tempodb/backend/azure/azure_test.go @@ -252,6 +252,159 @@ func TestObjectWithPrefix(t *testing.T) { } } +func TestListBlocksWithPrefix(t *testing.T) { + tests := []struct { + name string + prefix string + liveBlockIDs []uuid.UUID + compactedBlockIDs []uuid.UUID + tenant string + httpHandler func(t *testing.T) http.HandlerFunc + }{ + { + name: "with prefix", + prefix: "a/b/c/", + tenant: "single-tenant", + liveBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000000")}, + compactedBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000001")}, + httpHandler: func(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method == "GET" { + assert.Equal(t, "a/b/c/single-tenant/", r.URL.Query().Get("prefix")) + + _, _ = w.Write([]byte(` + + + a/b/c/ + 100 + + + a/b/c/single-tenant/00000000-0000-0000-0000-000000000000/meta.json + https://myaccount.blob.core.windows.net/mycontainer/a/b/c/single-tenant/00000000-0000-0000-0000-000000000000/meta.json + + Fri, 01 Mar 2024 00:00:00 GMT + 0x8CBFF45D8A29A19 + 100 + text/html + + en-US + + no-cache + BlockBlob + unlocked + + + + + a/b/c/single-tenant/00000000-0000-0000-0000-000000000001/meta.compacted.json + https://myaccount.blob.core.windows.net/mycontainer/a/b/c/single-tenant/00000000-0000-0000-0000-000000000001/meta.compacted.json + + Fri, 01 Mar 2024 00:00:00 GMT + 0x8CBFF45D8A29A19 + 100 + text/html + + en-US + + no-cache + BlockBlob + unlocked + + + + + + `)) + return + } + } + }, + }, + { + name: "without prefix", + prefix: "", + tenant: "single-tenant", + liveBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000000")}, + compactedBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000001")}, + httpHandler: func(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method == "GET" { + assert.Equal(t, "single-tenant/", r.URL.Query().Get("prefix")) + + _, _ = w.Write([]byte(` + + + + 100 + + + single-tenant/00000000-0000-0000-0000-000000000000/meta.json + https://myaccount.blob.core.windows.net/mycontainer/single-tenant/00000000-0000-0000-0000-000000000000/meta.json + + Fri, 01 Mar 2024 00:00:00 GMT + 0x8CBFF45D8A29A19 + 100 + text/html + + en-US + + no-cache + BlockBlob + unlocked + + + + + single-tenant/00000000-0000-0000-0000-000000000001/meta.compacted.json + https://myaccount.blob.core.windows.net/mycontainer/single-tenant/00000000-0000-0000-0000-000000000001/meta.compacted.json + + Fri, 01 Mar 2024 00:00:00 GMT + 0x8CBFF45D8A29A19 + 100 + text/html + + en-US + + no-cache + BlockBlob + unlocked + + + + + + `)) + return + } + } + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + server := testServer(t, tc.httpHandler(t)) + r, _, _, err := NewNoConfirm(&config.Config{ + StorageAccountName: "testing_account", + StorageAccountKey: flagext.SecretWithValue("YQo="), + MaxBuffers: 3, + BufferSize: 1000, + ContainerName: "blerg", + Prefix: tc.prefix, + Endpoint: server.URL[7:], // [7:] -> strip http://, + }) + require.NoError(t, err) + + ctx := context.Background() + blockIDs, compactedBlockIDs, err2 := r.ListBlocks(ctx, tc.tenant) + assert.NoError(t, err2) + + assert.ElementsMatchf(t, 
diff --git a/tempodb/backend/gcs/gcs.go b/tempodb/backend/gcs/gcs.go
index c864daae3be..1d4b346e68b 100644
--- a/tempodb/backend/gcs/gcs.go
+++ b/tempodb/backend/gcs/gcs.go
@@ -253,20 +253,20 @@ func (rw *readerWriter) ListBlocks(ctx context.Context, tenant string) ([]uuid.U
 			return
 		}

-		parts = strings.Split(attrs.Name, "/")
-		// ie: <tenantID>/<blockID>/meta.json
-		if len(parts) != 3 {
+		parts = strings.Split(strings.TrimPrefix(attrs.Name, prefix), "/")
+		// ie: <blockID>/meta.json
+		if len(parts) != 2 {
 			continue
 		}

-		switch parts[2] {
+		switch parts[1] {
 		case backend.MetaName:
 		case backend.CompactedMetaName:
 		default:
 			continue
 		}

-		id, err = uuid.Parse(parts[1])
+		id, err = uuid.Parse(parts[0])
 		if err != nil {
 			continue
 		}
@@ -283,7 +283,7 @@ func (rw *readerWriter) ListBlocks(ctx context.Context, tenant string) ([]uuid.U
 		}

 		mtx.Lock()
-		switch parts[2] {
+		switch parts[1] {
 		case backend.MetaName:
 			blockIDs = append(blockIDs, id)
 		case backend.CompactedMetaName:
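This hunk is the heart of the fix: instead of expecting every object name to split into exactly three segments (tenant/blockID/file), ListBlocks now trims the listing prefix first and parses the two-segment remainder, so a configured storage prefix no longer breaks block discovery. A self-contained sketch of that parsing, assuming the backend package constants are "meta.json" and "meta.compacted.json" (parseBlockKey is a hypothetical helper, not Tempo's API):

package main

import (
	"fmt"
	"strings"

	"github.com/google/uuid"
)

// Assumed values of backend.MetaName and backend.CompactedMetaName.
const (
	metaName          = "meta.json"
	compactedMetaName = "meta.compacted.json"
)

// parseBlockKey trims the listing prefix (e.g. "a/b/c/single-tenant/") and
// expects the remainder to be "<blockID>/<meta file>".
func parseBlockKey(key, prefix string) (id uuid.UUID, compacted, ok bool) {
	parts := strings.Split(strings.TrimPrefix(key, prefix), "/")
	if len(parts) != 2 {
		return uuid.UUID{}, false, false // not a block-level object
	}
	switch parts[1] {
	case metaName, compactedMetaName:
		// a meta file we care about; fall through to ID parsing
	default:
		return uuid.UUID{}, false, false
	}
	id, err := uuid.Parse(parts[0])
	if err != nil {
		return uuid.UUID{}, false, false
	}
	return id, parts[1] == compactedMetaName, true
}

func main() {
	prefix := "a/b/c/single-tenant/"
	id, compacted, ok := parseBlockKey(prefix+"00000000-0000-0000-0000-000000000001/meta.compacted.json", prefix)
	fmt.Println(id, compacted, ok) // 00000000-0000-0000-0000-000000000001 true true
}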
"size": "1024", + "timeCreated": "2024-03-01T00:00:00.000Z", + "updated": "2024-03-01T00:00:00.000Z" + }, { + "kind": "storage#object", + "id": "2", + "name": "single-tenant/00000000-0000-0000-0000-000000000001/meta.compacted.json", + "bucket": "blerg", + "storageClass": "STANDARD", + "size": "1024", + "timeCreated": "2024-03-01T00:00:00.000Z", + "updated": "2024-03-01T00:00:00.000Z" + }] + } + `)) + return + } + } + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + server := testServer(t, tc.httpHandler(t)) + r, _, _, err := NewNoConfirm(&Config{ + BucketName: "blerg", + Endpoint: server.URL, + Insecure: true, + Prefix: tc.prefix, + ListBlocksConcurrency: 1, + }) + require.NoError(t, err) + + ctx := context.Background() + blockIDs, compactedBlockIDs, err := r.ListBlocks(ctx, tc.tenant) + assert.NoError(t, err) + + assert.ElementsMatchf(t, tc.liveBlockIDs, blockIDs, "Block IDs did not match") + assert.ElementsMatchf(t, tc.compactedBlockIDs, compactedBlockIDs, "Compacted block IDs did not match") + }) + } +} + func testServer(t *testing.T, httpHandler http.HandlerFunc) *httptest.Server { t.Helper() assert.NotNil(t, httpHandler) diff --git a/tempodb/backend/s3/s3.go b/tempodb/backend/s3/s3.go index 709a844660c..dd037112e68 100644 --- a/tempodb/backend/s3/s3.go +++ b/tempodb/backend/s3/s3.go @@ -333,20 +333,20 @@ func (rw *readerWriter) ListBlocks( } for _, c := range res.Contents { - // i.e: /meta - parts := strings.Split(c.Key, "/") - if len(parts) != 3 { + // i.e: /meta + parts := strings.Split(strings.TrimPrefix(c.Key, prefix), "/") + if len(parts) != 2 { continue } - switch parts[2] { + switch parts[1] { case backend.MetaName: case backend.CompactedMetaName: default: continue } - id, err := uuid.Parse(parts[1]) + id, err := uuid.Parse(parts[0]) if err != nil { continue } @@ -363,7 +363,7 @@ func (rw *readerWriter) ListBlocks( } mtx.Lock() - switch parts[2] { + switch parts[1] { case backend.MetaName: blockIDs = append(blockIDs, id) case backend.CompactedMetaName: diff --git a/tempodb/backend/s3/s3_test.go b/tempodb/backend/s3/s3_test.go index f4f460972c0..bcccda6f39e 100644 --- a/tempodb/backend/s3/s3_test.go +++ b/tempodb/backend/s3/s3_test.go @@ -18,6 +18,8 @@ import ( "testing" "time" + "github.com/google/uuid" + "github.com/aws/aws-sdk-go/service/s3" "github.com/grafana/dskit/flagext" "github.com/minio/minio-go/v7" @@ -449,6 +451,124 @@ func TestObjectWithPrefix(t *testing.T) { } } +func TestListBlocksWithPrefix(t *testing.T) { + tests := []struct { + name string + prefix string + tenant string + liveBlockIDs []uuid.UUID + compactedBlockIDs []uuid.UUID + httpHandler func(t *testing.T) http.HandlerFunc + }{ + { + name: "with prefix", + prefix: "a/b/c/", + tenant: "single-tenant", + liveBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000000")}, + compactedBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000001")}, + httpHandler: func(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method == getMethod { + assert.Equal(t, "a/b/c/single-tenant/", r.URL.Query().Get("prefix")) + + _, _ = w.Write([]byte(` + + blerg + a/b/c + + 2 + 100 + url + false + + a/b/c/single-tenant/00000000-0000-0000-0000-000000000000/meta.json + 2024-03-01T00:00:00.000Z + "d42a22ddd183f61924c661b1c026c1ef" + 398 + STANDARD + + + + a/b/c/single-tenant/00000000-0000-0000-0000-000000000001/meta.compacted.json + 2024-03-01T00:00:00.000Z + "d42a22ddd183f61924c661b1c026c1ef" + 398 + STANDARD + + `)) + 
diff --git a/tempodb/backend/s3/s3_test.go b/tempodb/backend/s3/s3_test.go
index f4f460972c0..bcccda6f39e 100644
--- a/tempodb/backend/s3/s3_test.go
+++ b/tempodb/backend/s3/s3_test.go
@@ -18,6 +18,8 @@ import (
 	"testing"
 	"time"

+	"github.com/google/uuid"
+
 	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/grafana/dskit/flagext"
 	"github.com/minio/minio-go/v7"
@@ -449,6 +451,124 @@ func TestObjectWithPrefix(t *testing.T) {
 	}
 }

+func TestListBlocksWithPrefix(t *testing.T) {
+	tests := []struct {
+		name              string
+		prefix            string
+		tenant            string
+		liveBlockIDs      []uuid.UUID
+		compactedBlockIDs []uuid.UUID
+		httpHandler       func(t *testing.T) http.HandlerFunc
+	}{
+		{
+			name:              "with prefix",
+			prefix:            "a/b/c/",
+			tenant:            "single-tenant",
+			liveBlockIDs:      []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000000")},
+			compactedBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000001")},
+			httpHandler: func(t *testing.T) http.HandlerFunc {
+				return func(w http.ResponseWriter, r *http.Request) {
+					if r.Method == getMethod {
+						assert.Equal(t, "a/b/c/single-tenant/", r.URL.Query().Get("prefix"))
+
+						_, _ = w.Write([]byte(`
+<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+  <Name>blerg</Name>
+  <Prefix>a/b/c</Prefix>
+  <Marker></Marker>
+  <KeyCount>2</KeyCount>
+  <MaxKeys>100</MaxKeys>
+  <EncodingType>url</EncodingType>
+  <IsTruncated>false</IsTruncated>
+  <Contents>
+    <Key>a/b/c/single-tenant/00000000-0000-0000-0000-000000000000/meta.json</Key>
+    <LastModified>2024-03-01T00:00:00.000Z</LastModified>
+    <ETag>"d42a22ddd183f61924c661b1c026c1ef"</ETag>
+    <Size>398</Size>
+    <StorageClass>STANDARD</StorageClass>
+  </Contents>
+  <Contents>
+    <Key>a/b/c/single-tenant/00000000-0000-0000-0000-000000000001/meta.compacted.json</Key>
+    <LastModified>2024-03-01T00:00:00.000Z</LastModified>
+    <ETag>"d42a22ddd183f61924c661b1c026c1ef"</ETag>
+    <Size>398</Size>
+    <StorageClass>STANDARD</StorageClass>
+  </Contents>
+</ListBucketResult>
+`))
+						return
+					}
+				}
+			},
+		},
+		{
+			name:              "without prefix",
+			prefix:            "",
+			liveBlockIDs:      []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000000")},
+			compactedBlockIDs: []uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000001")},
+			tenant:            "single-tenant",
+			httpHandler: func(t *testing.T) http.HandlerFunc {
+				return func(w http.ResponseWriter, r *http.Request) {
+					if r.Method == getMethod {
+						assert.Equal(t, "single-tenant/", r.URL.Query().Get("prefix"))
+
+						_, _ = w.Write([]byte(`
+<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+  <Name>blerg</Name>
+  <Prefix></Prefix>
+  <Marker></Marker>
+  <KeyCount>2</KeyCount>
+  <MaxKeys>100</MaxKeys>
+  <EncodingType>url</EncodingType>
+  <IsTruncated>false</IsTruncated>
+  <Contents>
+    <Key>single-tenant/00000000-0000-0000-0000-000000000000/meta.json</Key>
+    <LastModified>2024-03-01T00:00:00.000Z</LastModified>
+    <ETag>"d42a22ddd183f61924c661b1c026c1ef"</ETag>
+    <Size>398</Size>
+    <StorageClass>STANDARD</StorageClass>
+  </Contents>
+  <Contents>
+    <Key>single-tenant/00000000-0000-0000-0000-000000000001/meta.compacted.json</Key>
+    <LastModified>2024-03-01T00:00:00.000Z</LastModified>
+    <ETag>"d42a22ddd183f61924c661b1c026c1ef"</ETag>
+    <Size>398</Size>
+    <StorageClass>STANDARD</StorageClass>
+  </Contents>
+</ListBucketResult>
+`))
+						return
+					}
+				}
+			},
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			server := testServer(t, tc.httpHandler(t))
+			r, _, _, err := NewNoConfirm(&Config{
+				Region:                "blerg",
+				AccessKey:             "test",
+				SecretKey:             flagext.SecretWithValue("test"),
+				Bucket:                "blerg",
+				Prefix:                tc.prefix,
+				Insecure:              true,
+				Endpoint:              server.URL[7:],
+				ListBlocksConcurrency: 1,
+			})
+			require.NoError(t, err)
+
+			ctx := context.Background()
+			blockIDs, compactedBlockIDs, err := r.ListBlocks(ctx, tc.tenant)
+			assert.NoError(t, err)
+
+			assert.ElementsMatchf(t, tc.liveBlockIDs, blockIDs, "Block IDs did not match")
+			assert.ElementsMatchf(t, tc.compactedBlockIDs, compactedBlockIDs, "Compacted block IDs did not match")
+		})
+	}
+}
+
 func TestObjectStorageClass(t *testing.T) {
 	tests := []struct {
 		name string