From e929275060ba55cca75c68214c8860531b32c110 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Thu, 11 Mar 2021 15:50:59 -0500 Subject: [PATCH 1/2] Add new list compaction-summary command to tempo-cli --- cmd/tempo-cli/cmd-list-blocks.go | 76 +---------- cmd/tempo-cli/cmd-list-compactionsummary.go | 124 ++++++++++++++++++ cmd/tempo-cli/main.go | 67 +--------- cmd/tempo-cli/shared.go | 134 ++++++++++++++++++++ 4 files changed, 265 insertions(+), 136 deletions(-) create mode 100644 cmd/tempo-cli/cmd-list-compactionsummary.go create mode 100644 cmd/tempo-cli/shared.go diff --git a/cmd/tempo-cli/cmd-list-blocks.go b/cmd/tempo-cli/cmd-list-blocks.go index 1258668c323..da729ceebfe 100644 --- a/cmd/tempo-cli/cmd-list-blocks.go +++ b/cmd/tempo-cli/cmd-list-blocks.go @@ -1,24 +1,18 @@ package main import ( - "context" "fmt" "os" - "sort" "strconv" "time" "github.com/dustin/go-humanize" - "github.com/google/uuid" - "github.com/grafana/tempo/pkg/boundedwaitgroup" - tempodb_backend "github.com/grafana/tempo/tempodb/backend" "github.com/olekukonko/tablewriter" ) type listBlocksCmd struct { TenantID string `arg:"" help:"tenant-id within the bucket"` IncludeCompacted bool `help:"include compacted blocks"` - backendOptions } @@ -36,76 +30,8 @@ func (l *listBlocksCmd) Run(ctx *globalOptions) error { } displayResults(results, windowDuration, l.IncludeCompacted) - return nil -} - -type blockStats struct { - unifiedBlockMeta -} - -func loadBucket(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID string, windowRange time.Duration, includeCompacted bool) ([]blockStats, error) { - blockIDs, err := r.Blocks(context.Background(), tenantID) - if err != nil { - return nil, err - } - fmt.Println("total blocks: ", len(blockIDs)) - - // Load in parallel - wg := boundedwaitgroup.New(10) - resultsCh := make(chan blockStats, len(blockIDs)) - - for _, id := range blockIDs { - wg.Add(1) - - go func(id2 uuid.UUID) { - defer wg.Done() - - b, err := loadBlock(r, c, tenantID, id2, windowRange, includeCompacted) - if err != nil { - fmt.Println("Error loading block:", id2, err) - return - } - - if b != nil { - resultsCh <- *b - } - }(id) - } - - wg.Wait() - close(resultsCh) - - results := make([]blockStats, 0) - for b := range resultsCh { - results = append(results, b) - } - - sort.Slice(results, func(i, j int) bool { - return results[i].end.Before(results[j].end) - }) - - return results, nil -} - -func loadBlock(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID string, id uuid.UUID, windowRange time.Duration, includeCompacted bool) (*blockStats, error) { - fmt.Print(".") - - meta, err := r.BlockMeta(context.Background(), id, tenantID) - if err == tempodb_backend.ErrMetaDoesNotExist && !includeCompacted { - return nil, nil - } else if err != nil && err != tempodb_backend.ErrMetaDoesNotExist { - return nil, err - } - - compactedMeta, err := c.CompactedBlockMeta(id, tenantID) - if err != nil && err != tempodb_backend.ErrMetaDoesNotExist { - return nil, err - } - - return &blockStats{ - unifiedBlockMeta: getMeta(meta, compactedMeta, windowRange), - }, nil + return nil } func displayResults(results []blockStats, windowDuration time.Duration, includeCompacted bool) { diff --git a/cmd/tempo-cli/cmd-list-compactionsummary.go b/cmd/tempo-cli/cmd-list-compactionsummary.go new file mode 100644 index 00000000000..d019b20b742 --- /dev/null +++ b/cmd/tempo-cli/cmd-list-compactionsummary.go @@ -0,0 +1,124 @@ +package main + +import ( + "fmt" + "os" + "sort" + "strconv" + "time" + + "github.com/dustin/go-humanize" + "github.com/olekukonko/tablewriter" +) + +type listCompactionSummaryCmd struct { + TenantID string `arg:"" help:"tenant-id within the bucket"` + backendOptions +} + +func (l *listCompactionSummaryCmd) Run(ctx *globalOptions) error { + r, c, err := loadBackend(&l.backendOptions, ctx) + if err != nil { + return err + } + + windowDuration := time.Hour + + results, err := loadBucket(r, c, l.TenantID, windowDuration, false) + if err != nil { + return err + } + + displayCompactionSummary(results) + + return nil +} + +func displayCompactionSummary(results []blockStats) { + fmt.Println() + fmt.Println("Stats by compaction level:") + resultsByLevel := make(map[int][]blockStats) + var levels []int + for _, r := range results { + l := int(r.compactionLevel) + + s, ok := resultsByLevel[l] + if !ok { + s = make([]blockStats, 0) + levels = append(levels, l) + } + + s = append(s, r) + resultsByLevel[l] = s + } + + sort.Ints(levels) + + columns := []string{"lvl", "blocks", "total", "smallest block", "largest block", "earliest", "latest"} + + out := make([][]string, 0) + + for _, l := range levels { + sizeSum := uint64(0) + sizeMin := uint64(0) + sizeMax := uint64(0) + countSum := 0 + countMin := 0 + countMax := 0 + var newest time.Time + var oldest time.Time + for _, r := range resultsByLevel[l] { + sizeSum += r.size + countSum += r.objects + + if r.size < sizeMin || sizeMin == 0 { + sizeMin = r.size + } + if r.size > sizeMax { + sizeMax = r.size + } + if r.objects < countMin || countMin == 0 { + countMin = r.objects + } + if r.objects > countMax { + countMax = r.objects + } + if r.start.Before(oldest) || oldest.IsZero() { + oldest = r.start + } + if r.end.After(newest) { + newest = r.end + } + } + + line := make([]string, 0) + + for _, c := range columns { + s := "" + switch c { + case "lvl": + s = strconv.Itoa(l) + case "blocks": + s = fmt.Sprintf("%d (%d %%)", len(resultsByLevel[l]), len(resultsByLevel[l])*100/len(results)) + case "total": + s = fmt.Sprintf("%s objects (%s)", humanize.Comma(int64(countSum)), humanize.Bytes(sizeSum)) + case "smallest block": + s = fmt.Sprintf("%s objects (%s)", humanize.Comma(int64(countMin)), humanize.Bytes(sizeMin)) + case "largest block": + s = fmt.Sprintf("%s objects (%s)", humanize.Comma(int64(countMax)), humanize.Bytes(sizeMax)) + case "earliest": + s = fmt.Sprint(time.Since(oldest).Round(time.Second), " ago") + case "latest": + s = fmt.Sprint(time.Since(newest).Round(time.Second), " ago") + } + line = append(line, s) + } + out = append(out, line) + } + + fmt.Println() + w := tablewriter.NewWriter(os.Stdout) + w.SetHeader(columns) + w.AppendBulk(out) + w.Render() +} diff --git a/cmd/tempo-cli/main.go b/cmd/tempo-cli/main.go index a2b1f217c23..02846a9ba6d 100644 --- a/cmd/tempo-cli/main.go +++ b/cmd/tempo-cli/main.go @@ -4,12 +4,9 @@ import ( "flag" "fmt" "io/ioutil" - "time" - "github.com/google/uuid" "github.com/grafana/tempo/cmd/tempo/app" "github.com/grafana/tempo/tempodb/backend" - tempodb_backend "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/backend/local" "gopkg.in/yaml.v2" @@ -36,8 +33,9 @@ var cli struct { globalOptions List struct { - Block listBlockCmd `cmd:"" help:"List information about a block"` - Blocks listBlocksCmd `cmd:"" help:"List information about all blocks in a bucket"` + Block listBlockCmd `cmd:"" help:"List information about a block"` + Blocks listBlocksCmd `cmd:"" help:"List information about all blocks in a bucket"` + CompactionSummary listCompactionSummaryCmd `cmd:"" help:"List summary of data by compaction level"` } `cmd:""` Query queryCmd `cmd:"" help:"query tempo api"` @@ -54,7 +52,7 @@ func main() { ctx.FatalIfErrorf(err) } -func loadBackend(b *backendOptions, g *globalOptions) (tempodb_backend.Reader, tempodb_backend.Compactor, error) { +func loadBackend(b *backendOptions, g *globalOptions) (backend.Reader, backend.Compactor, error) { // Defaults cfg := app.Config{} cfg.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{}) @@ -89,8 +87,8 @@ func loadBackend(b *backendOptions, g *globalOptions) (tempodb_backend.Reader, t } var err error - var r tempodb_backend.Reader - var c tempodb_backend.Compactor + var r backend.Reader + var c backend.Compactor switch cfg.StorageConfig.Trace.Backend { case "local": @@ -111,56 +109,3 @@ func loadBackend(b *backendOptions, g *globalOptions) (tempodb_backend.Reader, t return r, c, nil } - -type unifiedBlockMeta struct { - id uuid.UUID - compactionLevel uint8 - objects int - size uint64 - window int64 - start time.Time - end time.Time - compacted bool - version string - encoding string -} - -func getMeta(meta *backend.BlockMeta, compactedMeta *backend.CompactedBlockMeta, windowRange time.Duration) unifiedBlockMeta { - if meta != nil { - return unifiedBlockMeta{ - id: meta.BlockID, - compactionLevel: meta.CompactionLevel, - objects: meta.TotalObjects, - size: meta.Size, - window: meta.EndTime.Unix() / int64(windowRange/time.Second), - start: meta.StartTime, - end: meta.EndTime, - compacted: false, - version: meta.Version, - encoding: meta.Encoding.String(), - } - } - if compactedMeta != nil { - return unifiedBlockMeta{ - id: compactedMeta.BlockID, - compactionLevel: compactedMeta.CompactionLevel, - objects: compactedMeta.TotalObjects, - size: compactedMeta.Size, - window: compactedMeta.EndTime.Unix() / int64(windowRange/time.Second), - start: compactedMeta.StartTime, - end: compactedMeta.EndTime, - compacted: true, - version: compactedMeta.Version, - encoding: compactedMeta.Encoding.String(), - } - } - return unifiedBlockMeta{ - id: uuid.UUID{}, - compactionLevel: 0, - objects: -1, - window: -1, - start: time.Unix(0, 0), - end: time.Unix(0, 0), - compacted: false, - } -} diff --git a/cmd/tempo-cli/shared.go b/cmd/tempo-cli/shared.go new file mode 100644 index 00000000000..0a4327e44a1 --- /dev/null +++ b/cmd/tempo-cli/shared.go @@ -0,0 +1,134 @@ +package main + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/google/uuid" + "github.com/grafana/tempo/pkg/boundedwaitgroup" + "github.com/grafana/tempo/tempodb/backend" +) + +type unifiedBlockMeta struct { + id uuid.UUID + compactionLevel uint8 + objects int + size uint64 + window int64 + start time.Time + end time.Time + compacted bool + version string + encoding string +} + +func getMeta(meta *backend.BlockMeta, compactedMeta *backend.CompactedBlockMeta, windowRange time.Duration) unifiedBlockMeta { + if meta != nil { + return unifiedBlockMeta{ + id: meta.BlockID, + compactionLevel: meta.CompactionLevel, + objects: meta.TotalObjects, + size: meta.Size, + window: meta.EndTime.Unix() / int64(windowRange/time.Second), + start: meta.StartTime, + end: meta.EndTime, + compacted: false, + version: meta.Version, + encoding: meta.Encoding.String(), + } + } + if compactedMeta != nil { + return unifiedBlockMeta{ + id: compactedMeta.BlockID, + compactionLevel: compactedMeta.CompactionLevel, + objects: compactedMeta.TotalObjects, + size: compactedMeta.Size, + window: compactedMeta.EndTime.Unix() / int64(windowRange/time.Second), + start: compactedMeta.StartTime, + end: compactedMeta.EndTime, + compacted: true, + version: compactedMeta.Version, + encoding: compactedMeta.Encoding.String(), + } + } + return unifiedBlockMeta{ + id: uuid.UUID{}, + compactionLevel: 0, + objects: -1, + window: -1, + start: time.Unix(0, 0), + end: time.Unix(0, 0), + compacted: false, + } +} + +type blockStats struct { + unifiedBlockMeta +} + +func loadBucket(r backend.Reader, c backend.Compactor, tenantID string, windowRange time.Duration, includeCompacted bool) ([]blockStats, error) { + blockIDs, err := r.Blocks(context.Background(), tenantID) + if err != nil { + return nil, err + } + + fmt.Println("total blocks: ", len(blockIDs)) + + // Load in parallel + wg := boundedwaitgroup.New(10) + resultsCh := make(chan blockStats, len(blockIDs)) + + for _, id := range blockIDs { + wg.Add(1) + + go func(id2 uuid.UUID) { + defer wg.Done() + + b, err := loadBlock(r, c, tenantID, id2, windowRange, includeCompacted) + if err != nil { + fmt.Println("Error loading block:", id2, err) + return + } + + if b != nil { + resultsCh <- *b + } + }(id) + } + + wg.Wait() + close(resultsCh) + + results := make([]blockStats, 0) + for b := range resultsCh { + results = append(results, b) + } + + sort.Slice(results, func(i, j int) bool { + return results[i].end.Before(results[j].end) + }) + + return results, nil +} + +func loadBlock(r backend.Reader, c backend.Compactor, tenantID string, id uuid.UUID, windowRange time.Duration, includeCompacted bool) (*blockStats, error) { + fmt.Print(".") + + meta, err := r.BlockMeta(context.Background(), id, tenantID) + if err == backend.ErrMetaDoesNotExist && !includeCompacted { + return nil, nil + } else if err != nil && err != backend.ErrMetaDoesNotExist { + return nil, err + } + + compactedMeta, err := c.CompactedBlockMeta(id, tenantID) + if err != nil && err != backend.ErrMetaDoesNotExist { + return nil, err + } + + return &blockStats{ + unifiedBlockMeta: getMeta(meta, compactedMeta, windowRange), + }, nil +} From ff1132b15ffe3e24715419eb8d6317256b4431ab Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Thu, 11 Mar 2021 15:54:34 -0500 Subject: [PATCH 2/2] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1093d5b055b..b6301515ec0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * [ENHANCEMENT] Add a Shutdown handler to flush data to backend, at "/shutdown". [#526](https://github.com/grafana/tempo/pull/526) * [ENHANCEMENT] Queriers now query all (healthy) ingesters for a trace to mitigate 404s on ingester rollouts/scaleups. This is a **breaking change** and will likely result in query errors on rollout as the query signature b/n QueryFrontend & Querier has changed. [#557](https://github.com/grafana/tempo/pull/557) +* [ENHANCEMENT] Add list compaction-summary command to tempo-cli [#588](https://github.com/grafana/tempo/pull/588) * [BUGFIX] Fixes permissions errors on startup in GCS. [#554](https://github.com/grafana/tempo/pull/554) * [BUGFIX] Fixes error where Dell ECS cannot list objects. [#561](https://github.com/grafana/tempo/pull/561) * [BUGFIX] Fixes listing blocks in S3 when the list is truncated. [#567](https://github.com/grafana/tempo/pull/567)