Index Downsample Bytes #519

Merged
merged 6 commits on Feb 10, 2021
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -8,8 +8,9 @@
* [ENHANCEMENT] Add S3 options region and forcepathstyle [#431](https://github.com/grafana/tempo/issues/431)
* [ENHANCEMENT] Add exhaustive search to combine traces from all blocks in the backend. [#489](https://github.com/grafana/tempo/pull/489)
* [ENHANCEMENT] Add per-tenant block retention [#77](https://github.com/grafana/tempo/issues/77)
* [ENHANCEMENT] Change index-downsample to index-downsample-bytes. This is a **breaking change** [#519](https://github.com/grafana/tempo/issues/519)
* [BUGFIX] Upgrade cortex dependency to v1.7.0-rc.0+ to address issue with forgetting ring membership [#442](https://github.com/grafana/tempo/pull/442) [#512](https://github.com/grafana/tempo/pull/512)
* [BUGFIX] No longer raise the `tempodb_blocklist_poll_errors_total` metric if a block doesn't have meta or compacted meta. [#481](https://github.com/grafana/tempo/pull/481)

## v0.5.0

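For operators picking up this breaking change, a minimal migration sketch (field names taken from the config diffs below; the 1000-byte value is illustrative, and the flag default added in this PR is 2 MiB):

# before this PR: one index record per N traces
# block:
#   index_downsample: 10

# after this PR: one index record per ~N bytes of trace objects
block:
  index_downsample_bytes: 1000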
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-azure.yaml
@@ -35,7 +35,7 @@ storage:
backend: azure # backend configuration to use
block:
bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
index_downsample: 10 # number of traces per index record
index_downsample_bytes: 1000 # number of bytes per index record
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
wal:
path: /tmp/tempo/wal # where to store the wal locally
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-gcs-fake.yaml
@@ -36,7 +36,7 @@ storage:
backend: gcs # backend configuration to use
block:
bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
index_downsample: 10 # number of traces per index record
index_downsample_bytes: 1000 # number of bytes per index record
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
wal:
path: /tmp/tempo/wal # where to store the wal locally
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-local.yaml
@@ -35,7 +35,7 @@ storage:
backend: local # backend configuration to use
block:
bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
index_downsample: 10 # number of traces per index record
index_downsample_bytes: 1000 # number of bytes per index record
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
wal:
path: /tmp/tempo/wal # where to store the wal locally
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-s3-minio.yaml
@@ -36,7 +36,7 @@ storage:
backend: s3 # backend configuration to use
block:
bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
index_downsample: 10 # number of traces per index record
index_downsample_bytes: 1000 # number of bytes per index record
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
wal:
path: /tmp/tempo/wal # where to store the wal locally
6 changes: 3 additions & 3 deletions modules/ingester/ingester_test.go
@@ -191,9 +191,9 @@ func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace,
Path: tmpDir,
},
Block: &encoding.BlockConfig{
IndexDownsample: 2,
BloomFP: .01,
Encoding: backend.EncLZ4_1M,
IndexDownsampleBytes: 2,
BloomFP: .01,
Encoding: backend.EncLZ4_1M,
},
WAL: &wal.Config{
Filepath: tmpDir,
6 changes: 3 additions & 3 deletions modules/ingester/instance_test.go
@@ -439,9 +439,9 @@ func defaultInstance(t assert.TestingT, tmpDir string) *instance {
Path: tmpDir,
},
Block: &encoding.BlockConfig{
IndexDownsample: 2,
BloomFP: .01,
Encoding: backend.EncLZ4_1M,
IndexDownsampleBytes: 2,
BloomFP: .01,
Encoding: backend.EncLZ4_1M,
},
WAL: &wal.Config{
Filepath: tmpDir,
6 changes: 3 additions & 3 deletions modules/querier/querier_test.go
@@ -54,9 +54,9 @@ func TestReturnAllHits(t *testing.T) {
Path: path.Join(tempDir, "traces"),
},
Block: &encoding.BlockConfig{
Encoding: backend.EncNone,
IndexDownsample: 10,
BloomFP: .05,
Encoding: backend.EncNone,
IndexDownsampleBytes: 10,
BloomFP: .05,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
2 changes: 1 addition & 1 deletion modules/storage/config.go
@@ -36,7 +36,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)

cfg.Trace.Block = &encoding.BlockConfig{}
f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom False Positive.")
f.IntVar(&cfg.Trace.Block.IndexDownsample, util.PrefixConfig(prefix, "trace.block.index-downsample"), 100, "Number of traces per index record.")
f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.index-downsample-bytes"), 2*1024*1024, "Number of bytes (before compression) per index record.")
cfg.Trace.Block.Encoding = backend.EncZstd

cfg.Trace.Azure = &azure.Config{}
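A hedged sketch of overriding the new default (2 MiB, counted before compression). The YAML shape follows the example configs in this PR; the flattened flag form assumes the prefix passed to RegisterFlagsAndApplyDefaults resolves to "storage", which this diff does not show:

storage:
  trace:
    block:
      index_downsample_bytes: 1048576  # cut an index record after ~1 MiB of objects

# equivalent flag form, assuming the "storage" prefix:
# -storage.trace.block.index-downsample-bytes=1048576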
6 changes: 3 additions & 3 deletions tempodb/compactor_bookmark_test.go
@@ -32,9 +32,9 @@ func TestCurrentClear(t *testing.T) {
Path: path.Join(tempDir, "traces"),
},
Block: &encoding.BlockConfig{
IndexDownsample: 17,
BloomFP: .01,
Encoding: backend.EncGZIP,
IndexDownsampleBytes: 17,
BloomFP: .01,
Encoding: backend.EncGZIP,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
30 changes: 15 additions & 15 deletions tempodb/compactor_test.go
@@ -61,9 +61,9 @@ func TestCompaction(t *testing.T) {
Path: path.Join(tempDir, "traces"),
},
Block: &encoding.BlockConfig{
IndexDownsample: 11,
BloomFP: .01,
Encoding: backend.EncLZ4_4M,
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncLZ4_4M,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
@@ -187,9 +187,9 @@ func TestSameIDCompaction(t *testing.T) {
Path: path.Join(tempDir, "traces"),
},
Block: &encoding.BlockConfig{
IndexDownsample: 11,
BloomFP: .01,
Encoding: backend.EncSnappy,
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncSnappy,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
@@ -274,9 +274,9 @@ func TestCompactionUpdatesBlocklist(t *testing.T) {
Path: path.Join(tempDir, "traces"),
},
Block: &encoding.BlockConfig{
IndexDownsample: 11,
BloomFP: .01,
Encoding: backend.EncNone,
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncNone,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
@@ -339,9 +339,9 @@ func TestCompactionMetrics(t *testing.T) {
Path: path.Join(tempDir, "traces"),
},
Block: &encoding.BlockConfig{
IndexDownsample: 11,
BloomFP: .01,
Encoding: backend.EncNone,
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncNone,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
@@ -413,9 +413,9 @@ func TestCompactionIteratesThroughTenants(t *testing.T) {
Path: path.Join(tempDir, "traces"),
},
Block: &encoding.BlockConfig{
IndexDownsample: 11,
BloomFP: .01,
Encoding: backend.EncLZ4_64k,
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncLZ4_64k,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
2 changes: 1 addition & 1 deletion tempodb/encoding/compactor_block.go
@@ -42,7 +42,7 @@ func NewCompactorBlock(cfg *BlockConfig, id uuid.UUID, tenantID string, metas []

var err error
c.appendBuffer = &bytes.Buffer{}
c.appender, err = c.encoding.newBufferedAppender(c.appendBuffer, cfg.Encoding, cfg.IndexDownsample, estimatedObjects)
c.appender, err = c.encoding.newBufferedAppender(c.appendBuffer, cfg.Encoding, cfg.IndexDownsampleBytes, estimatedObjects)
if err != nil {
return nil, fmt.Errorf("failed to create appender: %w", err)
}
25 changes: 18 additions & 7 deletions tempodb/encoding/compactor_block_test.go
@@ -2,7 +2,6 @@ package encoding

import (
"bytes"
"math"
"math/rand"
"testing"
"time"
@@ -24,7 +23,7 @@ func TestCompactorBlockError(t *testing.T) {
}

func TestCompactorBlockAddObject(t *testing.T) {
indexDownsample := 3
indexDownsample := 500

metas := []*backend.BlockMeta{
{
@@ -39,15 +38,18 @@

numObjects := (rand.Int() % 20) + 1
cb, err := NewCompactorBlock(&BlockConfig{
BloomFP: .01,
IndexDownsample: indexDownsample,
Encoding: backend.EncGZIP,
BloomFP: .01,
IndexDownsampleBytes: indexDownsample,
Encoding: backend.EncGZIP,
}, uuid.New(), testTenantID, metas, numObjects)
assert.NoError(t, err)

var minID common.ID
var maxID common.ID

expectedRecords := 0
byteCounter := 0

ids := make([][]byte, 0)
for i := 0; i < numObjects; i++ {
id := make([]byte, 16)
@@ -63,13 +65,23 @@
err = cb.AddObject(id, object)
assert.NoError(t, err)

byteCounter += len(id) + len(object) + 4 + 4
if byteCounter > indexDownsample {
byteCounter = 0
expectedRecords++
}

if len(minID) == 0 || bytes.Compare(id, minID) == -1 {
minID = id
}
if len(maxID) == 0 || bytes.Compare(id, maxID) == 1 {
maxID = id
}
}
if byteCounter > 0 {
expectedRecords++
}

err = cb.appender.Complete()
assert.NoError(t, err)
assert.Equal(t, numObjects, cb.Length())
Expand All @@ -92,7 +104,6 @@ func TestCompactorBlockAddObject(t *testing.T) {
}

records := cb.appender.Records()
assert.Equal(t, math.Ceil(float64(numObjects)/float64(indexDownsample)), float64(len(records)))

assert.Equal(t, expectedRecords, len(records))
assert.Equal(t, numObjects, cb.CurrentBufferedObjects())
}
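The accounting in the test above can be read as a standalone helper. This is a sketch of the same policy; the two 4-byte terms are assumed here to cover per-object length fields in the v0 record layout, which this diff does not spell out:

// expectedRecords reproduces the test's byte-based record accounting:
// a record is cut once the accumulated bytes exceed the threshold,
// and a final partial record is counted if any bytes remain.
func expectedRecords(ids, objects [][]byte, downsampleBytes int) int {
	records := 0
	byteCounter := 0
	for i := range ids {
		// id + payload + two assumed 4-byte length fields
		byteCounter += len(ids[i]) + len(objects[i]) + 4 + 4
		if byteCounter > downsampleBytes {
			byteCounter = 0
			records++
		}
	}
	if byteCounter > 0 {
		records++
	}
	return records
}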
2 changes: 1 addition & 1 deletion tempodb/encoding/complete_block.go
@@ -53,7 +53,7 @@ func NewCompleteBlock(cfg *BlockConfig, originatingMeta *backend.BlockMeta, iter
return nil, err
}

appender, err := c.encoding.newBufferedAppender(appendFile, cfg.Encoding, cfg.IndexDownsample, estimatedObjects)
appender, err := c.encoding.newBufferedAppender(appendFile, cfg.Encoding, cfg.IndexDownsampleBytes, estimatedObjects)
if err != nil {
return nil, err
}
35 changes: 24 additions & 11 deletions tempodb/encoding/complete_block_test.go
@@ -43,9 +43,9 @@ func TestCompleteBlock(t *testing.T) {
require.NoError(t, err, "unexpected error creating temp dir")

block, ids, reqs := completeBlock(t, &BlockConfig{
IndexDownsample: 13,
BloomFP: .01,
Encoding: backend.EncGZIP,
IndexDownsampleBytes: 13,
BloomFP: .01,
Encoding: backend.EncGZIP,
}, tempDir)

// test Find
@@ -73,9 +73,9 @@ func TestCompleteBlockAll(t *testing.T) {
t.Run(enc.String(), func(t *testing.T) {
testCompleteBlockToBackendBlock(t,
&BlockConfig{
IndexDownsample: 13,
BloomFP: .01,
Encoding: enc,
IndexDownsampleBytes: 1000,
BloomFP: .01,
Encoding: enc,
},
)
})
@@ -183,14 +183,27 @@ func completeBlock(t *testing.T, cfg *BlockConfig, tempDir string) (*CompleteBlo
originatingMeta.StartTime = time.Now().Add(-5 * time.Minute)
originatingMeta.EndTime = time.Now().Add(5 * time.Minute)

// calc expected records
byteCounter := 0
expectedRecords := 0
for _, rec := range appender.Records() {
byteCounter += int(rec.Length)
if byteCounter > cfg.IndexDownsampleBytes {
byteCounter = 0
expectedRecords++
}
}
if byteCounter > 0 {
expectedRecords++
}

iterator := v0.NewRecordIterator(appender.Records(), bytes.NewReader(buffer.Bytes()))
block, err := NewCompleteBlock(cfg, originatingMeta, iterator, numMsgs, tempDir, "")
require.NoError(t, err, "unexpected error completing block")

// test downsample config
require.Equal(t, numMsgs/cfg.IndexDownsample+1, len(block.records))
require.Equal(t, expectedRecords, len(block.records))
require.True(t, block.FlushedTime().IsZero())

require.True(t, bytes.Equal(block.meta.MinID, minID))
require.True(t, bytes.Equal(block.meta.MaxID, maxID))
require.Equal(t, originatingMeta.StartTime, block.meta.StartTime)
@@ -269,9 +282,9 @@ func benchmarkCompressBlock(b *testing.B, encoding backend.Encoding, indexDownsa

originatingMeta := backend.NewBlockMeta(testTenantID, uuid.New(), "should_be_ignored", backend.EncGZIP)
cb, err := NewCompleteBlock(&BlockConfig{
IndexDownsample: indexDownsample,
BloomFP: .05,
Encoding: encoding,
IndexDownsampleBytes: indexDownsample,
BloomFP: .05,
Encoding: encoding,
}, originatingMeta, iterator, 10000, tempDir, "")
require.NoError(b, err, "error creating block")

10 changes: 5 additions & 5 deletions tempodb/encoding/config.go
@@ -8,15 +8,15 @@ import (

// BlockConfig holds configuration options for newly created blocks
type BlockConfig struct {
IndexDownsample int `yaml:"index_downsample"`
BloomFP float64 `yaml:"bloom_filter_false_positive"`
Encoding backend.Encoding `yaml:"encoding"`
IndexDownsampleBytes int `yaml:"index_downsample_bytes"`
BloomFP float64 `yaml:"bloom_filter_false_positive"`
Encoding backend.Encoding `yaml:"encoding"`
}

// ValidateConfig returns true if the config is valid
func ValidateConfig(b *BlockConfig) error {
if b.IndexDownsample == 0 {
return fmt.Errorf("Non-zero index downsample required")
if b.IndexDownsampleBytes <= 0 {
return fmt.Errorf("Positive index downsample required")
}

if b.BloomFP <= 0.0 {
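A short usage sketch of the tightened validation; the import paths are assumed from the repository layout and are not shown in this diff:

package main

import (
	"log"

	"github.com/grafana/tempo/tempodb/backend"
	"github.com/grafana/tempo/tempodb/encoding"
)

func main() {
	cfg := &encoding.BlockConfig{
		IndexDownsampleBytes: 2 * 1024 * 1024, // matches the new flag default
		BloomFP:              .05,
		Encoding:             backend.EncZstd,
	}
	// zero or negative IndexDownsampleBytes is now rejected
	if err := encoding.ValidateConfig(cfg); err != nil {
		log.Fatalf("invalid block config: %v", err)
	}
}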
16 changes: 8 additions & 8 deletions tempodb/encoding/v0/appender_buffered.go
@@ -10,19 +10,19 @@ type bufferedAppender struct {
writer io.Writer
records []*common.Record

totalObjects int
currentOffset uint64
currentRecord *common.Record
indexDownsample int
totalObjects int
currentOffset uint64
currentRecord *common.Record
indexDownsampleBytes int
}

// NewBufferedAppender returns a bufferedAppender. This appender writes to
// the provided writer and also builds a downsampled records slice.
func NewBufferedAppender(writer io.Writer, indexDownsample int, totalObjectsEstimate int) common.Appender {
return &bufferedAppender{
writer: writer,
records: make([]*common.Record, 0, totalObjectsEstimate/indexDownsample+1),
indexDownsample: indexDownsample,
writer: writer,
records: make([]*common.Record, 0, totalObjectsEstimate/indexDownsample+1),
indexDownsampleBytes: indexDownsample,
}
}

@@ -45,7 +45,7 @@ func (a *bufferedAppender) Append(id common.ID, b []byte) error {
a.currentRecord.ID = id
a.currentRecord.Length += uint32(length)

if a.totalObjects%a.indexDownsample == 0 {
if int(a.currentRecord.Length) > a.indexDownsampleBytes {
a.records = append(a.records, a.currentRecord)
a.currentRecord = nil
}
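The behavioral core of this PR is the condition above: a record is cut once its accumulated length crosses the byte threshold, rather than after every N objects. A minimal standalone sketch of that cutting policy, with types simplified from common.Record:

// record carries the fields of common.Record that matter here.
type record struct {
	ID     []byte
	Start  uint64
	Length uint32
}

// byteCutter cuts a record once the configured byte threshold is crossed.
type byteCutter struct {
	records              []*record
	current              *record
	offset               uint64
	indexDownsampleBytes int
}

func (c *byteCutter) append(id []byte, length int) {
	if c.current == nil {
		c.current = &record{Start: c.offset}
	}
	c.offset += uint64(length)
	c.current.ID = id // the record tracks the last ID it covers
	c.current.Length += uint32(length)

	if int(c.current.Length) > c.indexDownsampleBytes {
		c.records = append(c.records, c.current)
		c.current = nil
	}
}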