diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0cf86486eda..46b958363b5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -123,6 +123,9 @@ jobs: - name: Test run: make test-e2e + - name: Poller + run: make test-integration-poller + integration-tests-serverless: name: Test serverless integration e2e suite runs-on: ubuntu-latest diff --git a/Makefile b/Makefile index f57b75f3237..3cf6689f0b0 100644 --- a/Makefile +++ b/Makefile @@ -129,9 +129,13 @@ test-e2e: tools docker-tempo docker-tempo-query test-e2e-serverless: tools docker-tempo docker-serverless $(GOTEST) -v $(GOTEST_OPT) ./integration/e2e/serverless +.PHONY: test-integration-poller +test-integration-poller: tools + $(GOTEST) -v $(GOTEST_OPT) ./integration/poller + # test-all/bench use a docker image so build it first to make sure we're up to date .PHONY: test-all -test-all: test-with-cover test-e2e test-e2e-serverless +test-all: test-with-cover test-e2e test-e2e-serverless test-integration-poller .PHONY: test-bench test-bench: tools docker-tempo diff --git a/cmd/tempo-cli/cmd-analyse-blocks.go b/cmd/tempo-cli/cmd-analyse-blocks.go index e643b6e0384..02ea62bab46 100644 --- a/cmd/tempo-cli/cmd-analyse-blocks.go +++ b/cmd/tempo-cli/cmd-analyse-blocks.go @@ -21,7 +21,7 @@ func (cmd *analyseBlocksCmd) Run(ctx *globalOptions) error { } // TODO: Parallelize this - blocks, err := r.Blocks(context.Background(), cmd.TenantID) + blocks, _, err := r.Blocks(context.Background(), cmd.TenantID) if err != nil { return err } diff --git a/cmd/tempo-cli/cmd-migrate-tenant.go b/cmd/tempo-cli/cmd-migrate-tenant.go index 21162a76286..a3c9f48ea03 100644 --- a/cmd/tempo-cli/cmd-migrate-tenant.go +++ b/cmd/tempo-cli/cmd-migrate-tenant.go @@ -37,7 +37,7 @@ func (cmd *migrateTenantCmd) Run(opts *globalOptions) error { // TODO create dest directory if it doesn't exist yet? - blocksDest, err := readerDest.Blocks(ctx, cmd.DestTenantID) + blocksDest, _, err := readerDest.Blocks(ctx, cmd.DestTenantID) if err != nil { return err } diff --git a/cmd/tempo-cli/cmd-query-blocks.go b/cmd/tempo-cli/cmd-query-blocks.go index b6b89f0a744..df7c53d7993 100644 --- a/cmd/tempo-cli/cmd-query-blocks.go +++ b/cmd/tempo-cli/cmd-query-blocks.go @@ -84,13 +84,15 @@ func (cmd *queryBlocksCmd) Run(ctx *globalOptions) error { } func queryBucket(ctx context.Context, r backend.Reader, c backend.Compactor, tenantID string, traceID common.ID) ([]queryResults, error) { - blockIDs, err := r.Blocks(context.Background(), tenantID) + blockIDs, compactedBlockIDs, err := r.Blocks(context.Background(), tenantID) if err != nil { return nil, err } fmt.Println("total blocks to search: ", len(blockIDs)) + blockIDs = append(blockIDs, compactedBlockIDs...) 
+ // Load in parallel wg := boundedwaitgroup.New(100) resultsCh := make(chan queryResults, len(blockIDs)) diff --git a/cmd/tempo-cli/cmd-query-trace-summary.go b/cmd/tempo-cli/cmd-query-trace-summary.go index f407b9ee219..0e7a5fb649d 100644 --- a/cmd/tempo-cli/cmd-query-trace-summary.go +++ b/cmd/tempo-cli/cmd-query-trace-summary.go @@ -87,7 +87,7 @@ func sortServiceNames(nameFrequencies map[string]int) PairList { } func queryBucketForSummary(ctx context.Context, percentage float32, r backend.Reader, c backend.Compactor, tenantID string, traceID common.ID) (*TraceSummary, error) { - blockIDs, err := r.Blocks(context.Background(), tenantID) + blockIDs, _, err := r.Blocks(context.Background(), tenantID) if err != nil { return nil, err } diff --git a/cmd/tempo-cli/cmd-search.go b/cmd/tempo-cli/cmd-search.go index edb4ca85510..2b9c94ae491 100644 --- a/cmd/tempo-cli/cmd-search.go +++ b/cmd/tempo-cli/cmd-search.go @@ -53,7 +53,7 @@ func (cmd *searchBlocksCmd) Run(opts *globalOptions) error { ctx := context.Background() - blockIDs, err := r.Blocks(ctx, cmd.TenantID) + blockIDs, _, err := r.Blocks(ctx, cmd.TenantID) if err != nil { return err } diff --git a/cmd/tempo-cli/shared.go b/cmd/tempo-cli/shared.go index f642bb954be..b0b2249b660 100644 --- a/cmd/tempo-cli/shared.go +++ b/cmd/tempo-cli/shared.go @@ -56,11 +56,13 @@ type blockStats struct { } func loadBucket(r backend.Reader, c backend.Compactor, tenantID string, windowRange time.Duration, includeCompacted bool) ([]blockStats, error) { - blockIDs, err := r.Blocks(context.Background(), tenantID) + blockIDs, compactedBlockIDs, err := r.Blocks(context.Background(), tenantID) if err != nil { return nil, err } + blockIDs = append(blockIDs, compactedBlockIDs...) + fmt.Println("total blocks: ", len(blockIDs)) // Load in parallel diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index 1fb02b14d54..eb5f0befb9a 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -1,6 +1,7 @@ package app import ( + "context" "errors" "fmt" "io" @@ -295,7 +296,7 @@ func (t *App) initQuerier() (services.Service, error) { // do not enable polling if this is the single binary. in that case the compactor will take care of polling if t.cfg.Target == Querier { - t.store.EnablePolling(nil) + t.store.EnablePolling(context.Background(), nil) } ingesterRings := []ring.ReadRing{t.readRings[ringIngester]} @@ -394,7 +395,7 @@ func (t *App) initQueryFrontend() (services.Service, error) { t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSpanMetricsSummary), spanMetricsSummaryHandler) // the query frontend needs to have knowledge of the blocks so it can shard search jobs - t.store.EnablePolling(nil) + t.store.EnablePolling(context.Background(), nil) // http query echo endpoint t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathEcho), echoHandler()) diff --git a/docs/sources/tempo/configuration/_index.md b/docs/sources/tempo/configuration/_index.md index c4595361429..b2c268e763c 100644 --- a/docs/sources/tempo/configuration/_index.md +++ b/docs/sources/tempo/configuration/_index.md @@ -731,6 +731,11 @@ storage: # Example: "endpoint: s3.dualstack.us-east-2.amazonaws.com" [endpoint: <string>] + # The number of list calls to make in parallel to the backend per instance. + # Adjustments here will impact the polling time, as well as the number of goroutines. + # Default is 3 + [list_blocks_concurrency: <int>] + + # optional. # By default the region is inferred from the endpoint, # but is required for some S3-compatible storage engines.
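For readers following the API change: the new `Blocks` contract on `backend.Reader` returns live block IDs and compacted block IDs as two separate slices, which is why every call site above gains a second return value. Below is a minimal, illustrative sketch of the new calling convention, not part of the patch; the helper name `allBlockIDs` and its error handling are assumptions, while the interface signature and the append pattern mirror cmd-query-blocks.go and cmd/tempo-cli/shared.go in this diff.

```go
package example

import (
	"context"

	"github.com/google/uuid"

	"github.com/grafana/tempo/tempodb/backend"
)

// allBlockIDs is a hypothetical helper: backend.Reader.Blocks now returns
// (blockIDs, compactedBlockIDs, err). Callers that only care about live
// blocks discard the second slice with _, as cmd-search.go does above;
// callers that want both simply append the slices together.
func allBlockIDs(ctx context.Context, r backend.Reader, tenantID string) ([]uuid.UUID, error) {
	blockIDs, compactedBlockIDs, err := r.Blocks(ctx, tenantID)
	if err != nil {
		return nil, err
	}
	return append(blockIDs, compactedBlockIDs...), nil
}
```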
diff --git a/integration/e2e/backend/backend.go b/integration/e2e/backend/backend.go index 12f7a663dc2..3c45b523dce 100644 --- a/integration/e2e/backend/backend.go +++ b/integration/e2e/backend/backend.go @@ -17,7 +17,7 @@ import ( const ( azuriteImage = "mcr.microsoft.com/azure-storage/azurite" - gcsImage = "fsouza/fake-gcs-server:1.47.3" + gcsImage = "fsouza/fake-gcs-server:1.47.6" ) func parsePort(endpoint string) (int, error) { diff --git a/integration/poller/config-azurite.yaml b/integration/poller/config-azurite.yaml new file mode 100644 index 00000000000..0eadfcd8866 --- /dev/null +++ b/integration/poller/config-azurite.yaml @@ -0,0 +1,54 @@ +target: all +stream_over_http_enabled: true + +server: + http_listen_port: 3200 + +query_frontend: + search: + query_backend_after: 0 # setting these both to 0 will force all range searches to hit the backend + query_ingesters_until: 0 + +distributor: + receivers: + jaeger: + protocols: + grpc: + +ingester: + lifecycler: + address: 127.0.0.1 + ring: + kvstore: + store: inmemory + replication_factor: 1 + final_sleep: 0s + trace_idle_period: 1s + max_block_duration: 5s + complete_block_timeout: 5s + flush_check_period: 1s + +storage: + trace: + blocklist_poll: 1s + backend: azure + azure: + container_name: tempo # how to store data in azure + endpoint_suffix: tempo_e2e-azurite:10000 + storage_account_name: "devstoreaccount1" + storage_account_key: "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" + pool: + max_workers: 10 + queue_depth: 100 + +overrides: + user_configurable_overrides: + enabled: true + poll_interval: 10s + client: + backend: azure + azure: + container_name: tempo + endpoint_suffix: tempo_e2e-azurite:10000 + storage_account_name: "devstoreaccount1" + storage_account_key: "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" diff --git a/integration/poller/config-gcs.yaml b/integration/poller/config-gcs.yaml new file mode 100644 index 00000000000..06970879de3 --- /dev/null +++ b/integration/poller/config-gcs.yaml @@ -0,0 +1,54 @@ +target: all +stream_over_http_enabled: true + +server: + http_listen_port: 3200 + +query_frontend: + search: + query_ingesters_until: 0 # setting these both to 0 will force all range searches to hit the backend + query_backend_after: 0 + +distributor: + receivers: + jaeger: + protocols: + grpc: + +ingester: + lifecycler: + address: 127.0.0.1 + ring: + kvstore: + store: inmemory + replication_factor: 1 + final_sleep: 0s + trace_idle_period: 1s + max_block_duration: 5s + complete_block_timeout: 5s + flush_check_period: 1s + +storage: + trace: + blocklist_poll: 1s + backend: gcs + gcs: + bucket_name: tempo + endpoint: https://tempo_e2e-gcs:4443/storage/v1/ + insecure: true + pool: + max_workers: 10 + queue_depth: 1000 + +overrides: + user_configurable_overrides: + enabled: true + poll_interval: 10s + client: + backend: gcs + # fsouza/fake-gcs-server does not support versioning + confirm_versioning: false + gcs: + bucket_name: tempo + endpoint: https://tempo_e2e-gcs:4443/storage/v1/ + insecure: true diff --git a/integration/poller/config-s3.yaml b/integration/poller/config-s3.yaml new file mode 100644 index 00000000000..a1cfa34e38c --- /dev/null +++ b/integration/poller/config-s3.yaml @@ -0,0 +1,57 @@ +target: all +stream_over_http_enabled: true + +server: + http_listen_port: 3200 + +query_frontend: + search: + query_backend_after: 0 # setting these both to 0 will force all range searches to hit the backend + query_ingesters_until: 
0 + +distributor: + receivers: + jaeger: + protocols: + grpc: + +ingester: + lifecycler: + address: 127.0.0.1 + ring: + kvstore: + store: inmemory + replication_factor: 1 + final_sleep: 0s + trace_idle_period: 1s + max_block_duration: 5s + complete_block_timeout: 5s + flush_check_period: 1s + +storage: + trace: + blocklist_poll: 1s + backend: s3 + s3: + bucket: tempo + endpoint: tempo-integration-minio-9000:9000 # TODO: this is brittle, fix this eventually + access_key: Cheescake # TODO: use cortex_e2e.MinioAccessKey + secret_key: supersecret # TODO: use cortex_e2e.MinioSecretKey + insecure: true + pool: + max_workers: 10 + queue_depth: 100 + +overrides: + user_configurable_overrides: + enabled: true + poll_interval: 10s + client: + backend: s3 + s3: + # TODO use separate bucket? + bucket: tempo + endpoint: tempo-integration-minio-9000:9000 # TODO: this is brittle, fix this eventually + access_key: Cheescake # TODO: use cortex_e2e.MinioAccessKey + secret_key: supersecret # TODO: use cortex_e2e.MinioSecretKey + insecure: true diff --git a/integration/poller/poller_test.go b/integration/poller/poller_test.go new file mode 100644 index 00000000000..947afd450bd --- /dev/null +++ b/integration/poller/poller_test.go @@ -0,0 +1,236 @@ +package poller + +import ( + "context" + "os" + "sort" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/google/uuid" + "github.com/grafana/e2e" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" + + "github.com/grafana/tempo/cmd/tempo/app" + util "github.com/grafana/tempo/integration" + e2eBackend "github.com/grafana/tempo/integration/e2e/backend" + "github.com/grafana/tempo/pkg/blockboundary" + "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/backend/azure" + "github.com/grafana/tempo/tempodb/backend/gcs" + "github.com/grafana/tempo/tempodb/backend/s3" + "github.com/grafana/tempo/tempodb/blocklist" +) + +const ( + configS3 = "config-s3.yaml" + configAzurite = "config-azurite.yaml" + configGCS = "config-gcs.yaml" + + tenant = "test" +) + +// OwnsEverythingSharder owns everything. 
+var OwnsEverythingSharder = ownsEverythingSharder{} + +type ownsEverythingSharder struct{} + +func (ownsEverythingSharder) Owns(_ string) bool { + return true +} + +func TestPollerOwnership(t *testing.T) { + testCompactorOwnershipBackends := []struct { + name string + configFile string + }{ + { + name: "s3", + configFile: configS3, + }, + { + name: "azure", + configFile: configAzurite, + }, + { + name: "gcs", + configFile: configGCS, + }, + } + + logger := log.NewLogfmtLogger(os.Stdout) + var hhh *e2e.HTTPService + t.Parallel() + for _, tc := range testCompactorOwnershipBackends { + t.Run(tc.name, func(t *testing.T) { + s, err := e2e.NewScenario("tempo-integration") + require.NoError(t, err) + defer s.Close() + + // set up the backend + cfg := app.Config{} + buff, err := os.ReadFile(tc.configFile) + require.NoError(t, err) + err = yaml.UnmarshalStrict(buff, &cfg) + require.NoError(t, err) + hhh, err = e2eBackend.New(s, cfg) + require.NoError(t, err) + + err = hhh.WaitReady() + require.NoError(t, err) + + err = hhh.Ready() + require.NoError(t, err) + + // Give some time for startup + time.Sleep(1 * time.Second) + + t.Logf("backend: %s", hhh.Endpoint(hhh.HTTPPort())) + + require.NoError(t, util.CopyFileToSharedDir(s, tc.configFile, "config.yaml")) + + var rr backend.RawReader + var ww backend.RawWriter + var cc backend.Compactor + + concurrency := 3 + + e := hhh.Endpoint(hhh.HTTPPort()) + switch tc.name { + case "s3": + cfg.StorageConfig.Trace.S3.ListBlocksConcurrency = concurrency + cfg.StorageConfig.Trace.S3.Endpoint = e + cfg.Overrides.UserConfigurableOverridesConfig.Client.S3.Endpoint = e + rr, ww, cc, err = s3.New(cfg.StorageConfig.Trace.S3) + case "gcs": + cfg.StorageConfig.Trace.GCS.ListBlocksConcurrency = concurrency + cfg.StorageConfig.Trace.GCS.Endpoint = e + cfg.Overrides.UserConfigurableOverridesConfig.Client.GCS.Endpoint = e + rr, ww, cc, err = gcs.New(cfg.StorageConfig.Trace.GCS) + case "azure": + cfg.StorageConfig.Trace.Azure.Endpoint = e + cfg.Overrides.UserConfigurableOverridesConfig.Client.Azure.Endpoint = e + rr, ww, cc, err = azure.New(cfg.StorageConfig.Trace.Azure) + } + require.NoError(t, err) + + r := backend.NewReader(rr) + w := backend.NewWriter(ww) + + blocklistPoller := blocklist.NewPoller(&blocklist.PollerConfig{ + PollConcurrency: 3, + TenantIndexBuilders: 1, + }, OwnsEverythingSharder, r, cc, w, logger) + + // Use the block boundaries in the GCS and S3 implementation + bb := blockboundary.CreateBlockBoundaries(concurrency) + // Pick a boundary to use for this test + base := bb[1] + expected := []uuid.UUID{} + + expected = append(expected, uuid.MustParse("00000000-0000-0000-0000-000000000000")) + expected = append(expected, uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff")) + + // Grab the one before the boundary + decrementUUIDBytes(base) + expected = append(expected, uuid.UUID(base)) + + incrementUUIDBytes(base) + expected = append(expected, uuid.UUID(base)) + + incrementUUIDBytes(base) + expected = append(expected, uuid.UUID(base)) + + incrementUUIDBytes(base) + expected = append(expected, uuid.UUID(base)) + + writeTenantBlocks(t, w, tenant, expected) + + sort.Slice(expected, func(i, j int) bool { return expected[i].String() < expected[j].String() }) + t.Logf("expected: %v", expected) + + mmResults, cmResults, err := rr.ListBlocks(context.Background(), tenant) + require.NoError(t, err) + sort.Slice(mmResults, func(i, j int) bool { return mmResults[i].String() < mmResults[j].String() }) + t.Logf("mmResults: %s", mmResults) + t.Logf("cmResults: %s", 
cmResults) + + assert.Equal(t, expected, mmResults) + assert.Equal(t, len(expected), len(mmResults)) + assert.Equal(t, 0, len(cmResults)) + + l := blocklist.New() + mm, cm, err := blocklistPoller.Do(l) + require.NoError(t, err) + t.Logf("mm: %v", mm) + t.Logf("cm: %v", cm) + + l.ApplyPollResults(mm, cm) + + metas := l.Metas(tenant) + + actual := []uuid.UUID{} + for _, m := range metas { + actual = append(actual, m.BlockID) + } + + sort.Slice(actual, func(i, j int) bool { return actual[i].String() < actual[j].String() }) + + assert.Equal(t, expected, actual) + assert.Equal(t, len(expected), len(metas)) + t.Logf("actual: %v", actual) + + for _, e := range expected { + assert.True(t, found(e, metas)) + } + }) + } +} + +func found(id uuid.UUID, blockMetas []*backend.BlockMeta) bool { + for _, b := range blockMetas { + if b.BlockID == id { + return true + } + } + + return false +} + +func writeTenantBlocks(t *testing.T, w backend.Writer, tenant string, blockIDs []uuid.UUID) { + var err error + for _, b := range blockIDs { + meta := &backend.BlockMeta{ + BlockID: b, + TenantID: tenant, + } + + err = w.WriteBlockMeta(context.Background(), meta) + require.NoError(t, err) + } +} + +func decrementUUIDBytes(uuidBytes []byte) { + for i := len(uuidBytes) - 1; i >= 0; i-- { + if uuidBytes[i] > 0 { + uuidBytes[i]-- + break + } else { + uuidBytes[i] = 255 // Wrap around if the byte is 0 + } + } +} + +func incrementUUIDBytes(uuidBytes []byte) { + for i := len(uuidBytes) - 1; i >= 0; i-- { + if uuidBytes[i] < 255 { + uuidBytes[i]++ + break + } else { + uuidBytes[i] = 0 // Wrap around if the byte is 255 + } + } +} diff --git a/modules/compactor/compactor.go b/modules/compactor/compactor.go index a1b45770ba3..c53bb303958 100644 --- a/modules/compactor/compactor.go +++ b/modules/compactor/compactor.go @@ -149,7 +149,7 @@ func (c *Compactor) starting(ctx context.Context) (err error) { } // this will block until one poll cycle is complete - c.store.EnablePolling(c) + c.store.EnablePolling(ctx, c) return nil } diff --git a/modules/frontend/searchsharding_test.go b/modules/frontend/searchsharding_test.go index 66ba6824892..92d04e2ca5c 100644 --- a/modules/frontend/searchsharding_test.go +++ b/modules/frontend/searchsharding_test.go @@ -56,8 +56,8 @@ func (m *mockReader) Fetch(context.Context, *backend.BlockMeta, traceql.FetchSpa return traceql.FetchSpansResponse{}, nil } -func (m *mockReader) EnablePolling(blocklist.JobSharder) {} -func (m *mockReader) Shutdown() {} +func (m *mockReader) EnablePolling(context.Context, blocklist.JobSharder) {} +func (m *mockReader) Shutdown() {} func TestBuildBackendRequests(t *testing.T) { tests := []struct { diff --git a/modules/frontend/tracebyidsharding.go b/modules/frontend/tracebyidsharding.go index 55688d828dd..0ab7dfb150f 100644 --- a/modules/frontend/tracebyidsharding.go +++ b/modules/frontend/tracebyidsharding.go @@ -3,10 +3,8 @@ package frontend import ( "bytes" "context" - "encoding/binary" "encoding/hex" "io" - "math" "net/http" "strings" "sync" @@ -15,13 +13,15 @@ import ( "github.com/go-kit/log/level" "github.com/golang/protobuf/proto" //nolint:all //deprecated "github.com/grafana/dskit/user" + "github.com/opentracing/opentracing-go" + "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/modules/querier" "github.com/grafana/tempo/pkg/api" + "github.com/grafana/tempo/pkg/blockboundary" "github.com/grafana/tempo/pkg/boundedwaitgroup" "github.com/grafana/tempo/pkg/model/trace" "github.com/grafana/tempo/pkg/tempopb" - 
"github.com/opentracing/opentracing-go" ) const ( @@ -36,7 +36,7 @@ func newTraceByIDSharder(cfg *TraceByIDConfig, o overrides.Interface, logger log cfg: cfg, logger: logger, o: o, - blockBoundaries: createBlockBoundaries(cfg.QueryShards - 1), // one shard will be used to query ingesters + blockBoundaries: blockboundary.CreateBlockBoundaries(cfg.QueryShards - 1), // one shard will be used to query ingesters } }) } @@ -227,40 +227,6 @@ func (s *shardQuery) buildShardedRequests(parent *http.Request) ([]*http.Request return reqs, nil } -// createBlockBoundaries splits the range of blockIDs into queryShards parts -func createBlockBoundaries(queryShards int) [][]byte { - if queryShards == 0 { - return nil - } - - // create sharded queries - blockBoundaries := make([][]byte, queryShards+1) - for i := 0; i < queryShards+1; i++ { - blockBoundaries[i] = make([]byte, 16) - } - - // bucketSz is the min size for the bucket - bucketSz := (math.MaxUint64 / uint64(queryShards)) - // numLarger is the number of buckets that have to be bumped by 1 - numLarger := (math.MaxUint64 % uint64(queryShards)) - boundary := uint64(0) - for i := 0; i < queryShards; i++ { - binary.BigEndian.PutUint64(blockBoundaries[i][:8], boundary) - binary.BigEndian.PutUint64(blockBoundaries[i][8:], 0) - - boundary += bucketSz - if numLarger != 0 { - numLarger-- - boundary++ - } - } - - binary.BigEndian.PutUint64(blockBoundaries[queryShards][:8], math.MaxUint64) - binary.BigEndian.PutUint64(blockBoundaries[queryShards][8:], math.MaxUint64) - - return blockBoundaries -} - func shouldQuit(ctx context.Context, statusCode int, err error) bool { if err != nil { return true diff --git a/modules/frontend/tracebyidsharding_test.go b/modules/frontend/tracebyidsharding_test.go index 8956f5a0272..b428e4212e7 100644 --- a/modules/frontend/tracebyidsharding_test.go +++ b/modules/frontend/tracebyidsharding_test.go @@ -3,10 +3,8 @@ package frontend import ( "bytes" "context" - "encoding/binary" "errors" "io" - "math" "math/rand" "net/http" "net/http/httptest" @@ -16,84 +14,17 @@ import ( "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/grafana/dskit/user" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "github.com/grafana/tempo/modules/overrides" + "github.com/grafana/tempo/pkg/blockboundary" "github.com/grafana/tempo/pkg/model/trace" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util/test" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/atomic" ) -func TestCreateBlockBoundaries(t *testing.T) { - tests := []struct { - name string - queryShards int - expected [][]byte - }{ - { - name: "single shard", - queryShards: 1, - expected: [][]byte{ - {0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - {0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, - }, - }, - { - name: "multiple shards", - queryShards: 4, - expected: [][]byte{ - {0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - {0x40, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - {0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - {0xc0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - {0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, - }, - }, - { - name: "large number of evenly divisible shards", - 
queryShards: 255, - }, - { - name: "large number of not evenly divisible shards", - queryShards: 1111, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - bb := createBlockBoundaries(tt.queryShards) - - if len(tt.expected) > 0 { - require.Len(t, bb, len(tt.expected)) - for i := 0; i < len(bb); i++ { - require.Equal(t, tt.expected[i], bb[i]) - } - } - - max := uint64(0) - min := uint64(math.MaxUint64) - - // test that the boundaries are in order - for i := 1; i < len(bb); i++ { - require.True(t, bytes.Compare(bb[i-1], bb[i]) < 0) - - prev := binary.BigEndian.Uint64(bb[i-1][:8]) - cur := binary.BigEndian.Uint64(bb[i][:8]) - dist := cur - prev - if dist > max { - max = dist - } - if dist < min { - min = dist - } - } - - // confirm that max - min <= 1. this means are boundaries are as fair as possible - require.LessOrEqual(t, max-min, uint64(1)) - }) - } -} - func TestBuildShardedRequests(t *testing.T) { queryShards := 2 @@ -101,7 +32,7 @@ func TestBuildShardedRequests(t *testing.T) { cfg: &TraceByIDConfig{ QueryShards: queryShards, }, - blockBoundaries: createBlockBoundaries(queryShards - 1), + blockBoundaries: blockboundary.CreateBlockBoundaries(queryShards - 1), } ctx := user.InjectOrgID(context.Background(), "blerg") diff --git a/modules/generator/processor/localblocks/processor.go b/modules/generator/processor/localblocks/processor.go index ebd17b3720c..a50350e78e3 100644 --- a/modules/generator/processor/localblocks/processor.go +++ b/modules/generator/processor/localblocks/processor.go @@ -571,7 +571,7 @@ func (p *Processor) reloadBlocks() error { return nil } - ids, err := r.Blocks(ctx, p.tenant) + ids, _, err := r.Blocks(ctx, p.tenant) if err != nil { return err } diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go index 9300f9a9971..d4c9f8156f0 100644 --- a/modules/ingester/ingester.go +++ b/modules/ingester/ingester.go @@ -401,7 +401,7 @@ func (i *Ingester) rediscoverLocalBlocks() error { for _, t := range tenants { // check if any local blocks exist for a tenant before creating the instance. 
this is to protect us from cases // where left-over empty local tenant folders persist empty tenants - blocks, err := reader.Blocks(ctx, t) + blocks, _, err := reader.Blocks(ctx, t) if err != nil { return err } diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index be40c23b9b1..dfddbb1fa0a 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -561,7 +561,7 @@ func (i *instance) writeTraceToHeadBlock(id common.ID, b []byte, start, end uint } func (i *instance) rediscoverLocalBlocks(ctx context.Context) ([]*localBlock, error) { - ids, err := i.localReader.Blocks(ctx, i.instanceID) + ids, _, err := i.localReader.Blocks(ctx, i.instanceID) if err != nil { return nil, err } diff --git a/pkg/blockboundary/blockboundary.go b/pkg/blockboundary/blockboundary.go new file mode 100644 index 00000000000..9f05bcb07e7 --- /dev/null +++ b/pkg/blockboundary/blockboundary.go @@ -0,0 +1,40 @@ +package blockboundary + +import ( + "encoding/binary" + "math" +) + +// CreateBlockBoundaries splits the range of blockIDs into queryShards parts +func CreateBlockBoundaries(queryShards int) [][]byte { + if queryShards == 0 { + return nil + } + + // create sharded queries + blockBoundaries := make([][]byte, queryShards+1) + for i := 0; i < queryShards+1; i++ { + blockBoundaries[i] = make([]byte, 16) + } + + // bucketSz is the min size for the bucket + bucketSz := (math.MaxUint64 / uint64(queryShards)) + // numLarger is the number of buckets that have to be bumped by 1 + numLarger := (math.MaxUint64 % uint64(queryShards)) + boundary := uint64(0) + for i := 0; i < queryShards; i++ { + binary.BigEndian.PutUint64(blockBoundaries[i][:8], boundary) + binary.BigEndian.PutUint64(blockBoundaries[i][8:], 0) + + boundary += bucketSz + if numLarger != 0 { + numLarger-- + boundary++ + } + } + + binary.BigEndian.PutUint64(blockBoundaries[queryShards][:8], math.MaxUint64) + binary.BigEndian.PutUint64(blockBoundaries[queryShards][8:], math.MaxUint64) + + return blockBoundaries +} diff --git a/pkg/blockboundary/blockboundary_test.go b/pkg/blockboundary/blockboundary_test.go new file mode 100644 index 00000000000..54df596567d --- /dev/null +++ b/pkg/blockboundary/blockboundary_test.go @@ -0,0 +1,79 @@ +package blockboundary + +import ( + "bytes" + "encoding/binary" + "math" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCreateBlockBoundaries(t *testing.T) { + tests := []struct { + name string + queryShards int + expected [][]byte + }{ + { + name: "single shard", + queryShards: 1, + expected: [][]byte{ + {0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, + {0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, + }, + }, + { + name: "multiple shards", + queryShards: 4, + expected: [][]byte{ + {0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, + {0x40, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, + {0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, + {0xc0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, + {0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, + }, + }, + { + name: "large number of evenly divisible shards", + queryShards: 255, + }, + { + name: "large number of not evenly divisible shards", + queryShards: 1111, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bb := 
CreateBlockBoundaries(tt.queryShards) + + if len(tt.expected) > 0 { + require.Len(t, bb, len(tt.expected)) + for i := 0; i < len(bb); i++ { + require.Equal(t, tt.expected[i], bb[i]) + } + } + + max := uint64(0) + min := uint64(math.MaxUint64) + + // test that the boundaries are in order + for i := 1; i < len(bb); i++ { + require.True(t, bytes.Compare(bb[i-1], bb[i]) < 0) + + prev := binary.BigEndian.Uint64(bb[i-1][:8]) + cur := binary.BigEndian.Uint64(bb[i][:8]) + dist := cur - prev + if dist > max { + max = dist + } + if dist < min { + min = dist + } + } + + // confirm that max - min <= 1. this means are boundaries are as fair as possible + require.LessOrEqual(t, max-min, uint64(1)) + }) + } +} diff --git a/tempodb/backend/azure/v1/v1.go b/tempodb/backend/azure/v1/v1.go index 5f5bab9c7fa..79e99ac2b42 100644 --- a/tempodb/backend/azure/v1/v1.go +++ b/tempodb/backend/azure/v1/v1.go @@ -14,6 +14,7 @@ import ( blob "github.com/Azure/azure-storage-blob-go/azblob" "github.com/go-kit/log/level" + "github.com/google/uuid" "github.com/opentracing/opentracing-go" "github.com/grafana/tempo/pkg/util/log" @@ -159,6 +160,72 @@ func (rw *V1) List(ctx context.Context, keypath backend.KeyPath) ([]string, erro return objects, nil } +// ListBlocks implements backend.Reader +func (rw *V1) ListBlocks(ctx context.Context, tenant string) ([]uuid.UUID, []uuid.UUID, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "V1.ListBlocks") + defer span.Finish() + + var ( + blockIDs = make([]uuid.UUID, 0, 1000) + compactedBlockIDs = make([]uuid.UUID, 0, 1000) + keypath = backend.KeyPathWithPrefix(backend.KeyPath{tenant}, rw.cfg.Prefix) + marker = blob.Marker{} + parts []string + id uuid.UUID + ) + + prefix := path.Join(keypath...) + if len(prefix) > 0 { + prefix += dir + } + + for { + res, err := rw.containerURL.ListBlobsFlatSegment(ctx, marker, blob.ListBlobsSegmentOptions{ + Prefix: prefix, + Details: blob.BlobListingDetails{}, + }) + if err != nil { + return nil, nil, fmt.Errorf("iterating objects: %w", err) + } + marker = res.NextMarker + + for _, blob := range res.Segment.BlobItems { + obj := strings.TrimPrefix(strings.TrimSuffix(blob.Name, dir), prefix) + parts = strings.Split(obj, "/") + + // ie: /meta.json + if len(parts) != 2 { + continue + } + + switch parts[1] { + case backend.MetaName, backend.CompactedMetaName: + default: + continue + } + + id, err = uuid.Parse(parts[0]) + if err != nil { + return nil, nil, err + } + + switch parts[1] { + case backend.MetaName: + blockIDs = append(blockIDs, id) + case backend.CompactedMetaName: + compactedBlockIDs = append(compactedBlockIDs, id) + } + + } + + // Continue iterating if we are not done. 
+ if !marker.NotDone() { + break + } + } + return blockIDs, compactedBlockIDs, nil +} + // Read implements backend.Reader func (rw *V1) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) { keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix) diff --git a/tempodb/backend/azure/v2/v2.go b/tempodb/backend/azure/v2/v2.go index 3027eb71fb4..8973a9a91a6 100644 --- a/tempodb/backend/azure/v2/v2.go +++ b/tempodb/backend/azure/v2/v2.go @@ -20,6 +20,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" "github.com/go-kit/log/level" + "github.com/google/uuid" "github.com/opentracing/opentracing-go" "github.com/grafana/tempo/pkg/util/log" @@ -164,6 +165,68 @@ func (rw *V2) List(ctx context.Context, keypath backend.KeyPath) ([]string, erro return objects, nil } +// ListBlocks implements backend.Reader +func (rw *V2) ListBlocks(ctx context.Context, tenant string) ([]uuid.UUID, []uuid.UUID, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "V2.ListBlocks") + defer span.Finish() + + var ( + blockIDs = make([]uuid.UUID, 0, 1000) + compactedBlockIDs = make([]uuid.UUID, 0, 1000) + keypath = backend.KeyPathWithPrefix(backend.KeyPath{tenant}, rw.cfg.Prefix) + parts []string + id uuid.UUID + ) + + prefix := path.Join(keypath...) + if len(prefix) > 0 { + prefix += dir + } + + pager := rw.containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{ + Include: container.ListBlobsInclude{}, + Prefix: &prefix, + }) + + for pager.More() { + page, err := pager.NextPage(ctx) + if err != nil { + return nil, nil, fmt.Errorf("iterating objects: %w", err) + } + + for _, b := range page.Segment.BlobItems { + if b.Name == nil { + continue + } + + obj := strings.TrimPrefix(strings.TrimSuffix(*b.Name, dir), prefix) + parts = strings.Split(obj, "/") + + // ie: /meta.json + if len(parts) != 2 { + continue + } + + if parts[1] != backend.MetaName && parts[1] != backend.CompactedMetaName { + continue + } + + id, err = uuid.Parse(parts[0]) + if err != nil { + return nil, nil, err + } + + switch parts[1] { + case backend.MetaName: + blockIDs = append(blockIDs, id) + case backend.CompactedMetaName: + compactedBlockIDs = append(compactedBlockIDs, id) + } + } + } + return blockIDs, compactedBlockIDs, nil +} + // Read implements backend.Reader func (rw *V2) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) { keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix) diff --git a/tempodb/backend/backend.go b/tempodb/backend/backend.go index 3c2bfa661ba..03e64bdc7d3 100644 --- a/tempodb/backend/backend.go +++ b/tempodb/backend/backend.go @@ -20,6 +20,8 @@ var ( ErrEmptyTenantID = fmt.Errorf("empty tenant id") ErrEmptyBlockID = fmt.Errorf("empty block id") ErrBadSeedFile = fmt.Errorf("bad seed file") + + GlobalMaxBlockID = uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff") ) // AppendTracker is an empty interface usable by the backend to track a long running append operation @@ -53,8 +55,8 @@ type Reader interface { ReadRange(ctx context.Context, name string, blockID uuid.UUID, tenantID string, offset uint64, buffer []byte, shouldCache bool) error // Tenants returns a list of all tenants in a backend Tenants(ctx context.Context) ([]string, error) - // Blocks returns a list of block UUIDs given a tenant - Blocks(ctx context.Context, tenantID string) ([]uuid.UUID, error) + // Blocks returns the blockIDs, 
compactedBlockIDs and an error from the backend. + Blocks(ctx context.Context, tenantID string) (blockIDs []uuid.UUID, compactedBlockIDs []uuid.UUID, err error) // BlockMeta returns the blockmeta given a block and tenant id BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID string) (*BlockMeta, error) // TenantIndex returns lists of all metas given a tenant diff --git a/tempodb/backend/cache/cache.go b/tempodb/backend/cache/cache.go index 7e13f9b4ca3..a9f3ebc4a54 100644 --- a/tempodb/backend/cache/cache.go +++ b/tempodb/backend/cache/cache.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" + "github.com/google/uuid" "github.com/grafana/tempo/pkg/cache" tempo_io "github.com/grafana/tempo/pkg/io" @@ -34,6 +35,10 @@ func (r *readerWriter) List(ctx context.Context, keypath backend.KeyPath) ([]str return r.nextReader.List(ctx, keypath) } +func (r *readerWriter) ListBlocks(ctx context.Context, tenant string) (blockIDs []uuid.UUID, compactedBlockIDs []uuid.UUID, err error) { + return r.nextReader.ListBlocks(ctx, tenant) +} + // Read implements backend.RawReader func (r *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, shouldCache bool) (io.ReadCloser, int64, error) { var k string diff --git a/tempodb/backend/gcs/config.go b/tempodb/backend/gcs/config.go index 206f3f2d845..598fbd1586e 100644 --- a/tempodb/backend/gcs/config.go +++ b/tempodb/backend/gcs/config.go @@ -8,20 +8,22 @@ import ( ) type Config struct { - BucketName string `yaml:"bucket_name"` - Prefix string `yaml:"prefix"` - ChunkBufferSize int `yaml:"chunk_buffer_size"` - Endpoint string `yaml:"endpoint"` - HedgeRequestsAt time.Duration `yaml:"hedge_requests_at"` - HedgeRequestsUpTo int `yaml:"hedge_requests_up_to"` - Insecure bool `yaml:"insecure"` - ObjectCacheControl string `yaml:"object_cache_control"` - ObjectMetadata map[string]string `yaml:"object_metadata"` + BucketName string `yaml:"bucket_name"` + Prefix string `yaml:"prefix"` + ChunkBufferSize int `yaml:"chunk_buffer_size"` + Endpoint string `yaml:"endpoint"` + HedgeRequestsAt time.Duration `yaml:"hedge_requests_at"` + HedgeRequestsUpTo int `yaml:"hedge_requests_up_to"` + Insecure bool `yaml:"insecure"` + ObjectCacheControl string `yaml:"object_cache_control"` + ObjectMetadata map[string]string `yaml:"object_metadata"` + ListBlocksConcurrency int `yaml:"list_blocks_concurrency"` } func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.BucketName, util.PrefixConfig(prefix, "gcs.bucket"), "", "gcs bucket to store traces in.") f.StringVar(&cfg.Prefix, util.PrefixConfig(prefix, "gcs.prefix"), "", "gcs bucket prefix to store traces in.") + f.IntVar(&cfg.ListBlocksConcurrency, util.PrefixConfig(prefix, "gcs.list_blocks_concurrency"), 3, "number of concurrent list calls to make to backend") cfg.ChunkBufferSize = 10 * 1024 * 1024 cfg.HedgeRequestsUpTo = 2 } diff --git a/tempodb/backend/gcs/gcs.go b/tempodb/backend/gcs/gcs.go index cfd6f982aff..2a5509e95af 100644 --- a/tempodb/backend/gcs/gcs.go +++ b/tempodb/backend/gcs/gcs.go @@ -11,6 +11,9 @@ import ( "path" "strconv" "strings" + "sync" + + "github.com/google/uuid" "github.com/grafana/tempo/tempodb/backend/instrumentation" @@ -21,6 +24,7 @@ import ( "google.golang.org/api/option" google_http "google.golang.org/api/transport/http" + "github.com/grafana/tempo/pkg/blockboundary" tempo_io "github.com/grafana/tempo/pkg/io" "github.com/grafana/tempo/tempodb/backend" ) @@ -186,6 +190,124 @@ func (rw *readerWriter) List(ctx context.Context, keypath 
backend.KeyPath) ([]st return objects, nil } +// ListBlocks implements backend.Reader +func (rw *readerWriter) ListBlocks(ctx context.Context, tenant string) ([]uuid.UUID, []uuid.UUID, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "readerWriter.ListBlocks") + defer span.Finish() + + var ( + wg sync.WaitGroup + mtx sync.Mutex + bb = blockboundary.CreateBlockBoundaries(rw.cfg.ListBlocksConcurrency) + errChan = make(chan error, len(bb)) + keypath = backend.KeyPathWithPrefix(backend.KeyPath{tenant}, rw.cfg.Prefix) + min uuid.UUID + max uuid.UUID + blockIDs = make([]uuid.UUID, 0, 1000) + compactedBlockIDs = make([]uuid.UUID, 0, 1000) + ) + + prefix := path.Join(keypath...) + if len(prefix) > 0 { + prefix += "/" + } + + for i := 0; i < len(bb)-1; i++ { + min = uuid.UUID(bb[i]) + max = uuid.UUID(bb[i+1]) + + wg.Add(1) + go func(min, max uuid.UUID) { + defer wg.Done() + + var ( + query = &storage.Query{ + Prefix: prefix, + Delimiter: "", + Versions: false, + StartOffset: prefix + min.String(), + } + parts []string + id uuid.UUID + ) + + // If max is global max, then we don't want to set an end offset to + // ensure we reach the end. EndOffset is exclusive. + if max != backend.GlobalMaxBlockID { + query.EndOffset = prefix + max.String() + } + + iter := rw.bucket.Objects(ctx, query) + + for { + if ctx.Err() != nil { + return + } + + attrs, err := iter.Next() + if errors.Is(err, iterator.Done) { + return + } + if err != nil { + errChan <- fmt.Errorf("iterating blocks: %w", err) + return + } + + parts = strings.Split(attrs.Name, "/") + // ie: //meta.json + if len(parts) != 3 { + continue + } + + switch parts[2] { + case backend.MetaName: + case backend.CompactedMetaName: + default: + continue + } + + id, err = uuid.Parse(parts[1]) + if err != nil { + continue + } + + if bytes.Compare(id[:], min[:]) < 0 { + errChan <- fmt.Errorf("block UUID below shard minimum") + return + } + + if max != backend.GlobalMaxBlockID { + if bytes.Compare(id[:], max[:]) >= 0 { + return + } + } + + mtx.Lock() + switch parts[2] { + case backend.MetaName: + blockIDs = append(blockIDs, id) + case backend.CompactedMetaName: + compactedBlockIDs = append(compactedBlockIDs, id) + } + mtx.Unlock() + } + }(min, max) + } + wg.Wait() + close(errChan) + + errs := make([]error, 0, len(errChan)) + for e := range errChan { + errs = append(errs, e) + } + + if len(errs) > 0 { + return nil, nil, errors.Join(errs...) + } + + return blockIDs, compactedBlockIDs, nil +} + // Read implements backend.Reader func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) { keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix) @@ -377,6 +499,7 @@ func createBucket(ctx context.Context, cfg *Config, hedge bool) (*storage.Bucket } if cfg.Endpoint != "" { storageClientOptions = append(storageClientOptions, option.WithEndpoint(cfg.Endpoint)) + storageClientOptions = append(storageClientOptions, storage.WithJSONReads()) } client, err := storage.NewClient(ctx, storageClientOptions...) 
if err != nil { diff --git a/tempodb/backend/local/local.go b/tempodb/backend/local/local.go index 169288222a3..4daef4a086b 100644 --- a/tempodb/backend/local/local.go +++ b/tempodb/backend/local/local.go @@ -3,8 +3,10 @@ package local import ( "context" "io" + "io/fs" "os" "path/filepath" + "strings" "github.com/google/uuid" "github.com/opentracing/opentracing-go" @@ -148,6 +150,45 @@ func (rw *Backend) List(ctx context.Context, keypath backend.KeyPath) ([]string, return objects, nil } +// ListBlocks implements backend.Reader +func (rw *Backend) ListBlocks(_ context.Context, tenant string) (metas []uuid.UUID, compactedMetas []uuid.UUID, err error) { + rootPath := rw.rootPath(backend.KeyPath{tenant}) + fff := os.DirFS(rootPath) + err = fs.WalkDir(fff, ".", func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + + tenantFilePath := filepath.Join(tenant, path) + + parts := strings.Split(tenantFilePath, "/") + // i.e: /meta + if len(parts) != 3 { + return nil + } + + if parts[2] != backend.MetaName && parts[2] != backend.CompactedMetaName { + return nil + } + + id, err := uuid.Parse(parts[1]) + if err != nil { + return err + } + + switch parts[2] { + case backend.MetaName: + metas = append(metas, id) + case backend.CompactedMetaName: + compactedMetas = append(compactedMetas, id) + } + + return nil + }) + + return +} + // Read implements backend.Reader func (rw *Backend) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) { if err := ctx.Err(); err != nil { diff --git a/tempodb/backend/local/local_test.go b/tempodb/backend/local/local_test.go index 11c1008ecb8..4e623b08160 100644 --- a/tempodb/backend/local/local_test.go +++ b/tempodb/backend/local/local_test.go @@ -51,6 +51,11 @@ func TestReadWrite(t *testing.T) { fakeMeta.TenantID = id err = w.Write(ctx, objectName, backend.KeyPathForBlock(fakeMeta.BlockID, id), bytes.NewReader(fakeObject), int64(len(fakeObject)), false) assert.NoError(t, err, "unexpected error writing") + + err = w.Write(ctx, backend.MetaName, backend.KeyPathForBlock(fakeMeta.BlockID, id), bytes.NewReader(fakeObject), int64(len(fakeObject)), false) + assert.NoError(t, err, "unexpected error meta.json") + err = w.Write(ctx, backend.CompactedMetaName, backend.KeyPathForBlock(fakeMeta.BlockID, id), bytes.NewReader(fakeObject), int64(len(fakeObject)), false) + assert.NoError(t, err, "unexpected error meta.compacted.json") } actualObject, size, err := r.Read(ctx, objectName, backend.KeyPathForBlock(blockID, tenantIDs[0]), false) @@ -65,9 +70,14 @@ func TestReadWrite(t *testing.T) { assert.Equal(t, fakeObject[5:10], actualReadRange) list, err := r.List(ctx, backend.KeyPath{tenantIDs[0]}) - assert.NoError(t, err, "unexpected error reading blocklist") + assert.NoError(t, err, "unexpected error listing") assert.Len(t, list, 1) assert.Equal(t, blockID.String(), list[0]) + + m, cm, err := r.ListBlocks(ctx, tenantIDs[0]) + assert.NoError(t, err, "unexpected error listing blocks") + assert.Len(t, m, 1) + assert.Len(t, cm, 1) } func TestShutdownLeavesTenantsWithBlocks(t *testing.T) { diff --git a/tempodb/backend/mocks.go b/tempodb/backend/mocks.go index 49124e587ea..f539606e354 100644 --- a/tempodb/backend/mocks.go +++ b/tempodb/backend/mocks.go @@ -5,6 +5,7 @@ import ( "context" "io" "strings" + "sync" tempo_io "github.com/grafana/tempo/pkg/io" @@ -21,11 +22,16 @@ var ( // MockRawReader type MockRawReader struct { - L []string - ListFn func(ctx context.Context, keypath KeyPath) ([]string, error) - R 
[]byte // read - Range []byte // ReadRange - ReadFn func(ctx context.Context, name string, keypath KeyPath, shouldCache bool) (io.ReadCloser, int64, error) + L []string + ListFn func(ctx context.Context, keypath KeyPath) ([]string, error) + ListBlocksFn func(ctx context.Context, tenant string) ([]uuid.UUID, []uuid.UUID, error) + R []byte // read + Range []byte // ReadRange + ReadFn func(ctx context.Context, name string, keypath KeyPath, shouldCache bool) (io.ReadCloser, int64, error) + + BlockIDs []uuid.UUID + CompactedBlockIDs []uuid.UUID + FindResult []string } func (m *MockRawReader) List(ctx context.Context, keypath KeyPath) ([]string, error) { @@ -36,6 +42,14 @@ func (m *MockRawReader) List(ctx context.Context, keypath KeyPath) ([]string, er return m.L, nil } +func (m *MockRawReader) ListBlocks(ctx context.Context, tenant string) ([]uuid.UUID, []uuid.UUID, error) { + if m.ListBlocksFn != nil { + return m.ListBlocksFn(ctx, tenant) + } + + return m.BlockIDs, m.CompactedBlockIDs, nil +} + func (m *MockRawReader) Read(ctx context.Context, name string, keypath KeyPath, shouldCache bool) (io.ReadCloser, int64, error) { if m.ReadFn != nil { return m.ReadFn(ctx, name, keypath, shouldCache) @@ -49,6 +63,7 @@ func (m *MockRawReader) ReadRange(_ context.Context, _ string, _ KeyPath, _ uint return nil } + func (m *MockRawReader) Shutdown() {} // MockRawWriter @@ -95,7 +110,10 @@ func (m *MockRawWriter) Delete(_ context.Context, name string, keypath KeyPath, // MockCompactor type MockCompactor struct { - BlockMetaFn func(blockID uuid.UUID, tenantID string) (*CompactedBlockMeta, error) + sync.Mutex + + BlockMetaFn func(blockID uuid.UUID, tenantID string) (*CompactedBlockMeta, error) + CompactedBlockMetaCalls map[string]map[uuid.UUID]int } func (c *MockCompactor) MarkBlockCompacted(uuid.UUID, string) error { @@ -107,35 +125,61 @@ func (c *MockCompactor) ClearBlock(uuid.UUID, string) error { } func (c *MockCompactor) CompactedBlockMeta(blockID uuid.UUID, tenantID string) (*CompactedBlockMeta, error) { + c.Lock() + defer c.Unlock() + if c.CompactedBlockMetaCalls == nil { + c.CompactedBlockMetaCalls = make(map[string]map[uuid.UUID]int) + } + if _, ok := c.CompactedBlockMetaCalls[tenantID]; !ok { + c.CompactedBlockMetaCalls[tenantID] = make(map[uuid.UUID]int) + } + c.CompactedBlockMetaCalls[tenantID][blockID]++ + return c.BlockMetaFn(blockID, tenantID) } // MockReader type MockReader struct { - T []string - B []uuid.UUID // blocks - BlockFn func(ctx context.Context, tenantID string) ([]uuid.UUID, error) - M *BlockMeta // meta - BlockMetaFn func(ctx context.Context, blockID uuid.UUID, tenantID string) (*BlockMeta, error) - TenantIndexFn func(ctx context.Context, tenantID string) (*TenantIndex, error) - R []byte // read - Range []byte // ReadRange - ReadFn func(name string, blockID uuid.UUID, tenantID string) ([]byte, error) + sync.Mutex + + T []string + BlocksFn func(ctx context.Context, tenantID string) ([]uuid.UUID, []uuid.UUID, error) + M *BlockMeta // meta + BlockMetaFn func(ctx context.Context, blockID uuid.UUID, tenantID string) (*BlockMeta, error) + TenantIndexFn func(ctx context.Context, tenantID string) (*TenantIndex, error) + R []byte // read + Range []byte // ReadRange + ReadFn func(name string, blockID uuid.UUID, tenantID string) ([]byte, error) + BlockMetaCalls map[string]map[uuid.UUID]int + BlockIDs []uuid.UUID // blocks + CompactedBlockIDs []uuid.UUID // blocks } func (m *MockReader) Tenants(context.Context) ([]string, error) { return m.T, nil } -func (m *MockReader) Blocks(ctx 
context.Context, tenantID string) ([]uuid.UUID, error) { - if m.BlockFn != nil { - return m.BlockFn(ctx, tenantID) +func (m *MockReader) Blocks(ctx context.Context, tenantID string) ([]uuid.UUID, []uuid.UUID, error) { + if m.BlocksFn != nil { + return m.BlocksFn(ctx, tenantID) } - return m.B, nil + return m.BlockIDs, m.CompactedBlockIDs, nil } func (m *MockReader) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID string) (*BlockMeta, error) { + m.Lock() + defer m.Unlock() + + // Update the BlockMetaCalls map based on the tenantID and blockID + if m.BlockMetaCalls == nil { + m.BlockMetaCalls = make(map[string]map[uuid.UUID]int) + } + if _, ok := m.BlockMetaCalls[tenantID]; !ok { + m.BlockMetaCalls[tenantID] = make(map[uuid.UUID]int) + } + m.BlockMetaCalls[tenantID][blockID]++ + if m.BlockMetaFn != nil { return m.BlockMetaFn(ctx, blockID, tenantID) } @@ -173,6 +217,7 @@ func (m *MockReader) Shutdown() {} // MockWriter type MockWriter struct { + sync.Mutex IndexMeta map[string][]*BlockMeta IndexCompactedMeta map[string][]*CompactedBlockMeta } @@ -198,6 +243,9 @@ func (m *MockWriter) CloseAppend(context.Context, AppendTracker) error { } func (m *MockWriter) WriteTenantIndex(_ context.Context, tenantID string, meta []*BlockMeta, compactedMeta []*CompactedBlockMeta) error { + m.Lock() + defer m.Unlock() + if m.IndexMeta == nil { m.IndexMeta = make(map[string][]*BlockMeta) } @@ -208,3 +256,16 @@ func (m *MockWriter) WriteTenantIndex(_ context.Context, tenantID string, meta [ m.IndexCompactedMeta[tenantID] = compactedMeta return nil } + +type MockBlocklist struct { + MetasFn func(tenantID string) []*BlockMeta + CompactedMetasFn func(tenantID string) []*CompactedBlockMeta +} + +func (m *MockBlocklist) Metas(tenantID string) []*BlockMeta { + return m.MetasFn(tenantID) +} + +func (m *MockBlocklist) CompactedMetas(tenantID string) []*CompactedBlockMeta { + return m.CompactedMetasFn(tenantID) +} diff --git a/tempodb/backend/raw.go b/tempodb/backend/raw.go index a17707d3975..6b62a12a260 100644 --- a/tempodb/backend/raw.go +++ b/tempodb/backend/raw.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "io" "path" @@ -22,9 +21,12 @@ const ( ClusterSeedFileName = "tempo_cluster_seed.json" ) -// KeyPath is an ordered set of strings that govern where data is read/written from the backend +// KeyPath is an ordered set of strings that govern where data is read/written +// from the backend type KeyPath []string +type Feature int + // RawWriter is a collection of methods to write data to tempodb backends type RawWriter interface { // Write is for in memory data. shouldCache specifies whether or not caching should be attempted. @@ -41,6 +43,8 @@ type RawWriter interface { type RawReader interface { // List returns all objects one level beneath the provided keypath List(ctx context.Context, keypath KeyPath) ([]string, error) + // ListBlocks returns all blockIDs and compactedBlockIDs for a tenant. + ListBlocks(ctx context.Context, tenant string) (blockIDs []uuid.UUID, compactedBlockIDs []uuid.UUID, err error) // Read is for streaming entire objects from the backend. There will be an attempt to retrieve this from cache if shouldCache is true. Read(ctx context.Context, name string, keyPath KeyPath, shouldCache bool) (io.ReadCloser, int64, error) // ReadRange is for reading parts of large objects from the backend. @@ -61,14 +65,23 @@ func NewWriter(w RawWriter) Writer { } } +// TODO: these objects are not raw, so perhaps they could move somewhere else. 
+// var ( +// x RawReader = reader{} +// y RawWriter = writer{} +// ) + +// Write implements backend.Writer func (w *writer) Write(ctx context.Context, name string, blockID uuid.UUID, tenantID string, buffer []byte, shouldCache bool) error { return w.w.Write(ctx, name, KeyPathForBlock(blockID, tenantID), bytes.NewReader(buffer), int64(len(buffer)), shouldCache) } +// StreamWriter implements backend.Writer func (w *writer) StreamWriter(ctx context.Context, name string, blockID uuid.UUID, tenantID string, data io.Reader, size int64) error { return w.w.Write(ctx, name, KeyPathForBlock(blockID, tenantID), data, size, false) } +// WriteBlockMeta implements backend.Writer func (w *writer) WriteBlockMeta(ctx context.Context, meta *BlockMeta) error { blockID := meta.BlockID tenantID := meta.TenantID @@ -81,14 +94,17 @@ func (w *writer) WriteBlockMeta(ctx context.Context, meta *BlockMeta) error { return w.w.Write(ctx, MetaName, KeyPathForBlock(blockID, tenantID), bytes.NewReader(bMeta), int64(len(bMeta)), false) } +// Append implements backend.Writer func (w *writer) Append(ctx context.Context, name string, blockID uuid.UUID, tenantID string, tracker AppendTracker, buffer []byte) (AppendTracker, error) { return w.w.Append(ctx, name, KeyPathForBlock(blockID, tenantID), tracker, buffer) } +// CloseAppend implements backend.Writer func (w *writer) CloseAppend(ctx context.Context, tracker AppendTracker) error { return w.w.CloseAppend(ctx, tracker) } +// WriteTenantIndex implements backend.Writer func (w *writer) WriteTenantIndex(ctx context.Context, tenantID string, meta []*BlockMeta, compactedMeta []*CompactedBlockMeta) error { // If meta and compactedMeta are empty, call delete the tenant index. if len(meta) == 0 && len(compactedMeta) == 0 { @@ -126,6 +142,7 @@ func NewReader(r RawReader) Reader { } } +// Read implements backend.Reader func (r *reader) Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string, shouldCache bool) ([]byte, error) { objReader, size, err := r.r.Read(ctx, name, KeyPathForBlock(blockID, tenantID), shouldCache) if err != nil { @@ -135,14 +152,17 @@ func (r *reader) Read(ctx context.Context, name string, blockID uuid.UUID, tenan return tempo_io.ReadAllWithEstimate(objReader, size) } +// StreamReader implements backend.Reader func (r *reader) StreamReader(ctx context.Context, name string, blockID uuid.UUID, tenantID string) (io.ReadCloser, int64, error) { return r.r.Read(ctx, name, KeyPathForBlock(blockID, tenantID), false) } +// ReadRange implements backend.Reader func (r *reader) ReadRange(ctx context.Context, name string, blockID uuid.UUID, tenantID string, offset uint64, buffer []byte, shouldCache bool) error { return r.r.ReadRange(ctx, name, KeyPathForBlock(blockID, tenantID), offset, buffer, shouldCache) } +// Tenants implements backend.Reader func (r *reader) Tenants(ctx context.Context) ([]string, error) { list, err := r.r.List(ctx, nil) @@ -157,30 +177,12 @@ func (r *reader) Tenants(ctx context.Context) ([]string, error) { return filteredList, err } -func (r *reader) Blocks(ctx context.Context, tenantID string) ([]uuid.UUID, error) { - objects, err := r.r.List(ctx, KeyPath{tenantID}) - if err != nil { - return nil, err - } - - // translate everything to UUIDs, if we see a bucket index we can skip that - blockIDs := make([]uuid.UUID, 0, len(objects)) - for _, id := range objects { - // TODO: this line exists due to behavior differences in backends: https://github.com/grafana/tempo/issues/880 - // revisit once #880 is resolved.
- if id == TenantIndexName || id == "" { - continue - } - uuid, err := uuid.Parse(id) - if err != nil { - return nil, fmt.Errorf("failed to parse %s: %w", id, err) - } - blockIDs = append(blockIDs, uuid) - } - - return blockIDs, nil +// Blocks implements backend.Reader +func (r *reader) Blocks(ctx context.Context, tenantID string) ([]uuid.UUID, []uuid.UUID, error) { + return r.r.ListBlocks(ctx, tenantID) } +// BlockMeta implements backend.Reader func (r *reader) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID string) (*BlockMeta, error) { reader, size, err := r.r.Read(ctx, MetaName, KeyPathForBlock(blockID, tenantID), false) if err != nil { @@ -202,6 +204,7 @@ func (r *reader) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID stri return out, nil } +// TenantIndex implements backend.Reader func (r *reader) TenantIndex(ctx context.Context, tenantID string) (*TenantIndex, error) { reader, size, err := r.r.Read(ctx, TenantIndexName, KeyPath([]string{tenantID}), false) if err != nil { @@ -224,6 +227,7 @@ func (r *reader) TenantIndex(ctx context.Context, tenantID string) (*TenantIndex return i, nil } +// Shutdown implements backend.Reader func (r *reader) Shutdown() { r.r.Shutdown() } diff --git a/tempodb/backend/raw_test.go b/tempodb/backend/raw_test.go index 5a680415483..b3b61518fde 100644 --- a/tempodb/backend/raw_test.go +++ b/tempodb/backend/raw_test.go @@ -89,13 +89,18 @@ func TestReader(t *testing.T) { assert.NoError(t, err) assert.Equal(t, expectedTenants, actualTenants) - uuid1 := uuid.New() - uuid2 := uuid.New() + uuid1, uuid2, uuid3 := uuid.New(), uuid.New(), uuid.New() expectedBlocks := []uuid.UUID{uuid1, uuid2} - m.L = []string{uuid1.String(), uuid2.String()} - actualBlocks, err := r.Blocks(ctx, "test") + expectedCompactedBlocks := []uuid.UUID{uuid3} + + m.BlockIDs = append(m.BlockIDs, uuid1) + m.BlockIDs = append(m.BlockIDs, uuid2) + m.CompactedBlockIDs = append(m.CompactedBlockIDs, uuid3) + + actualBlocks, actualCompactedBlocks, err := r.Blocks(ctx, "test") assert.NoError(t, err) assert.Equal(t, expectedBlocks, actualBlocks) + assert.Equal(t, expectedCompactedBlocks, actualCompactedBlocks) // should fail b/c meta is not valid meta, err := r.BlockMeta(ctx, uuid.New(), "test") diff --git a/tempodb/backend/s3/config.go b/tempodb/backend/s3/config.go index e20be31f16e..2c25c64fd67 100644 --- a/tempodb/backend/s3/config.go +++ b/tempodb/backend/s3/config.go @@ -33,7 +33,8 @@ type Config struct { Metadata map[string]string `yaml:"metadata"` // Deprecated // See https://github.com/grafana/tempo/pull/3006 for more details - NativeAWSAuthEnabled bool `yaml:"native_aws_auth_enabled"` + NativeAWSAuthEnabled bool `yaml:"native_aws_auth_enabled"` + ListBlocksConcurrency int `yaml:"list_blocks_concurrency"` } func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { @@ -44,6 +45,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) f.StringVar(&cfg.MinVersion, util.PrefixConfig(prefix, "s3.tls_min_version"), "VersionTLS12", "minimum version of TLS to use when connecting to s3.") f.Var(&cfg.SecretKey, util.PrefixConfig(prefix, "s3.secret_key"), "s3 secret key.") f.Var(&cfg.SessionToken, util.PrefixConfig(prefix, "s3.session_token"), "s3 session token.") + f.IntVar(&cfg.ListBlocksConcurrency, util.PrefixConfig(prefix, "s3.list_blocks_concurrency"), 3, "number of concurrent list calls to make to backend") cfg.HedgeRequestsUpTo = 2 } diff --git a/tempodb/backend/s3/s3.go b/tempodb/backend/s3/s3.go index 
c8deb2ba851..cd8914d8bac 100644 --- a/tempodb/backend/s3/s3.go +++ b/tempodb/backend/s3/s3.go @@ -10,6 +10,9 @@ import ( "os" "path" "strings" + "sync" + + "github.com/google/uuid" "github.com/grafana/tempo/tempodb/backend/instrumentation" @@ -21,6 +24,7 @@ import ( "github.com/minio/minio-go/v7/pkg/credentials" "github.com/opentracing/opentracing-go" + "github.com/grafana/tempo/pkg/blockboundary" tempo_io "github.com/grafana/tempo/pkg/io" "github.com/grafana/tempo/pkg/util/log" "github.com/grafana/tempo/tempodb/backend" @@ -277,6 +281,116 @@ func (rw *readerWriter) List(_ context.Context, keypath backend.KeyPath) ([]stri return objects, nil } +func (rw *readerWriter) ListBlocks( + ctx context.Context, + tenant string, +) ([]uuid.UUID, []uuid.UUID, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "readerWriter.ListBlocks") + defer span.Finish() + + blockIDs := make([]uuid.UUID, 0, 1000) + compactedBlockIDs := make([]uuid.UUID, 0, 1000) + + keypath := backend.KeyPathWithPrefix(backend.KeyPath{tenant}, rw.cfg.Prefix) + prefix := path.Join(keypath...) + if len(prefix) > 0 { + prefix += "/" + } + + bb := blockboundary.CreateBlockBoundaries(rw.cfg.ListBlocksConcurrency) + + errChan := make(chan error, len(bb)) + wg := sync.WaitGroup{} + mtx := sync.Mutex{} + + var min uuid.UUID + var max uuid.UUID + + for i := 0; i < len(bb)-1; i++ { + + min = uuid.UUID(bb[i]) + max = uuid.UUID(bb[i+1]) + + wg.Add(1) + go func(min, max uuid.UUID) { + defer wg.Done() + + var ( + err error + res minio.ListBucketV2Result + startAfter = prefix + min.String() + ) + + for res.IsTruncated = true; res.IsTruncated; { + if ctx.Err() != nil { + return + } + + res, err = rw.core.ListObjectsV2(rw.cfg.Bucket, prefix, startAfter, res.NextContinuationToken, "", 0) + if err != nil { + errChan <- fmt.Errorf("error finding objects in s3 bucket, bucket: %s: %w", rw.cfg.Bucket, err) + return + } + + for _, c := range res.Contents { + // i.e: /meta + parts := strings.Split(c.Key, "/") + if len(parts) != 3 { + continue + } + + switch parts[2] { + case backend.MetaName: + case backend.CompactedMetaName: + default: + continue + } + + id, err := uuid.Parse(parts[1]) + if err != nil { + continue + } + + if bytes.Compare(id[:], min[:]) < 0 { + errChan <- fmt.Errorf("block UUID below shard minimum") + return + } + + if max != backend.GlobalMaxBlockID { + if bytes.Compare(id[:], max[:]) >= 0 { + return + } + } + + mtx.Lock() + switch parts[2] { + case backend.MetaName: + blockIDs = append(blockIDs, id) + case backend.CompactedMetaName: + compactedBlockIDs = append(compactedBlockIDs, id) + } + mtx.Unlock() + } + } + }(min, max) + } + wg.Wait() + close(errChan) + + errs := make([]error, 0, len(errChan)) + for e := range errChan { + errs = append(errs, e) + } + + if len(errs) > 0 { + return nil, nil, errors.Join(errs...) 
+ } + + level.Debug(rw.logger).Log("msg", "listing blocks complete", "blockIDs", len(blockIDs), "compactedBlockIDs", len(compactedBlockIDs)) + + return blockIDs, compactedBlockIDs, nil +} + // Read implements backend.Reader func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) { span, derivedCtx := opentracing.StartSpanFromContext(ctx, "s3.Read") diff --git a/tempodb/backend/s3/s3_test.go b/tempodb/backend/s3/s3_test.go index 5678ff84f7f..fcb06fd4891 100644 --- a/tempodb/backend/s3/s3_test.go +++ b/tempodb/backend/s3/s3_test.go @@ -20,11 +20,12 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/grafana/dskit/flagext" - "github.com/grafana/tempo/tempodb/backend" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/grafana/tempo/tempodb/backend" ) const ( @@ -284,7 +285,7 @@ func TestNilConfig(t *testing.T) { } func fakeServer(t *testing.T, returnIn time.Duration, counter *int32) *httptest.Server { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { time.Sleep(returnIn) atomic.AddInt32(counter, 1) diff --git a/tempodb/blocklist/list.go b/tempodb/blocklist/list.go index 952557d66ba..96088578e79 100644 --- a/tempodb/blocklist/list.go +++ b/tempodb/blocklist/list.go @@ -4,6 +4,7 @@ import ( "sync" "github.com/google/uuid" + "github.com/grafana/tempo/tempodb/backend" ) @@ -93,10 +94,10 @@ func (l *List) ApplyPollResults(m PerTenant, c PerTenantCompacted) { l.updateInternal(tenantID, l.added[tenantID], l.removed[tenantID], l.compactedAdded[tenantID], l.compactedRemoved[tenantID]) } - l.added = make(PerTenant) - l.removed = make(PerTenant) - l.compactedAdded = make(PerTenantCompacted) - l.compactedRemoved = make(PerTenantCompacted) + clear(l.added) + clear(l.removed) + clear(l.compactedAdded) + clear(l.compactedRemoved) } // Update Adds and removes regular or compacted blocks from the in-memory blocklist. @@ -111,7 +112,11 @@ func (l *List) Update(tenantID string, add []*backend.BlockMeta, remove []*backe l.updateInternal(tenantID, add, remove, compactedAdd, compactedRemove) - // save off, they are retained for an additional polling cycle + // We have updated the current blocklist, but we may be in the middle of a + // polling cycle. When ApplyPollResults is called, the poll results would + // overwrite the changes we have just made. So we keep track of them here + // and re-apply them on top of the poll results, saving them for the next + // polling cycle. On the next polling cycle, the changes here will be rediscovered. l.added[tenantID] = append(l.added[tenantID], add...) l.removed[tenantID] = append(l.removed[tenantID], remove...) l.compactedAdded[tenantID] = append(l.compactedAdded[tenantID], compactedAdd...)
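
The retained-delta behavior described in the comment above is easier to see end to end with a small example. The sketch below is not part of the patch; it assumes only the blocklist.List API visible in this diff (New, Update, ApplyPollResults, Metas) and the backend.BlockMeta type, and shows why a block added mid-poll survives the next ApplyPollResults:

package main

import (
	"fmt"

	"github.com/google/uuid"

	"github.com/grafana/tempo/tempodb/backend"
	"github.com/grafana/tempo/tempodb/blocklist"
)

func main() {
	l := blocklist.New()

	// A component (e.g. the compactor) finishes a block while a poll is in
	// flight and pushes it into the list directly.
	newBlock := &backend.BlockMeta{BlockID: uuid.New()}
	l.Update("tenant", []*backend.BlockMeta{newBlock}, nil, nil, nil)

	// The in-flight poll completes without having seen the new block.
	l.ApplyPollResults(blocklist.PerTenant{"tenant": nil}, blocklist.PerTenantCompacted{})

	// Because Update retained the delta, the block is still visible; the next
	// polling cycle rediscovers it from the backend.
	fmt.Println(len(l.Metas("tenant"))) // expected: 1
}
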
@@ -149,6 +154,7 @@ func (l *List) updateInternal(tenantID string, add []*backend.BlockMeta, remove newblocklist = append(newblocklist, b) } } + l.metas[tenantID] = newblocklist // ******** Compacted blocks ******** @@ -178,5 +184,6 @@ func (l *List) updateInternal(tenantID string, add []*backend.BlockMeta, remove newCompactedBlocklist = append(newCompactedBlocklist, b) } } + l.compactedMetas[tenantID] = newCompactedBlocklist } diff --git a/tempodb/blocklist/list_test.go b/tempodb/blocklist/list_test.go index 7a87b258c9f..efa20ceb6ca 100644 --- a/tempodb/blocklist/list_test.go +++ b/tempodb/blocklist/list_test.go @@ -5,8 +5,9 @@ import ( "testing" "github.com/google/uuid" - "github.com/grafana/tempo/tempodb/backend" "github.com/stretchr/testify/assert" + + "github.com/grafana/tempo/tempodb/backend" ) const testTenantID = "test" @@ -470,6 +471,12 @@ func TestUpdateCompacted(t *testing.T) { func TestUpdatesSaved(t *testing.T) { // unlike most tests these are applied serially to the same list object and the expected // results are cumulative across all tests + + one := uuid.MustParse("00000000-0000-0000-0000-000000000001") + two := uuid.MustParse("00000000-0000-0000-0000-000000000002") + oneOhOne := uuid.MustParse("10000000-0000-0000-0000-000000000001") + oneOhTwo := uuid.MustParse("10000000-0000-0000-0000-000000000002") + tests := []struct { applyMetas PerTenant applyCompacted PerTenantCompacted @@ -487,7 +494,7 @@ func TestUpdatesSaved(t *testing.T) { applyMetas: PerTenant{ "test": []*backend.BlockMeta{ { - BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + BlockID: one, }, }, }, @@ -495,7 +502,7 @@ func TestUpdatesSaved(t *testing.T) { "test": []*backend.CompactedBlockMeta{ { BlockMeta: backend.BlockMeta{ - BlockID: uuid.MustParse("10000000-0000-0000-0000-000000000001"), + BlockID: oneOhOne, }, }, }, @@ -503,21 +510,21 @@ func TestUpdatesSaved(t *testing.T) { updateTenant: "test", addMetas: []*backend.BlockMeta{ { - BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + BlockID: one, }, { - BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), + BlockID: two, }, }, removeMetas: []*backend.BlockMeta{ { - BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + BlockID: one, }, }, addCompacted: []*backend.CompactedBlockMeta{ { BlockMeta: backend.BlockMeta{ - BlockID: uuid.MustParse("10000000-0000-0000-0000-000000000002"), + BlockID: oneOhTwo, }, }, }, @@ -525,7 +532,7 @@ func TestUpdatesSaved(t *testing.T) { expectedMetas: PerTenant{ "test": []*backend.BlockMeta{ { - BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), + BlockID: two, }, }, }, @@ -533,12 +540,12 @@ func TestUpdatesSaved(t *testing.T) { "test": []*backend.CompactedBlockMeta{ { BlockMeta: backend.BlockMeta{ - BlockID: uuid.MustParse("10000000-0000-0000-0000-000000000001"), + BlockID: oneOhOne, }, }, { BlockMeta: backend.BlockMeta{ - BlockID: uuid.MustParse("10000000-0000-0000-0000-000000000002"), + BlockID: oneOhTwo, }, }, }, @@ -549,7 +556,7 @@ func TestUpdatesSaved(t *testing.T) { applyMetas: PerTenant{ "test": []*backend.BlockMeta{ { - BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + BlockID: one, }, }, }, @@ -557,7 +564,7 @@ func TestUpdatesSaved(t *testing.T) { "test": []*backend.CompactedBlockMeta{ { BlockMeta: backend.BlockMeta{ - BlockID: uuid.MustParse("10000000-0000-0000-0000-000000000001"), + BlockID: oneOhOne, }, }, }, @@ -565,8 +572,12 @@ func TestUpdatesSaved(t *testing.T) { expectedTenants: []string{"test"}, expectedMetas: PerTenant{ 
"test": []*backend.BlockMeta{ + // Even though we have just appled one, it was removed in the previous step, and we we expect not to find it here. + // { + // BlockID: one, + // }, { - BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), + BlockID: two, }, }, }, @@ -574,12 +585,12 @@ func TestUpdatesSaved(t *testing.T) { "test": []*backend.CompactedBlockMeta{ { BlockMeta: backend.BlockMeta{ - BlockID: uuid.MustParse("10000000-0000-0000-0000-000000000001"), + BlockID: oneOhOne, }, }, { BlockMeta: backend.BlockMeta{ - BlockID: uuid.MustParse("10000000-0000-0000-0000-000000000002"), + BlockID: oneOhTwo, }, }, }, @@ -590,7 +601,7 @@ func TestUpdatesSaved(t *testing.T) { applyMetas: PerTenant{ "test": []*backend.BlockMeta{ { - BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + BlockID: one, }, }, }, @@ -598,7 +609,7 @@ func TestUpdatesSaved(t *testing.T) { "test": []*backend.CompactedBlockMeta{ { BlockMeta: backend.BlockMeta{ - BlockID: uuid.MustParse("10000000-0000-0000-0000-000000000001"), + BlockID: oneOhOne, }, }, }, @@ -607,7 +618,7 @@ func TestUpdatesSaved(t *testing.T) { expectedMetas: PerTenant{ "test": []*backend.BlockMeta{ { - BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + BlockID: one, }, }, }, @@ -615,7 +626,7 @@ func TestUpdatesSaved(t *testing.T) { "test": []*backend.CompactedBlockMeta{ { BlockMeta: backend.BlockMeta{ - BlockID: uuid.MustParse("10000000-0000-0000-0000-000000000001"), + BlockID: oneOhOne, }, }, }, @@ -624,7 +635,9 @@ func TestUpdatesSaved(t *testing.T) { } l := New() - for _, tc := range tests { + for i, tc := range tests { + t.Logf("step %d", i+1) + l.ApplyPollResults(tc.applyMetas, tc.applyCompacted) if tc.updateTenant != "" { l.Update(tc.updateTenant, tc.addMetas, tc.removeMetas, tc.addCompacted, nil) diff --git a/tempodb/blocklist/poller.go b/tempodb/blocklist/poller.go index 9d5b8898c47..b20df0f38f4 100644 --- a/tempodb/blocklist/poller.go +++ b/tempodb/blocklist/poller.go @@ -7,15 +7,16 @@ import ( "math/rand" "sort" "strconv" + "sync" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/google/uuid" opentracing "github.com/opentracing/opentracing-go" + spanlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "go.uber.org/atomic" "github.com/grafana/tempo/pkg/boundedwaitgroup" "github.com/grafana/tempo/tempodb/backend" @@ -123,7 +124,7 @@ func NewPoller(cfg *PollerConfig, sharder JobSharder, reader backend.Reader, com } // Do does the doing of getting a blocklist -func (p *Poller) Do() (PerTenant, PerTenantCompacted, error) { +func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) { start := time.Now() defer func() { diff := time.Since(start).Seconds() @@ -131,7 +132,12 @@ func (p *Poller) Do() (PerTenant, PerTenantCompacted, error) { level.Info(p.logger).Log("msg", "blocklist poll complete", "seconds", diff) }() - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + span, _ := opentracing.StartSpanFromContext(ctx, "Poller.Do") + defer span.Finish() + tenants, err := p.reader.Tenants(ctx) if err != nil { metricBlocklistErrors.WithLabelValues("").Inc() @@ -144,7 +150,7 @@ func (p *Poller) Do() (PerTenant, PerTenantCompacted, error) { consecutiveErrors := 0 for _, tenantID := range tenants { - newBlockList, newCompactedBlockList, err := p.pollTenantAndCreateIndex(ctx, tenantID) + newBlockList, newCompactedBlockList, err 
:= p.pollTenantAndCreateIndex(ctx, tenantID, previous) if err != nil { level.Error(p.logger).Log("msg", "failed to poll or create index for tenant", "tenant", tenantID, "err", err) consecutiveErrors++ @@ -178,12 +184,18 @@ func (p *Poller) Do() (PerTenant, PerTenantCompacted, error) { return blocklist, compactedBlocklist, nil } -func (p *Poller) pollTenantAndCreateIndex(ctx context.Context, tenantID string) ([]*backend.BlockMeta, []*backend.CompactedBlockMeta, error) { - span, derivedCtx := opentracing.StartSpanFromContext(ctx, "poll tenant index") +func (p *Poller) pollTenantAndCreateIndex( + ctx context.Context, + tenantID string, + previous *List, +) ([]*backend.BlockMeta, []*backend.CompactedBlockMeta, error) { + span, derivedCtx := opentracing.StartSpanFromContext(ctx, "Poller.pollTenantAndCreateIndex", opentracing.Tag{Key: "tenant", Value: tenantID}) defer span.Finish() // are we a tenant index builder? - if !p.buildTenantIndex(tenantID) { + builder := p.tenantIndexBuilder(tenantID) + span.SetTag("tenant_index_builder", builder) + if !builder { metricTenantIndexBuilder.WithLabelValues(tenantID).Set(0) i, err := p.reader.TenantIndex(derivedCtx, tenantID) @@ -192,24 +204,31 @@ func (p *Poller) pollTenantAndCreateIndex(ctx context.Context, tenantID string) // success! return the retrieved index metricTenantIndexAgeSeconds.WithLabelValues(tenantID).Set(float64(time.Since(i.CreatedAt) / time.Second)) level.Info(p.logger).Log("msg", "successfully pulled tenant index", "tenant", tenantID, "createdAt", i.CreatedAt, "metas", len(i.Meta), "compactedMetas", len(i.CompactedMeta)) + + span.SetTag("metas", len(i.Meta)) + span.SetTag("compactedMetas", len(i.CompactedMeta)) return i.Meta, i.CompactedMeta, nil } metricTenantIndexErrors.WithLabelValues(tenantID).Inc() + span.LogFields( + spanlog.Error(err), + ) // there was an error, return the error if we're not supposed to fallback to polling if !p.cfg.PollFallback { - return nil, nil, err + return nil, nil, fmt.Errorf("failed to pull tenant index and no fallback configured: %w", err) } // polling fallback is true, log the error and continue in this method to completely poll the backend level.Error(p.logger).Log("msg", "failed to pull bucket index for tenant. falling back to polling", "tenant", tenantID, "err", err) } - // if we're here then we have been configured to be a tenant index builder OR there was a failure to pull - // the tenant index and we are configured to fall back to polling + // if we're here then we have been configured to be a tenant index builder OR + // there was a failure to pull the tenant index and we are configured to fall + // back to polling. 
metricTenantIndexBuilder.WithLabelValues(tenantID).Set(1) - blocklist, compactedBlocklist, err := p.pollTenantBlocks(derivedCtx, tenantID) + blocklist, compactedBlocklist, err := p.pollTenantBlocks(derivedCtx, tenantID, previous) if err != nil { return nil, nil, err } @@ -226,59 +245,78 @@ func (p *Poller) pollTenantAndCreateIndex(ctx context.Context, tenantID string) return blocklist, compactedBlocklist, nil } -func (p *Poller) pollTenantBlocks(ctx context.Context, tenantID string) ([]*backend.BlockMeta, []*backend.CompactedBlockMeta, error) { - blockIDs, err := p.reader.Blocks(ctx, tenantID) +func (p *Poller) pollTenantBlocks( + ctx context.Context, + tenantID string, + previous *List, +) ([]*backend.BlockMeta, []*backend.CompactedBlockMeta, error) { + span, derivedCtx := opentracing.StartSpanFromContext(ctx, "Poller.pollTenantBlocks") + defer span.Finish() + + currentBlockIDs, currentCompactedBlockIDs, err := p.reader.Blocks(derivedCtx, tenantID) if err != nil { - metricBlocklistErrors.WithLabelValues(tenantID).Inc() - return []*backend.BlockMeta{}, []*backend.CompactedBlockMeta{}, err + return nil, nil, err } - bg := boundedwaitgroup.New(p.cfg.PollConcurrency) - chMeta := make(chan *backend.BlockMeta, len(blockIDs)) - chCompactedMeta := make(chan *backend.CompactedBlockMeta, len(blockIDs)) - anyError := atomic.Error{} + var ( + metas = previous.Metas(tenantID) + compactedMetas = previous.CompactedMetas(tenantID) + mm = make(map[uuid.UUID]*backend.BlockMeta, len(metas)) + cm = make(map[uuid.UUID]*backend.CompactedBlockMeta, len(compactedMetas)) + newBlockList = make([]*backend.BlockMeta, 0, len(currentBlockIDs)) + newCompactedBlocklist = make([]*backend.CompactedBlockMeta, 0, len(currentCompactedBlockIDs)) + unknownBlockIDs = make(map[uuid.UUID]bool, 1000) + ) + + span.SetTag("metas", len(metas)) + span.SetTag("compactedMetas", len(compactedMetas)) + + for _, i := range metas { + mm[i.BlockID] = i + } - for _, blockID := range blockIDs { - bg.Add(1) - go func(uuid uuid.UUID) { - defer bg.Done() + for _, i := range compactedMetas { + cm[i.BlockID] = i + } - if p.cfg.PollJitterMs > 0 { - time.Sleep(time.Duration(rand.Intn(p.cfg.PollJitterMs)) * time.Millisecond) - } + // The boolean here tracks whether we know the block has been compacted + for _, blockID := range currentBlockIDs { + // if we already have this block id in our previous list, use the existing data. + if v, ok := mm[blockID]; ok { + newBlockList = append(newBlockList, v) + continue + } + unknownBlockIDs[blockID] = false - m, cm, err := p.pollBlock(ctx, tenantID, uuid) - if m != nil { - chMeta <- m - } else if cm != nil { - chCompactedMeta <- cm - } else if err != nil { - anyError.Store(err) - } - }(blockID) } - bg.Wait() - close(chMeta) - close(chCompactedMeta) + for _, blockID := range currentCompactedBlockIDs { + // if we already have this block id in our previous list, use the existing data. + if v, ok := cm[blockID]; ok { + newCompactedBlocklist = append(newCompactedBlocklist, v) + continue + } + + // TODO: Review the ability to avoid polling for compacted blocks that we + // know about. We need to know the compacted time, but perhaps there is + // another way to get that, like the object creation time.
+ + unknownBlockIDs[blockID] = true - if err = anyError.Load(); err != nil { - metricTenantIndexErrors.WithLabelValues(tenantID).Inc() - return nil, nil, err } - newBlockList := make([]*backend.BlockMeta, 0, len(blockIDs)) - for m := range chMeta { - newBlockList = append(newBlockList, m) + newM, newCm, err := p.pollUnknown(derivedCtx, unknownBlockIDs, tenantID) + if err != nil { + return nil, nil, err } + + newBlockList = append(newBlockList, newM...) + newCompactedBlocklist = append(newCompactedBlocklist, newCm...) + sort.Slice(newBlockList, func(i, j int) bool { return newBlockList[i].StartTime.Before(newBlockList[j].StartTime) }) - newCompactedBlocklist := make([]*backend.CompactedBlockMeta, 0, len(blockIDs)) - for cm := range chCompactedMeta { - newCompactedBlocklist = append(newCompactedBlocklist, cm) - } sort.Slice(newCompactedBlocklist, func(i, j int) bool { return newCompactedBlocklist[i].StartTime.Before(newCompactedBlocklist[j].StartTime) }) @@ -286,11 +324,93 @@ func (p *Poller) pollTenantBlocks(ctx context.Context, tenantID string) ([]*back return newBlockList, newCompactedBlocklist, nil } -func (p *Poller) pollBlock(ctx context.Context, tenantID string, blockID uuid.UUID) (*backend.BlockMeta, *backend.CompactedBlockMeta, error) { +func (p *Poller) pollUnknown( + ctx context.Context, + unknownBlocks map[uuid.UUID]bool, + tenantID string, +) ([]*backend.BlockMeta, []*backend.CompactedBlockMeta, error) { + span, derivedCtx := opentracing.StartSpanFromContext(ctx, "pollUnknown", opentracing.Tags{ + "unknownBlockIDs": len(unknownBlocks), + }) + defer span.Finish() + + var ( + errs []error + mtx sync.Mutex + bg = boundedwaitgroup.New(p.cfg.PollConcurrency) + newBlockList = make([]*backend.BlockMeta, 0, len(unknownBlocks)) + newCompactedBlocklist = make([]*backend.CompactedBlockMeta, 0, len(unknownBlocks)) + ) + + for blockID, compacted := range unknownBlocks { + bg.Add(1) + + // Avoid polling if we've already encountered an error + mtx.Lock() + if len(errs) > 0 { + mtx.Unlock() + break + } + mtx.Unlock() + + go func(id uuid.UUID, compacted bool) { + defer bg.Done() + + if p.cfg.PollJitterMs > 0 { + time.Sleep(time.Duration(rand.Intn(p.cfg.PollJitterMs)) * time.Millisecond) + } + + m, cm, pollBlockErr := p.pollBlock(derivedCtx, tenantID, id, compacted) + mtx.Lock() + defer mtx.Unlock() + if m != nil { + newBlockList = append(newBlockList, m) + return + } + + if cm != nil { + newCompactedBlocklist = append(newCompactedBlocklist, cm) + return + } + + if pollBlockErr != nil { + errs = append(errs, pollBlockErr) + } + }(blockID, compacted) + } + + bg.Wait() + + if len(errs) > 0 { + metricTenantIndexErrors.WithLabelValues(tenantID).Inc() + // TODO: add span status on error + return nil, nil, errors.Join(errs...) + } + + return newBlockList, newCompactedBlocklist, nil +} + +func (p *Poller) pollBlock( + ctx context.Context, + tenantID string, + blockID uuid.UUID, + compacted bool, +) (*backend.BlockMeta, *backend.CompactedBlockMeta, error) { + span, derivedCtx := opentracing.StartSpanFromContext(ctx, "Poller.pollBlock") + defer span.Finish() + var err error + + span.SetTag("tenant", tenantID) + span.SetTag("block", blockID.String()) + + var blockMeta *backend.BlockMeta var compactedBlockMeta *backend.CompactedBlockMeta - blockMeta, err := p.reader.BlockMeta(ctx, blockID, tenantID) + + if !compacted { + blockMeta, err = p.reader.BlockMeta(derivedCtx, blockID, tenantID) + } // if the normal meta doesn't exist maybe it's compacted. 
- if errors.Is(err, backend.ErrDoesNotExist) { + if errors.Is(err, backend.ErrDoesNotExist) || compacted { blockMeta = nil compactedBlockMeta, err = p.compactor.CompactedBlockMeta(blockID, tenantID) } @@ -308,7 +428,8 @@ func (p *Poller) pollBlock(ctx context.Context, tenantID string, blockID uuid.UU return blockMeta, compactedBlockMeta, nil } -func (p *Poller) buildTenantIndex(tenant string) bool { +// tenantIndexBuilder returns true if this poller owns this tenant +func (p *Poller) tenantIndexBuilder(tenant string) bool { for i := 0; i < p.cfg.TenantIndexBuilders; i++ { job := jobPrefix + strconv.Itoa(i) + "-" + tenant if p.sharder.Owns(job) { @@ -338,7 +459,10 @@ type backendMetaMetrics struct { compactedBlockMetaTotalBytes uint64 } -func sumTotalBackendMetaMetrics(blockMeta []*backend.BlockMeta, compactedBlockMeta []*backend.CompactedBlockMeta) backendMetaMetrics { +func sumTotalBackendMetaMetrics( + blockMeta []*backend.BlockMeta, + compactedBlockMeta []*backend.CompactedBlockMeta, +) backendMetaMetrics { var sumTotalObjectsBM int var sumTotalObjectsCBM int var sumTotalBytesBM uint64 diff --git a/tempodb/blocklist/poller_test.go b/tempodb/blocklist/poller_test.go index 673bc94f570..1cb63234b1b 100644 --- a/tempodb/blocklist/poller_test.go +++ b/tempodb/blocklist/poller_test.go @@ -1,16 +1,23 @@ package blocklist import ( + "bytes" "context" "errors" + "fmt" + "maps" + "math/rand" + "sort" "strconv" "testing" "time" "github.com/go-kit/log" "github.com/google/uuid" - "github.com/grafana/tempo/tempodb/backend" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/tempo/tempodb/backend" ) var ( @@ -147,6 +154,7 @@ func TestTenantIndexBuilder(t *testing.T) { c := newMockCompactor(tc.compactedList, tc.expectsError) r := newMockReader(tc.list, tc.compactedList, tc.expectsError) w := &backend.MockWriter{} + b := newBlocklist(PerTenant{}, PerTenantCompacted{}) poller := NewPoller(&PollerConfig{ PollConcurrency: testPollConcurrency, @@ -155,7 +163,7 @@ func TestTenantIndexBuilder(t *testing.T) { }, &mockJobSharder{ owns: true, }, r, c, w, log.NewNopLogger()) - actualList, actualCompactedList, err := poller.Do() + actualList, actualCompactedList, err := poller.Do(b) // confirm return as expected assert.Equal(t, tc.expectedList, actualList) @@ -238,13 +246,16 @@ func TestTenantIndexFallback(t *testing.T) { "test": []*backend.BlockMeta{}, }, nil, false) w := &backend.MockWriter{} + b := newBlocklist(PerTenant{}, PerTenantCompacted{}) r.(*backend.MockReader).TenantIndexFn = func(ctx context.Context, tenantID string) (*backend.TenantIndex, error) { if tc.errorOnCreateTenantIndex { return nil, errors.New("err") } return &backend.TenantIndex{ - CreatedAt: time.Now().Add(-5 * time.Minute), // always make the tenant index 5 minutes old so the above tests can use that for fallback testing + CreatedAt: time.Now(). 
+ Add(-5 * time.Minute), + // always make the tenant index 5 minutes old so the above tests can use that for fallback testing }, nil } @@ -256,7 +267,7 @@ func TestTenantIndexFallback(t *testing.T) { }, &mockJobSharder{ owns: tc.isTenantIndexBuilder, }, r, c, w, log.NewNopLogger()) - _, _, err := poller.Do() + _, _, err := poller.Do(b) assert.Equal(t, tc.expectsError, err != nil) assert.Equal(t, tc.expectsTenantIndexWritten, w.IndexCompactedMeta != nil) @@ -343,7 +354,7 @@ func TestPollBlock(t *testing.T) { PollFallback: testPollFallback, TenantIndexBuilders: testBuilders, }, &mockJobSharder{}, r, c, w, log.NewNopLogger()) - actualMeta, actualCompactedMeta, err := poller.pollBlock(context.Background(), tc.pollTenantID, tc.pollBlockID) + actualMeta, actualCompactedMeta, err := poller.pollBlock(context.Background(), tc.pollTenantID, tc.pollBlockID, false) assert.Equal(t, tc.expectedMeta, actualMeta) assert.Equal(t, tc.expectedCompactedMeta, actualCompactedMeta) @@ -502,6 +513,7 @@ func TestPollTolerateConsecutiveErrors(t *testing.T) { c = newMockCompactor(PerTenantCompacted{}, false) w = &backend.MockWriter{} s = &mockJobSharder{owns: true} + b = newBlocklist(PerTenant{}, PerTenantCompacted{}) ) testCases := []struct { @@ -529,9 +541,15 @@ func TestPollTolerateConsecutiveErrors(t *testing.T) { expectedError: nil, }, { - name: "too many errors", - tolerate: 2, - tenantErrors: []error{nil, errors.New("tenant 1 err"), errors.New("tenant 2 err"), errors.New("tenant 3 err"), nil}, + name: "too many errors", + tolerate: 2, + tenantErrors: []error{ + nil, + errors.New("tenant 1 err"), + errors.New("tenant 2 err"), + errors.New("tenant 3 err"), + nil, + }, expectedError: errors.New("tenant 3 err"), }, } @@ -540,9 +558,9 @@ func TestPollTolerateConsecutiveErrors(t *testing.T) { t.Run(tc.name, func(t *testing.T) { // This mock reader returns error or nil based on the tenant ID r := &backend.MockReader{ - BlockFn: func(ctx context.Context, tenantID string) ([]uuid.UUID, error) { + BlocksFn: func(ctx context.Context, tenantID string) ([]uuid.UUID, []uuid.UUID, error) { i, _ := strconv.Atoi(tenantID) - return nil, tc.tenantErrors[i] + return nil, nil, tc.tenantErrors[i] }, } // Tenant ID for each index in the slice @@ -557,13 +575,372 @@ func TestPollTolerateConsecutiveErrors(t *testing.T) { TolerateConsecutiveErrors: tc.tolerate, }, s, r, c, w, log.NewNopLogger()) - _, _, err := poller.Do() + _, _, err := poller.Do(b) - assert.Equal(t, tc.expectedError, err) + if tc.expectedError != nil { + assert.ErrorContains(t, err, tc.expectedError.Error()) + } else { + assert.NoError(t, err) + } }) } } +func TestPollComparePreviousResults(t *testing.T) { + zero := uuid.MustParse("00000000-0000-0000-0000-000000000000") + aaa := uuid.MustParse("00000000-0000-0000-0000-00000000000A") + eff := uuid.MustParse("00000000-0000-0000-0000-00000000000F") + + testCases := []struct { + name string + + previousPerTenant PerTenant + previousCompactedPerTenant PerTenantCompacted + + currentPerTenant PerTenant + currentCompactedPerTenant PerTenantCompacted + + expectedPerTenant PerTenant + expectedCompactedPerTenant PerTenantCompacted + + expectedBlockMetaCalls map[string]map[uuid.UUID]int + expectedCompactedBlockMetaCalls map[string]map[uuid.UUID]int + + readerErr bool + err error + }{ + { + name: "with no previous results, the blocklist is polled", + previousPerTenant: PerTenant{}, + previousCompactedPerTenant: PerTenantCompacted{}, + currentPerTenant: PerTenant{ + "test": []*backend.BlockMeta{ + {BlockID: zero}, + }, + }, + 
currentCompactedPerTenant: PerTenantCompacted{ + "test": []*backend.CompactedBlockMeta{ + {BlockMeta: backend.BlockMeta{BlockID: eff}}, + }, + }, + expectedPerTenant: PerTenant{ + "test": []*backend.BlockMeta{ + {BlockID: zero}, + }, + }, + expectedCompactedPerTenant: PerTenantCompacted{ + "test": []*backend.CompactedBlockMeta{ + {BlockMeta: backend.BlockMeta{BlockID: eff}}, + }, + }, + expectedBlockMetaCalls: map[string]map[uuid.UUID]int{ + "test": { + zero: 1, + }, + }, + expectedCompactedBlockMetaCalls: map[string]map[uuid.UUID]int{ + "test": { + eff: 1, + }, + }, + }, + { + name: "with previous results, meta should be read from only new blocks", + previousPerTenant: PerTenant{ + "test": []*backend.BlockMeta{ + {BlockID: zero}, + {BlockID: eff}, + }, + }, + previousCompactedPerTenant: PerTenantCompacted{}, + currentPerTenant: PerTenant{ + "test": []*backend.BlockMeta{ + {BlockID: zero}, + {BlockID: eff}, + }, + }, + currentCompactedPerTenant: PerTenantCompacted{}, + expectedPerTenant: PerTenant{ + "test": []*backend.BlockMeta{ + {BlockID: zero}, + {BlockID: eff}, + }, + }, + expectedCompactedPerTenant: PerTenantCompacted{ + "test": []*backend.CompactedBlockMeta{}, + }, + expectedBlockMetaCalls: map[string]map[uuid.UUID]int{}, + }, + { + name: "with previous results, blocks that have been compacted since the last poll should be known as compacted", + previousPerTenant: PerTenant{ + "test": []*backend.BlockMeta{ + {BlockID: zero}, + {BlockID: aaa}, + }, + }, + previousCompactedPerTenant: PerTenantCompacted{}, + currentPerTenant: PerTenant{ + "test": []*backend.BlockMeta{ + {BlockID: eff}, + }, + }, + currentCompactedPerTenant: PerTenantCompacted{ + "test": []*backend.CompactedBlockMeta{ + {BlockMeta: backend.BlockMeta{BlockID: zero}}, + {BlockMeta: backend.BlockMeta{BlockID: aaa}}, + }, + }, + expectedPerTenant: PerTenant{ + "test": []*backend.BlockMeta{ + {BlockID: eff}, + }, + }, + expectedCompactedPerTenant: PerTenantCompacted{ + "test": []*backend.CompactedBlockMeta{ + {BlockMeta: backend.BlockMeta{BlockID: aaa}}, + {BlockMeta: backend.BlockMeta{BlockID: zero}}, + }, + }, + expectedBlockMetaCalls: map[string]map[uuid.UUID]int{ + "test": { + eff: 1, + }, + }, + expectedCompactedBlockMetaCalls: map[string]map[uuid.UUID]int{ + "test": { + aaa: 1, + zero: 1, + }, + }, + }, + { + name: "with previous compactions should be known", + previousPerTenant: PerTenant{}, + previousCompactedPerTenant: PerTenantCompacted{ + "test": []*backend.CompactedBlockMeta{ + {BlockMeta: backend.BlockMeta{BlockID: zero}}, + {BlockMeta: backend.BlockMeta{BlockID: aaa}}, + {BlockMeta: backend.BlockMeta{BlockID: eff}}, + }, + }, + currentPerTenant: PerTenant{}, + currentCompactedPerTenant: PerTenantCompacted{ + "test": []*backend.CompactedBlockMeta{ + {BlockMeta: backend.BlockMeta{BlockID: zero}}, + {BlockMeta: backend.BlockMeta{BlockID: aaa}}, + {BlockMeta: backend.BlockMeta{BlockID: eff}}, + }, + }, + expectedPerTenant: PerTenant{ + "test": []*backend.BlockMeta{}, + }, + expectedCompactedPerTenant: PerTenantCompacted{ + "test": []*backend.CompactedBlockMeta{ + {BlockMeta: backend.BlockMeta{BlockID: zero}}, + {BlockMeta: backend.BlockMeta{BlockID: aaa}}, + {BlockMeta: backend.BlockMeta{BlockID: eff}}, + }, + }, + expectedBlockMetaCalls: map[string]map[uuid.UUID]int{}, + }, + { + name: "with previous compactions removed, should be forgotten", + previousPerTenant: PerTenant{}, + previousCompactedPerTenant: PerTenantCompacted{ + "test": []*backend.CompactedBlockMeta{ + {BlockMeta: backend.BlockMeta{BlockID: zero}}, 
+ }, + }, + currentPerTenant: PerTenant{}, + currentCompactedPerTenant: PerTenantCompacted{}, + expectedPerTenant: PerTenant{}, + expectedCompactedPerTenant: PerTenantCompacted{}, + expectedBlockMetaCalls: map[string]map[uuid.UUID]int{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var ( + c = newMockCompactor(tc.currentCompactedPerTenant, false) + w = &backend.MockWriter{} + s = &mockJobSharder{owns: true} + r = newMockReader(tc.currentPerTenant, tc.currentCompactedPerTenant, tc.readerErr) + previous = newBlocklist(tc.previousPerTenant, tc.previousCompactedPerTenant) + ) + + // This mock reader returns error or nil based on the tenant ID + poller := NewPoller(&PollerConfig{ + PollConcurrency: testPollConcurrency, + PollFallback: testPollFallback, + TenantIndexBuilders: testBuilders, + }, s, r, c, w, log.NewNopLogger()) + + metas, compactedMetas, err := poller.Do(previous) + require.Equal(t, tc.err, err) + + require.Equal(t, len(tc.expectedPerTenant), len(metas)) + for tenantID, expectedMetas := range tc.expectedPerTenant { + l := metas[tenantID] + sort.Slice(l, func(i, j int) bool { + x := bytes.Compare(l[i].BlockID[:], l[j].BlockID[:]) + return x > 0 + }) + + sort.Slice(expectedMetas, func(i, j int) bool { + x := bytes.Compare(expectedMetas[i].BlockID[:], expectedMetas[j].BlockID[:]) + return x > 0 + }) + + require.Equal(t, expectedMetas, l) + } + + require.Equal(t, len(tc.expectedCompactedPerTenant), len(compactedMetas)) + for tenantID, expectedCompactedMetas := range tc.expectedCompactedPerTenant { + l := compactedMetas[tenantID] + sort.Slice(l, func(i, j int) bool { + x := bytes.Compare(l[i].BlockID[:], l[j].BlockID[:]) + return x > 0 + }) + + sort.Slice(expectedCompactedMetas, func(i, j int) bool { + x := bytes.Compare(expectedCompactedMetas[i].BlockID[:], expectedCompactedMetas[j].BlockID[:]) + return x > 0 + }) + require.Equal(t, expectedCompactedMetas, l) + } + + require.Equal(t, tc.expectedBlockMetaCalls, r.(*backend.MockReader).BlockMetaCalls) + require.Equal(t, tc.expectedCompactedBlockMetaCalls, c.(*backend.MockCompactor).CompactedBlockMetaCalls) + }) + } +} + +func BenchmarkPoller10k(b *testing.B) { + tests := []struct { + tenantCount int + blocksPerTenant int + }{ + { + tenantCount: 1, + blocksPerTenant: 100, + }, + { + tenantCount: 1, + blocksPerTenant: 1000, + }, + { + tenantCount: 1, + blocksPerTenant: 10000, + }, + { + tenantCount: 1, + blocksPerTenant: 100000, + }, + } + + for _, tc := range tests { + previousPerTenant := newPerTenant(tc.tenantCount, tc.blocksPerTenant) + previousPerTenantCompacted := newPerTenantCompacted(tc.tenantCount, tc.blocksPerTenant) + + // currentPerTenant := newPerTenant(uuids, tc.tenantCount, tc.blocksPerTenant) + // currentPerTenantCompacted := newPerTenantCompacted(uuids, tc.tenantCount, tc.blocksPerTenant) + currentPerTenant := maps.Clone(previousPerTenant) + currentPerTenantCompacted := maps.Clone(previousPerTenantCompacted) + + var ( + c = newMockCompactor(currentPerTenantCompacted, false) + w = &backend.MockWriter{} + s = &mockJobSharder{owns: true} + r = newMockReader(currentPerTenant, currentPerTenantCompacted, false) + previous = newBlocklist(previousPerTenant, previousPerTenantCompacted) + ) + + // This mock reader returns error or nil based on the tenant ID + poller := NewPoller(&PollerConfig{ + PollConcurrency: testPollConcurrency, + PollFallback: testPollFallback, + TenantIndexBuilders: testBuilders, + }, s, r, c, w, log.NewNopLogger()) + + runName := fmt.Sprintf("%d-%d", tc.tenantCount, 
tc.blocksPerTenant) + b.Run(runName, func(b *testing.B) { + for tenant := range previousPerTenant { + benchmarkPollTenant(b, poller, tenant, previous) + } + }) + } +} + +func benchmarkPollTenant(b *testing.B, poller *Poller, tenant string, previous *List) { + b.ResetTimer() + for n := 0; n < b.N; n++ { + _, _, err := poller.pollTenantBlocks(context.Background(), tenant, previous) + require.NoError(b, err) + } +} + +func newBlockMetas(count int) []*backend.BlockMeta { + metas := make([]*backend.BlockMeta, count) + for i := 0; i < count; i++ { + metas[i] = &backend.BlockMeta{ + BlockID: uuid.New(), + } + } + + return metas +} + +func newCompactedMetas(count int) []*backend.CompactedBlockMeta { + metas := make([]*backend.CompactedBlockMeta, count) + for i := 0; i < count; i++ { + metas[i] = &backend.CompactedBlockMeta{ + BlockMeta: backend.BlockMeta{ + BlockID: uuid.New(), + }, + } + } + + return metas +} + +var chars = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + +func randString(n int) string { + b := make([]rune, n) + for i := range b { + b[i] = chars[rand.Intn(len(chars))] + } + return string(b) +} + +func newPerTenant(tenantCount, blockCount int) PerTenant { + perTenant := make(PerTenant, tenantCount) + var metas []*backend.BlockMeta + var id string + for i := 0; i < tenantCount; i++ { + metas = newBlockMetas(blockCount) + id = randString(5) + perTenant[id] = metas + } + + return perTenant +} + +func newPerTenantCompacted(tenantCount, blockCount int) PerTenantCompacted { + perTenantCompacted := make(PerTenantCompacted) + var metas []*backend.CompactedBlockMeta + var id string + for i := 0; i < tenantCount; i++ { + metas = newCompactedMetas(blockCount) + id = randString(5) + perTenantCompacted[id] = metas + } + + return perTenantCompacted +} + func newMockCompactor(list PerTenantCompacted, expectsError bool) backend.Compactor { return &backend.MockCompactor{ BlockMetaFn: func(blockID uuid.UUID, tenantID string) (*backend.CompactedBlockMeta, error) { @@ -589,31 +966,39 @@ func newMockCompactor(list PerTenantCompacted, expectsError bool) backend.Compac func newMockReader(list PerTenant, compactedList PerTenantCompacted, expectsError bool) backend.Reader { tenants := []string{} + ttt := make(map[string]bool) + for t := range list { - tenants = append(tenants, t) + ttt[t] = true } for t := range compactedList { - tenants = append(tenants, t) + ttt[t] = true + } + + for k := range ttt { + tenants = append(tenants, k) } return &backend.MockReader{ T: tenants, - BlockFn: func(ctx context.Context, tenantID string) ([]uuid.UUID, error) { + BlocksFn: func(ctx context.Context, tenantID string) ([]uuid.UUID, []uuid.UUID, error) { if expectsError { - return nil, errors.New("err") + return nil, nil, errors.New("err") } blocks := list[tenantID] uuids := []uuid.UUID{} + compactedUUIDs := []uuid.UUID{} for _, b := range blocks { uuids = append(uuids, b.BlockID) } compactedBlocks := compactedList[tenantID] for _, b := range compactedBlocks { - uuids = append(uuids, b.BlockID) + compactedUUIDs = append(compactedUUIDs, b.BlockID) } - return uuids, nil + return uuids, compactedUUIDs, nil }, + BlockMetaCalls: make(map[string]map[uuid.UUID]int), BlockMetaFn: func(ctx context.Context, blockID uuid.UUID, tenantID string) (*backend.BlockMeta, error) { if expectsError { return nil, errors.New("err") @@ -634,3 +1019,11 @@ func newMockReader(list PerTenant, compactedList PerTenantCompacted, expectsErro }, } } + +func newBlocklist(metas PerTenant, compactedMetas PerTenantCompacted) *List { + l := 
New() + + l.ApplyPollResults(metas, compactedMetas) + + return l +} diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index aa405a509eb..90ce11e6e56 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -106,7 +106,7 @@ func testCompactionRoundtrip(t *testing.T, targetBlockVersion string) { }, &mockSharder{}, &mockOverrides{}) require.NoError(t, err) - r.EnablePolling(&mockJobSharder{}) + r.EnablePolling(ctx, &mockJobSharder{}) wal := w.WAL() require.NoError(t, err) @@ -253,7 +253,7 @@ func testSameIDCompaction(t *testing.T, targetBlockVersion string) { }, &mockSharder{}, &mockOverrides{}) require.NoError(t, err) - r.EnablePolling(&mockJobSharder{}) + r.EnablePolling(ctx, &mockJobSharder{}) wal := w.WAL() require.NoError(t, err) @@ -396,7 +396,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) { }, &mockSharder{}, &mockOverrides{}) require.NoError(t, err) - r.EnablePolling(&mockJobSharder{}) + r.EnablePolling(ctx, &mockJobSharder{}) // Cut x blocks with y records each blockCount := 5 @@ -467,7 +467,7 @@ func TestCompactionMetrics(t *testing.T) { }, &mockSharder{}, &mockOverrides{}) require.NoError(t, err) - r.EnablePolling(&mockJobSharder{}) + r.EnablePolling(ctx, &mockJobSharder{}) // Cut x blocks with y records each blockCount := 5 @@ -543,7 +543,7 @@ func TestCompactionIteratesThroughTenants(t *testing.T) { }, &mockSharder{}, &mockOverrides{}) require.NoError(t, err) - r.EnablePolling(&mockJobSharder{}) + r.EnablePolling(ctx, &mockJobSharder{}) // Cut blocks for multiple tenants cutTestBlocks(t, w, testTenantID, 2, 2) @@ -615,7 +615,7 @@ func testCompactionHonorsBlockStartEndTimes(t *testing.T, targetBlockVersion str }, &mockSharder{}, &mockOverrides{}) require.NoError(t, err) - r.EnablePolling(&mockJobSharder{}) + r.EnablePolling(ctx, &mockJobSharder{}) cutTestBlockWithTraces(t, w, testTenantID, []testData{ {test.ValidTraceID(nil), test.MakeTrace(10, nil), 100, 101}, diff --git a/tempodb/encoding/v2/streaming_block_test.go b/tempodb/encoding/v2/streaming_block_test.go index a33f2f35ed9..34dd7711149 100644 --- a/tempodb/encoding/v2/streaming_block_test.go +++ b/tempodb/encoding/v2/streaming_block_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/require" "github.com/google/uuid" + "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/backend/local" @@ -165,7 +166,7 @@ func testStreamingBlockToBackendBlock(t *testing.T, cfg *common.BlockConfig) { _, ids, reqs := streamingBlock(t, cfg, w) // meta? 
- uuids, err := r.Blocks(context.Background(), testTenantID) + uuids, _, err := r.Blocks(context.Background(), testTenantID) require.NoError(t, err, "error listing blocks") require.Len(t, uuids, 1) diff --git a/tempodb/encoding/vparquet/block_findtracebyid_test.go b/tempodb/encoding/vparquet/block_findtracebyid_test.go index fb363a3d588..87ba89ea6f8 100644 --- a/tempodb/encoding/vparquet/block_findtracebyid_test.go +++ b/tempodb/encoding/vparquet/block_findtracebyid_test.go @@ -112,7 +112,7 @@ func TestBackendBlockFindTraceByID_TestData(t *testing.T) { r := backend.NewReader(rawR) ctx := context.Background() - blocks, err := r.Blocks(ctx, "single-tenant") + blocks, _, err := r.Blocks(ctx, "single-tenant") require.NoError(t, err) assert.Len(t, blocks, 1) diff --git a/tempodb/encoding/vparquet/block_iterator_test.go b/tempodb/encoding/vparquet/block_iterator_test.go index ecaafc76681..ea6eb88bc7b 100644 --- a/tempodb/encoding/vparquet/block_iterator_test.go +++ b/tempodb/encoding/vparquet/block_iterator_test.go @@ -19,7 +19,7 @@ func TestRawIteratorReadsAllRows(t *testing.T) { r := backend.NewReader(rawR) ctx := context.Background() - blocks, err := r.Blocks(ctx, "single-tenant") + blocks, _, err := r.Blocks(ctx, "single-tenant") require.NoError(t, err) require.Len(t, blocks, 1) diff --git a/tempodb/encoding/vparquet/readers_test.go b/tempodb/encoding/vparquet/readers_test.go index 82922898fdd..4e468bd869a 100644 --- a/tempodb/encoding/vparquet/readers_test.go +++ b/tempodb/encoding/vparquet/readers_test.go @@ -41,7 +41,7 @@ func TestParquetGoSetsMetadataSections(t *testing.T) { r := backend.NewReader(rawR) ctx := context.Background() - blocks, err := r.Blocks(ctx, tenantID) + blocks, _, err := r.Blocks(ctx, tenantID) require.NoError(t, err) require.Len(t, blocks, 1) @@ -97,7 +97,7 @@ func TestCachingReaderAt(t *testing.T) { r := backend.NewReader(rawR) ctx := context.Background() - blocks, err := r.Blocks(ctx, tenantID) + blocks, _, err := r.Blocks(ctx, tenantID) require.NoError(t, err) require.Len(t, blocks, 1) diff --git a/tempodb/encoding/vparquet2/block_findtracebyid_test.go b/tempodb/encoding/vparquet2/block_findtracebyid_test.go index 6f441d57f78..9f172721ce6 100644 --- a/tempodb/encoding/vparquet2/block_findtracebyid_test.go +++ b/tempodb/encoding/vparquet2/block_findtracebyid_test.go @@ -112,7 +112,7 @@ func TestBackendBlockFindTraceByID_TestData(t *testing.T) { r := backend.NewReader(rawR) ctx := context.Background() - blocks, err := r.Blocks(ctx, "single-tenant") + blocks, _, err := r.Blocks(ctx, "single-tenant") require.NoError(t, err) assert.Len(t, blocks, 1) diff --git a/tempodb/encoding/vparquet2/block_iterator_test.go b/tempodb/encoding/vparquet2/block_iterator_test.go index f19843f3a01..276e020ea72 100644 --- a/tempodb/encoding/vparquet2/block_iterator_test.go +++ b/tempodb/encoding/vparquet2/block_iterator_test.go @@ -19,7 +19,7 @@ func TestRawIteratorReadsAllRows(t *testing.T) { r := backend.NewReader(rawR) ctx := context.Background() - blocks, err := r.Blocks(ctx, "single-tenant") + blocks, _, err := r.Blocks(ctx, "single-tenant") require.NoError(t, err) require.Len(t, blocks, 1) diff --git a/tempodb/encoding/vparquet2/readers_test.go b/tempodb/encoding/vparquet2/readers_test.go index e06f02758b6..392d756bf09 100644 --- a/tempodb/encoding/vparquet2/readers_test.go +++ b/tempodb/encoding/vparquet2/readers_test.go @@ -41,7 +41,7 @@ func TestParquetGoSetsMetadataSections(t *testing.T) { r := backend.NewReader(rawR) ctx := context.Background() - blocks, err := r.Blocks(ctx, 
tenantID) + blocks, _, err := r.Blocks(ctx, tenantID) require.NoError(t, err) require.Len(t, blocks, 1) @@ -97,7 +97,7 @@ func TestCachingReaderAt(t *testing.T) { r := backend.NewReader(rawR) ctx := context.Background() - blocks, err := r.Blocks(ctx, tenantID) + blocks, _, err := r.Blocks(ctx, tenantID) require.NoError(t, err) require.Len(t, blocks, 1) diff --git a/tempodb/encoding/vparquet3/block_findtracebyid_test.go b/tempodb/encoding/vparquet3/block_findtracebyid_test.go index c284067b8f8..2109e4ba1b0 100644 --- a/tempodb/encoding/vparquet3/block_findtracebyid_test.go +++ b/tempodb/encoding/vparquet3/block_findtracebyid_test.go @@ -112,7 +112,7 @@ func TestBackendBlockFindTraceByID_TestData(t *testing.T) { r := backend.NewReader(rawR) ctx := context.Background() - blocks, err := r.Blocks(ctx, "single-tenant") + blocks, _, err := r.Blocks(ctx, "single-tenant") require.NoError(t, err) assert.Len(t, blocks, 1) diff --git a/tempodb/encoding/vparquet3/block_iterator_test.go b/tempodb/encoding/vparquet3/block_iterator_test.go index 1a6672d17f1..deef719bb09 100644 --- a/tempodb/encoding/vparquet3/block_iterator_test.go +++ b/tempodb/encoding/vparquet3/block_iterator_test.go @@ -19,7 +19,7 @@ func TestRawIteratorReadsAllRows(t *testing.T) { r := backend.NewReader(rawR) ctx := context.Background() - blocks, err := r.Blocks(ctx, "single-tenant") + blocks, _, err := r.Blocks(ctx, "single-tenant") require.NoError(t, err) require.Len(t, blocks, 1) diff --git a/tempodb/encoding/vparquet3/readers_test.go b/tempodb/encoding/vparquet3/readers_test.go index 7595cebc882..69640bb901e 100644 --- a/tempodb/encoding/vparquet3/readers_test.go +++ b/tempodb/encoding/vparquet3/readers_test.go @@ -41,7 +41,7 @@ func TestParquetGoSetsMetadataSections(t *testing.T) { r := backend.NewReader(rawR) ctx := context.Background() - blocks, err := r.Blocks(ctx, tenantID) + blocks, _, err := r.Blocks(ctx, tenantID) require.NoError(t, err) require.Len(t, blocks, 1) @@ -97,7 +97,7 @@ func TestCachingReaderAt(t *testing.T) { r := backend.NewReader(rawR) ctx := context.Background() - blocks, err := r.Blocks(ctx, tenantID) + blocks, _, err := r.Blocks(ctx, tenantID) require.NoError(t, err) require.Len(t, blocks, 1) diff --git a/tempodb/retention_test.go b/tempodb/retention_test.go index 3f0028592a6..20c15554192 100644 --- a/tempodb/retention_test.go +++ b/tempodb/retention_test.go @@ -51,7 +51,7 @@ func TestRetention(t *testing.T) { }, &mockSharder{}, &mockOverrides{}) require.NoError(t, err) - r.EnablePolling(&mockJobSharder{}) + r.EnablePolling(ctx, &mockJobSharder{}) blockID := uuid.New() @@ -105,7 +105,8 @@ func TestRetentionUpdatesBlocklistImmediately(t *testing.T) { }, log.NewNopLogger()) assert.NoError(t, err) - r.EnablePolling(&mockJobSharder{}) + ctx := context.Background() + r.EnablePolling(ctx, &mockJobSharder{}) err = c.EnableCompaction(context.Background(), &CompactorConfig{ ChunkSizeBytes: 10, @@ -123,7 +124,6 @@ func TestRetentionUpdatesBlocklistImmediately(t *testing.T) { head, err := wal.NewBlock(blockID, testTenantID, model.CurrentEncoding) assert.NoError(t, err) - ctx := context.Background() complete, err := w.CompleteBlock(ctx, head) assert.NoError(t, err) blockID = complete.BlockMeta().BlockID @@ -185,7 +185,7 @@ func TestBlockRetentionOverride(t *testing.T) { }, &mockSharder{}, overrides) require.NoError(t, err) - r.EnablePolling(&mockJobSharder{}) + r.EnablePolling(ctx, &mockJobSharder{}) cutTestBlocks(t, w, testTenantID, 10, 10) diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index 
9b3af4d8002..8008697b504 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -78,7 +78,7 @@ type Reader interface { Search(ctx context.Context, meta *backend.BlockMeta, req *tempopb.SearchRequest, opts common.SearchOptions) (*tempopb.SearchResponse, error) Fetch(ctx context.Context, meta *backend.BlockMeta, req traceql.FetchSpansRequest, opts common.SearchOptions) (traceql.FetchSpansResponse, error) BlockMetas(tenantID string) []*backend.BlockMeta - EnablePolling(sharder blocklist.JobSharder) + EnablePolling(ctx context.Context, sharder blocklist.JobSharder) Shutdown() } @@ -407,7 +407,7 @@ func (rw *readerWriter) EnableCompaction(ctx context.Context, cfg *CompactorConf // EnablePolling activates the polling loop. Pass nil if this component // // should never be a tenant index builder. -func (rw *readerWriter) EnablePolling(sharder blocklist.JobSharder) { +func (rw *readerWriter) EnablePolling(ctx context.Context, sharder blocklist.JobSharder) { if sharder == nil { sharder = blocklist.OwnsNothingSharder } @@ -424,7 +424,7 @@ func (rw *readerWriter) EnablePolling(sharder blocklist.JobSharder) { rw.cfg.BlocklistPollTenantIndexBuilders = DefaultTenantIndexBuilders } - level.Info(rw.logger).Log("msg", "polling enabled", "interval", rw.cfg.BlocklistPoll, "concurrency", rw.cfg.BlocklistPollConcurrency) + level.Info(rw.logger).Log("msg", "polling enabled", "interval", rw.cfg.BlocklistPoll, "blocklist_concurrency", rw.cfg.BlocklistPollConcurrency) blocklistPoller := blocklist.NewPoller(&blocklist.PollerConfig{ PollConcurrency: rw.cfg.BlocklistPollConcurrency, @@ -441,20 +441,26 @@ func (rw *readerWriter) EnablePolling(sharder blocklist.JobSharder) { // that when this method returns the block list is updated rw.pollBlocklist() - go rw.pollingLoop() + go rw.pollingLoop(ctx) } -func (rw *readerWriter) pollingLoop() { +func (rw *readerWriter) pollingLoop(ctx context.Context) { ticker := time.NewTicker(rw.cfg.BlocklistPoll) - for range ticker.C { - rw.pollBlocklist() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + rw.pollBlocklist() + } } } func (rw *readerWriter) pollBlocklist() { - blocklist, compactedBlocklist, err := rw.blocklistPoller.Do() + blocklist, compactedBlocklist, err := rw.blocklistPoller.Do(rw.blocklist) if err != nil { - level.Error(rw.logger).Log("msg", "failed to poll blocklist. 
using previously polled lists", "err", err) + level.Error(rw.logger).Log("msg", "failed to poll blocklist", "err", err) return } diff --git a/tempodb/tempodb_search_test.go b/tempodb/tempodb_search_test.go index 585c41f2559..7ce887ba7f8 100644 --- a/tempodb/tempodb_search_test.go +++ b/tempodb/tempodb_search_test.go @@ -966,7 +966,8 @@ func runCompleteBlockSearchTest(t *testing.T, blockVersion string, runners ...ru }, &mockSharder{}, &mockOverrides{}) require.NoError(t, err) - r.EnablePolling(&mockJobSharder{}) + ctx := context.Background() + r.EnablePolling(ctx, &mockJobSharder{}) rw := r.(*readerWriter) wantID, wantTr, start, end, wantMeta, searchesThatMatch, searchesThatDontMatch := searchTestSuite() @@ -1395,7 +1396,7 @@ func TestWALBlockGetMetrics(t *testing.T) { }, &mockSharder{}, &mockOverrides{}) require.NoError(t, err) - r.EnablePolling(&mockJobSharder{}) + r.EnablePolling(ctx, &mockJobSharder{}) wal := w.WAL() head, err := wal.NewBlock(uuid.New(), testTenantID, model.CurrentEncoding) diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index 47f4b698088..1e59e8d1d3b 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -75,7 +75,7 @@ func TestDB(t *testing.T) { }, &mockSharder{}, &mockOverrides{}) require.NoError(t, err) - r.EnablePolling(&mockJobSharder{}) + r.EnablePolling(context.Background(), &mockJobSharder{}) blockID := uuid.New() @@ -126,7 +126,7 @@ func TestBlockSharding(t *testing.T) { // search with different shards and check if its respecting the params r, w, _, _ := testConfig(t, backend.EncLZ4_256k, 0) - r.EnablePolling(&mockJobSharder{}) + r.EnablePolling(context.Background(), &mockJobSharder{}) // create block with known ID blockID := uuid.New() @@ -190,7 +190,7 @@ func TestBlockCleanup(t *testing.T) { }, &mockSharder{}, &mockOverrides{}) require.NoError(t, err) - r.EnablePolling(&mockJobSharder{}) + r.EnablePolling(context.Background(), &mockJobSharder{}) blockID := uuid.New() @@ -520,7 +520,7 @@ func TestSearchCompactedBlocks(t *testing.T) { }, &mockSharder{}, &mockOverrides{}) require.NoError(t, err) - r.EnablePolling(&mockJobSharder{}) + r.EnablePolling(context.Background(), &mockJobSharder{}) wal := w.WAL()
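
The call-site updates throughout these tests all follow from the same signature change: backend.Reader.Blocks now returns live and compacted block IDs as two separate slices. A minimal caller sketch, not part of the patch and assuming only the NewReader/Blocks signatures shown in this diff (rawR stands in for any backend.RawReader, such as the local backend used by these tests):

package example

import (
	"context"

	"github.com/google/uuid"

	"github.com/grafana/tempo/tempodb/backend"
)

// listAllBlockIDs recreates the old single-slice behavior by appending the
// compacted IDs, mirroring the tempo-cli changes earlier in this diff.
func listAllBlockIDs(ctx context.Context, rawR backend.RawReader, tenantID string) ([]uuid.UUID, error) {
	r := backend.NewReader(rawR)

	blockIDs, compactedBlockIDs, err := r.Blocks(ctx, tenantID)
	if err != nil {
		return nil, err
	}

	return append(blockIDs, compactedBlockIDs...), nil
}
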