Implement polling improvements (#2652)
* tempodb: update poller tests and mocks to support new work

* include Blocklist interface for testing

* tempodb: include quick poller

* Include QuickBlocks on tempodb

* Include notes about what is being implemented

* Add initial ListBlocks() stubs and implement s3

* Fix poller commits

* Lint fix

* Fix method for unimplemented test

* Add test coverage for previous compactions

* Fix method for unimplemented test

* Lint fix

* Drop some debug printing

* Drop unused variables and update debug message

* Relocate allocations

* Invent fewer objects

* Lint fix

* Implement gcs ListBlocks()

* Tidy up s3.go

* Import fix

* Adjust list_test.go for eyeball parse

* Include notes after discovering why

* Wrap a couple errors for clarity

* Modify error test for additional details

* Modify gcs backend for single character delimiter

* Add note about call change

* Add note about call change and implement listv2 for s3

* Adjust uuidInBlocklist to use existing allocations

* Add config and defaults for tenant polling concurrency

* Implement concurrent tenant polling

* Implement Find() on backend readers

* Add Find to MockRawReader

* Convert QuickBlocks over to use Find()

* Implement Find() on local/cache backend readers

* Drop ListBlocks in favor of Find

* Replace Blocks() with QuickBlocks() and update usage everywhere

* Fix local.Find for path handling

* Fix backend raw tests for Find mocks

* Reorder to reduce PR diff

* Relocate super-handy block boundary method

* Extend FindFunc to indicate Done

* Parallelize the Find() calls within Blocks()

* Fix race

* Fix boundedwaitgroup init in test

* Update backend implementations

* Make note of implementation details

* Capture errors from tenant polling

* Add feature detection on backend readers

* Use backend.HasFeature() to determine the appropriate function

* Fix error handling in poller

* Pick lint

* Extend log message to include tenant_concurrency

* Add doc for HasFeature

* Local backend supports no features

* Pick lint

* Ensure we wait for results

* Avoid early return

* Improve error handling to include all errors

* Remove HasFeature() from RawReader interface after discussion

* Drop start argument from Find signature

* Include range calculation for sharded list calls

* Enable concurrent list calls for s3.Find()

* Include useful OwnsEverythingSharder

* Remove concurrency handling from tempodb.Blocks()

* Add docs for new config options

* Fix GCS e2e object path

* Clean up comments

* Include a test to ensure retained blocks are forgotten

* Begin reimplementing ListBlock() in RawReaders after discussion

* Reimplement ListBlocks() on the backends

* Fix ListBlocks() for gcs

* Drop Find and supporting code

* Drop unused method

* Back out proto change, since `main` also has this issue

* Update docs/sources/tempo/configuration/_index.md

Co-authored-by: Mario <[email protected]>

* Reimplement ListBlocks() for Azure

* Lint

* Rename config option for concurrent block list

* Relocate block boundary code to own package

* Relocate Blocklist interface to poller package

* Rely on error count for early exit condition

* Avoid polling additional tenants if we have already reached our error tolerance

* Improve tests

* Include initial benchmarks

* Update docs/sources/tempo/configuration/_index.md

Co-authored-by: Koenraad Verheyden <[email protected]>

* Update docs/sources/tempo/configuration/_index.md

Co-authored-by: Koenraad Verheyden <[email protected]>

* Use *List directly, drop Blocklist interface

* Use builtin clear() to avoid heap escape

* Adhere to new error usage

* Speed up benchmark setup

* Trade memory for time in large data sets

* Lock for race

* Update CLI handling for Blocks() call to include compacted results

* Add doc to mention the concurrency per-instance

* Tidy up backends a bit

* Implement concurrent ListBlocks() on gcs

* Clean up

* Include ListBlocksConcurrency config option for GCS

* Test for compacted migration

* Start tracing ListBlocks()

* Start tracing the poller

* Fix span attribute

* Sort the block list for test

* Use slice collection instead of channel collection to avoid deadlock

* Revert tenant poll concurrency

* Pass context to the EnablePolling method for future use

* Handle context on pollingLoop

* Use attributes not events for better TraceQL

* Drop last tenant concurrency reference

* Fix Do() revert to old code

* Update for context

* Update tracing attributes for poller

* Back out proto update

* Remove unused error

* Remove inaccurate docs after refactor

* Clean up

* Include additional span attributes

* Drop unused

* Drop unused

* Address some feedback in the backend

* Skip compacted blocks on trace summary

* Include TODO about reducing the 404 on meta.json for compacted blocks

* Include integration test for poller

* Upgrade fake-gcs-server

* Use JSONRead in gcs client for fake-gcs-server when Endpoint is set

* Avoid early allocation of error for concurrent operations

* Include EndOffset in gcs backend

* Include integration configs

* Fix for global min/max

* Include poller integration test in ci

* Preallocate for review comments

* Lock for all results and break early if error

* Include additional assertion for poller_test

* Adjust gcs and s3 locking

* Adjust ListBlocks interface to take a tenant string instead of a keypath

* Move OwnsEverythingSharder

* Remove some comments

* Ensure exclusive max unless global max

* Restructure for clarity

* Tidy up backends a bit

* Fix test call count after avoiding BlockMeta for known compacted blocks

* Update tests to include CompactedMeta call count

* Include error message when block UUID is below shard minimum

* Centralize backend.GlobalMaxBlockID

---------

Co-authored-by: Mario <[email protected]>
Co-authored-by: Koenraad Verheyden <[email protected]>
3 people committed Oct 27, 2023
1 parent f297c50 commit 7e0e310
Showing 57 changed files with 1,804 additions and 325 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
@@ -123,6 +123,9 @@ jobs:
- name: Test
run: make test-e2e

- name: Poller
run: make test-integration-poller

integration-tests-serverless:
name: Test serverless integration e2e suite
runs-on: ubuntu-latest
6 changes: 5 additions & 1 deletion Makefile
@@ -129,9 +129,13 @@ test-e2e: tools docker-tempo docker-tempo-query
test-e2e-serverless: tools docker-tempo docker-serverless
$(GOTEST) -v $(GOTEST_OPT) ./integration/e2e/serverless

.PHONY: test-integration-poller
test-integration-poller: tools
$(GOTEST) -v $(GOTEST_OPT) ./integration/poller

# test-all/bench use a docker image so build it first to make sure we're up to date
.PHONY: test-all
test-all: test-with-cover test-e2e test-e2e-serverless
test-all: test-with-cover test-e2e test-e2e-serverless test-integration-poller

.PHONY: test-bench
test-bench: tools docker-tempo
2 changes: 1 addition & 1 deletion cmd/tempo-cli/cmd-analyse-blocks.go
@@ -21,7 +21,7 @@ func (cmd *analyseBlocksCmd) Run(ctx *globalOptions) error {
}

// TODO: Parallelize this
blocks, err := r.Blocks(context.Background(), cmd.TenantID)
blocks, _, err := r.Blocks(context.Background(), cmd.TenantID)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion cmd/tempo-cli/cmd-migrate-tenant.go
@@ -37,7 +37,7 @@ func (cmd *migrateTenantCmd) Run(opts *globalOptions) error {

// TODO create dest directory if it doesn't exist yet?

blocksDest, err := readerDest.Blocks(ctx, cmd.DestTenantID)
blocksDest, _, err := readerDest.Blocks(ctx, cmd.DestTenantID)
if err != nil {
return err
}
4 changes: 3 additions & 1 deletion cmd/tempo-cli/cmd-query-blocks.go
@@ -84,13 +84,15 @@ func (cmd *queryBlocksCmd) Run(ctx *globalOptions) error {
}

func queryBucket(ctx context.Context, r backend.Reader, c backend.Compactor, tenantID string, traceID common.ID) ([]queryResults, error) {
blockIDs, err := r.Blocks(context.Background(), tenantID)
blockIDs, compactedBlockIDs, err := r.Blocks(context.Background(), tenantID)
if err != nil {
return nil, err
}

fmt.Println("total blocks to search: ", len(blockIDs))

blockIDs = append(blockIDs, compactedBlockIDs...)

// Load in parallel
wg := boundedwaitgroup.New(100)
resultsCh := make(chan queryResults, len(blockIDs))
2 changes: 1 addition & 1 deletion cmd/tempo-cli/cmd-query-trace-summary.go
@@ -87,7 +87,7 @@ func sortServiceNames(nameFrequencies map[string]int) PairList {
}

func queryBucketForSummary(ctx context.Context, percentage float32, r backend.Reader, c backend.Compactor, tenantID string, traceID common.ID) (*TraceSummary, error) {
blockIDs, err := r.Blocks(context.Background(), tenantID)
blockIDs, _, err := r.Blocks(context.Background(), tenantID)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion cmd/tempo-cli/cmd-search.go
@@ -53,7 +53,7 @@ func (cmd *searchBlocksCmd) Run(opts *globalOptions) error {

ctx := context.Background()

blockIDs, err := r.Blocks(ctx, cmd.TenantID)
blockIDs, _, err := r.Blocks(ctx, cmd.TenantID)
if err != nil {
return err
}
4 changes: 3 additions & 1 deletion cmd/tempo-cli/shared.go
@@ -56,11 +56,13 @@ type blockStats struct {
}

func loadBucket(r backend.Reader, c backend.Compactor, tenantID string, windowRange time.Duration, includeCompacted bool) ([]blockStats, error) {
blockIDs, err := r.Blocks(context.Background(), tenantID)
blockIDs, compactedBlockIDs, err := r.Blocks(context.Background(), tenantID)
if err != nil {
return nil, err
}

blockIDs = append(blockIDs, compactedBlockIDs...)

fmt.Println("total blocks: ", len(blockIDs))

// Load in parallel
5 changes: 3 additions & 2 deletions cmd/tempo/app/modules.go
@@ -1,6 +1,7 @@
package app

import (
"context"
"errors"
"fmt"
"io"
@@ -295,7 +296,7 @@ func (t *App) initQuerier() (services.Service, error) {

// do not enable polling if this is the single binary. in that case the compactor will take care of polling
if t.cfg.Target == Querier {
t.store.EnablePolling(nil)
t.store.EnablePolling(context.Background(), nil)
}

ingesterRings := []ring.ReadRing{t.readRings[ringIngester]}
@@ -394,7 +395,7 @@ func (t *App) initQueryFrontend() (services.Service, error) {
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathSpanMetricsSummary), spanMetricsSummaryHandler)

// the query frontend needs to have knowledge of the blocks so it can shard search jobs
t.store.EnablePolling(nil)
t.store.EnablePolling(context.Background(), nil)

// http query echo endpoint
t.Server.HTTP.Handle(addHTTPAPIPrefix(&t.cfg, api.PathEcho), echoHandler())
5 changes: 5 additions & 0 deletions docs/sources/tempo/configuration/_index.md
@@ -737,6 +737,11 @@ storage:
# Example: "endpoint: s3.dualstack.us-east-2.amazonaws.com"
[endpoint: <string>]

# The number of list calls to make in parallel to the backend per instance.
# Adjustments here will impact the polling time, as well as the number of Go routines.
# Default is 3
[list_blocks_concurrency: <int>]

# optional.
# By default the region is inferred from the endpoint,
# but is required for some S3-compatible storage engines.
2 changes: 1 addition & 1 deletion integration/e2e/backend/backend.go
@@ -17,7 +17,7 @@ import (

const (
azuriteImage = "mcr.microsoft.com/azure-storage/azurite"
gcsImage = "fsouza/fake-gcs-server:1.47.3"
gcsImage = "fsouza/fake-gcs-server:1.47.6"
)

func parsePort(endpoint string) (int, error) {
54 changes: 54 additions & 0 deletions integration/poller/config-azurite.yaml
@@ -0,0 +1,54 @@
target: all
stream_over_http_enabled: true

server:
http_listen_port: 3200

query_frontend:
search:
query_backend_after: 0 # setting these both to 0 will force all range searches to hit the backend
query_ingesters_until: 0

distributor:
receivers:
jaeger:
protocols:
grpc:

ingester:
lifecycler:
address: 127.0.0.1
ring:
kvstore:
store: inmemory
replication_factor: 1
final_sleep: 0s
trace_idle_period: 1s
max_block_duration: 5s
complete_block_timeout: 5s
flush_check_period: 1s

storage:
trace:
blocklist_poll: 1s
backend: azure
azure:
container_name: tempo # how to store data in azure
endpoint_suffix: tempo_e2e-azurite:10000
storage_account_name: "devstoreaccount1"
storage_account_key: "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
pool:
max_workers: 10
queue_depth: 100

overrides:
user_configurable_overrides:
enabled: true
poll_interval: 10s
client:
backend: azure
azure:
container_name: tempo
endpoint_suffix: tempo_e2e-azurite:10000
storage_account_name: "devstoreaccount1"
storage_account_key: "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
54 changes: 54 additions & 0 deletions integration/poller/config-gcs.yaml
@@ -0,0 +1,54 @@
target: all
stream_over_http_enabled: true

server:
http_listen_port: 3200

query_frontend:
search:
query_ingesters_until: 0 # setting these both to 0 will force all range searches to hit the backend
query_backend_after: 0

distributor:
receivers:
jaeger:
protocols:
grpc:

ingester:
lifecycler:
address: 127.0.0.1
ring:
kvstore:
store: inmemory
replication_factor: 1
final_sleep: 0s
trace_idle_period: 1s
max_block_duration: 5s
complete_block_timeout: 5s
flush_check_period: 1s

storage:
trace:
blocklist_poll: 1s
backend: gcs
gcs:
bucket_name: tempo
endpoint: https://tempo_e2e-gcs:4443/storage/v1/
insecure: true
pool:
max_workers: 10
queue_depth: 1000

overrides:
user_configurable_overrides:
enabled: true
poll_interval: 10s
client:
backend: gcs
# fsouza/fake-gcs-server does not support versioning
confirm_versioning: false
gcs:
bucket_name: tempo
endpoint: https://tempo_e2e-gcs:4443/storage/v1/
insecure: true
57 changes: 57 additions & 0 deletions integration/poller/config-s3.yaml
@@ -0,0 +1,57 @@
target: all
stream_over_http_enabled: true

server:
http_listen_port: 3200

query_frontend:
search:
query_backend_after: 0 # setting these both to 0 will force all range searches to hit the backend
query_ingesters_until: 0

distributor:
receivers:
jaeger:
protocols:
grpc:

ingester:
lifecycler:
address: 127.0.0.1
ring:
kvstore:
store: inmemory
replication_factor: 1
final_sleep: 0s
trace_idle_period: 1s
max_block_duration: 5s
complete_block_timeout: 5s
flush_check_period: 1s

storage:
trace:
blocklist_poll: 1s
backend: s3
s3:
bucket: tempo
endpoint: tempo-integration-minio-9000:9000 # TODO: this is brittle, fix this eventually
access_key: Cheescake # TODO: use cortex_e2e.MinioAccessKey
secret_key: supersecret # TODO: use cortex_e2e.MinioSecretKey
insecure: true
pool:
max_workers: 10
queue_depth: 100

overrides:
user_configurable_overrides:
enabled: true
poll_interval: 10s
client:
backend: s3
s3:
# TODO use separate bucket?
bucket: tempo
endpoint: tempo-integration-minio-9000:9000 # TODO: this is brittle, fix this eventually
access_key: Cheescake # TODO: use cortex_e2e.MinioAccessKey
secret_key: supersecret # TODO: use cortex_e2e.MinioSecretKey
insecure: true
