Skip to content

Commit

Permalink
Index Downsample Bytes (#519)
Browse files Browse the repository at this point in the history
* indexdownsample => indexdownsamplebytes

Signed-off-by: Joe Elliott <[email protected]>

* tests passing

Signed-off-by: Joe Elliott <[email protected]>

* updated examples

Signed-off-by: Joe Elliott <[email protected]>

* changelog

Signed-off-by: Joe Elliott <[email protected]>

* fixed index config check

Signed-off-by: Joe Elliott <[email protected]>

* indexdownsample + bytes

Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott authored Feb 10, 2021
1 parent 734bbf2 commit 42bd3bd
Show file tree
Hide file tree
Showing 21 changed files with 126 additions and 101 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
* [ENHANCEMENT] Add S3 options region and forcepathstyle [#431](https://github.com/grafana/tempo/issues/431)
* [ENHANCEMENT] Add exhaustive search to combine traces from all blocks in the backend. [#489](https://github.com/grafana/tempo/pull/489)
* [ENHANCEMENT] Add per-tenant block retention [#77](https://github.com/grafana/tempo/issues/77)
* [ENHANCEMENT] Change index-downsample to index-downsample-bytes. This is a **breaking change** [#519](https://github.com/grafana/tempo/issues/519)
* [BUGFIX] Upgrade cortex dependency to v1.7.0-rc.0+ to address issue with forgetting ring membership [#442](https://github.com/grafana/tempo/pull/442) [#512](https://github.com/grafana/tempo/pull/512)
* [BUGFIX] No longer raise the `tempodb_blocklist_poll_errors_total` metric if a block doesn't have meta or compacted meta. [#481](https://github.com/grafana/tempo/pull/481)
* [BUGFIX] No longer raise the `tempodb_blocklist_poll_errors_total` metric if a block doesn't have meta or compacted meta. [#481](https://github.com/grafana/tempo/pull/481)]

## v0.5.0

Expand Down
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-azure.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ storage:
backend: azure # backend configuration to use
block:
bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
index_downsample: 10 # number of traces per index record
index_downsample_bytes: 1000 # number of bytes per index record
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
wal:
path: /tmp/tempo/wal # where to store the the wal locally
Expand Down
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-gcs-fake.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ storage:
backend: gcs # backend configuration to use
block:
bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
index_downsample: 10 # number of traces per index record
index_downsample_bytes: 1000 # number of bytes per index record
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
wal:
path: /tmp/tempo/wal # where to store the the wal locally
Expand Down
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ storage:
backend: local # backend configuration to use
block:
bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
index_downsample: 10 # number of traces per index record
index_downsample_bytes: 1000 # number of bytes per index record
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
wal:
path: /tmp/tempo/wal # where to store the the wal locally
Expand Down
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-s3-minio.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ storage:
backend: s3 # backend configuration to use
block:
bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
index_downsample: 10 # number of traces per index record
index_downsample_bytes: 1000 # number of bytes per index record
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
wal:
path: /tmp/tempo/wal # where to store the the wal locally
Expand Down
6 changes: 3 additions & 3 deletions modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,9 @@ func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace,
Path: tmpDir,
},
Block: &encoding.BlockConfig{
IndexDownsample: 2,
BloomFP: .01,
Encoding: backend.EncLZ4_1M,
IndexDownsampleBytes: 2,
BloomFP: .01,
Encoding: backend.EncLZ4_1M,
},
WAL: &wal.Config{
Filepath: tmpDir,
Expand Down
6 changes: 3 additions & 3 deletions modules/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,9 +439,9 @@ func defaultInstance(t assert.TestingT, tmpDir string) *instance {
Path: tmpDir,
},
Block: &encoding.BlockConfig{
IndexDownsample: 2,
BloomFP: .01,
Encoding: backend.EncLZ4_1M,
IndexDownsampleBytes: 2,
BloomFP: .01,
Encoding: backend.EncLZ4_1M,
},
WAL: &wal.Config{
Filepath: tmpDir,
Expand Down
6 changes: 3 additions & 3 deletions modules/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ func TestReturnAllHits(t *testing.T) {
Path: path.Join(tempDir, "traces"),
},
Block: &encoding.BlockConfig{
Encoding: backend.EncNone,
IndexDownsample: 10,
BloomFP: .05,
Encoding: backend.EncNone,
IndexDownsampleBytes: 10,
BloomFP: .05,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
Expand Down
2 changes: 1 addition & 1 deletion modules/storage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)

cfg.Trace.Block = &encoding.BlockConfig{}
f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom False Positive.")
f.IntVar(&cfg.Trace.Block.IndexDownsample, util.PrefixConfig(prefix, "trace.block.index-downsample"), 100, "Number of traces per index record.")
f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.index-downsample-bytes"), 2*1024*1024, "Number of bytes (before compression) per index record.")
cfg.Trace.Block.Encoding = backend.EncZstd

cfg.Trace.Azure = &azure.Config{}
Expand Down
6 changes: 3 additions & 3 deletions tempodb/compactor_bookmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ func TestCurrentClear(t *testing.T) {
Path: path.Join(tempDir, "traces"),
},
Block: &encoding.BlockConfig{
IndexDownsample: 17,
BloomFP: .01,
Encoding: backend.EncGZIP,
IndexDownsampleBytes: 17,
BloomFP: .01,
Encoding: backend.EncGZIP,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
Expand Down
30 changes: 15 additions & 15 deletions tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func TestCompaction(t *testing.T) {
Path: path.Join(tempDir, "traces"),
},
Block: &encoding.BlockConfig{
IndexDownsample: 11,
BloomFP: .01,
Encoding: backend.EncLZ4_4M,
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncLZ4_4M,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
Expand Down Expand Up @@ -187,9 +187,9 @@ func TestSameIDCompaction(t *testing.T) {
Path: path.Join(tempDir, "traces"),
},
Block: &encoding.BlockConfig{
IndexDownsample: 11,
BloomFP: .01,
Encoding: backend.EncSnappy,
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncSnappy,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
Expand Down Expand Up @@ -274,9 +274,9 @@ func TestCompactionUpdatesBlocklist(t *testing.T) {
Path: path.Join(tempDir, "traces"),
},
Block: &encoding.BlockConfig{
IndexDownsample: 11,
BloomFP: .01,
Encoding: backend.EncNone,
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncNone,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
Expand Down Expand Up @@ -339,9 +339,9 @@ func TestCompactionMetrics(t *testing.T) {
Path: path.Join(tempDir, "traces"),
},
Block: &encoding.BlockConfig{
IndexDownsample: 11,
BloomFP: .01,
Encoding: backend.EncNone,
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncNone,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
Expand Down Expand Up @@ -413,9 +413,9 @@ func TestCompactionIteratesThroughTenants(t *testing.T) {
Path: path.Join(tempDir, "traces"),
},
Block: &encoding.BlockConfig{
IndexDownsample: 11,
BloomFP: .01,
Encoding: backend.EncLZ4_64k,
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncLZ4_64k,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
Expand Down
2 changes: 1 addition & 1 deletion tempodb/encoding/compactor_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewCompactorBlock(cfg *BlockConfig, id uuid.UUID, tenantID string, metas []

var err error
c.appendBuffer = &bytes.Buffer{}
c.appender, err = c.encoding.newBufferedAppender(c.appendBuffer, cfg.Encoding, cfg.IndexDownsample, estimatedObjects)
c.appender, err = c.encoding.newBufferedAppender(c.appendBuffer, cfg.Encoding, cfg.IndexDownsampleBytes, estimatedObjects)
if err != nil {
return nil, fmt.Errorf("failed to created appender: %w", err)
}
Expand Down
25 changes: 18 additions & 7 deletions tempodb/encoding/compactor_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package encoding

import (
"bytes"
"math"
"math/rand"
"testing"
"time"
Expand All @@ -24,7 +23,7 @@ func TestCompactorBlockError(t *testing.T) {
}

func TestCompactorBlockAddObject(t *testing.T) {
indexDownsample := 3
indexDownsample := 500

metas := []*backend.BlockMeta{
{
Expand All @@ -39,15 +38,18 @@ func TestCompactorBlockAddObject(t *testing.T) {

numObjects := (rand.Int() % 20) + 1
cb, err := NewCompactorBlock(&BlockConfig{
BloomFP: .01,
IndexDownsample: indexDownsample,
Encoding: backend.EncGZIP,
BloomFP: .01,
IndexDownsampleBytes: indexDownsample,
Encoding: backend.EncGZIP,
}, uuid.New(), testTenantID, metas, numObjects)
assert.NoError(t, err)

var minID common.ID
var maxID common.ID

expectedRecords := 0
byteCounter := 0

ids := make([][]byte, 0)
for i := 0; i < numObjects; i++ {
id := make([]byte, 16)
Expand All @@ -63,13 +65,23 @@ func TestCompactorBlockAddObject(t *testing.T) {
err = cb.AddObject(id, object)
assert.NoError(t, err)

byteCounter += len(id) + len(object) + 4 + 4
if byteCounter > indexDownsample {
byteCounter = 0
expectedRecords++
}

if len(minID) == 0 || bytes.Compare(id, minID) == -1 {
minID = id
}
if len(maxID) == 0 || bytes.Compare(id, maxID) == 1 {
maxID = id
}
}
if byteCounter > 0 {
expectedRecords++
}

err = cb.appender.Complete()
assert.NoError(t, err)
assert.Equal(t, numObjects, cb.Length())
Expand All @@ -92,7 +104,6 @@ func TestCompactorBlockAddObject(t *testing.T) {
}

records := cb.appender.Records()
assert.Equal(t, math.Ceil(float64(numObjects)/float64(indexDownsample)), float64(len(records)))

assert.Equal(t, expectedRecords, len(records))
assert.Equal(t, numObjects, cb.CurrentBufferedObjects())
}
2 changes: 1 addition & 1 deletion tempodb/encoding/complete_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewCompleteBlock(cfg *BlockConfig, originatingMeta *backend.BlockMeta, iter
return nil, err
}

appender, err := c.encoding.newBufferedAppender(appendFile, cfg.Encoding, cfg.IndexDownsample, estimatedObjects)
appender, err := c.encoding.newBufferedAppender(appendFile, cfg.Encoding, cfg.IndexDownsampleBytes, estimatedObjects)
if err != nil {
return nil, err
}
Expand Down
35 changes: 24 additions & 11 deletions tempodb/encoding/complete_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func TestCompleteBlock(t *testing.T) {
require.NoError(t, err, "unexpected error creating temp dir")

block, ids, reqs := completeBlock(t, &BlockConfig{
IndexDownsample: 13,
BloomFP: .01,
Encoding: backend.EncGZIP,
IndexDownsampleBytes: 13,
BloomFP: .01,
Encoding: backend.EncGZIP,
}, tempDir)

// test Find
Expand Down Expand Up @@ -73,9 +73,9 @@ func TestCompleteBlockAll(t *testing.T) {
t.Run(enc.String(), func(t *testing.T) {
testCompleteBlockToBackendBlock(t,
&BlockConfig{
IndexDownsample: 13,
BloomFP: .01,
Encoding: enc,
IndexDownsampleBytes: 1000,
BloomFP: .01,
Encoding: enc,
},
)
})
Expand Down Expand Up @@ -183,14 +183,27 @@ func completeBlock(t *testing.T, cfg *BlockConfig, tempDir string) (*CompleteBlo
originatingMeta.StartTime = time.Now().Add(-5 * time.Minute)
originatingMeta.EndTime = time.Now().Add(5 * time.Minute)

// calc expected records
byteCounter := 0
expectedRecords := 0
for _, rec := range appender.Records() {
byteCounter += int(rec.Length)
if byteCounter > cfg.IndexDownsampleBytes {
byteCounter = 0
expectedRecords++
}
}
if byteCounter > 0 {
expectedRecords++
}

iterator := v0.NewRecordIterator(appender.Records(), bytes.NewReader(buffer.Bytes()))
block, err := NewCompleteBlock(cfg, originatingMeta, iterator, numMsgs, tempDir, "")
require.NoError(t, err, "unexpected error completing block")

// test downsample config
require.Equal(t, numMsgs/cfg.IndexDownsample+1, len(block.records))
require.Equal(t, expectedRecords, len(block.records))
require.True(t, block.FlushedTime().IsZero())

require.True(t, bytes.Equal(block.meta.MinID, minID))
require.True(t, bytes.Equal(block.meta.MaxID, maxID))
require.Equal(t, originatingMeta.StartTime, block.meta.StartTime)
Expand Down Expand Up @@ -269,9 +282,9 @@ func benchmarkCompressBlock(b *testing.B, encoding backend.Encoding, indexDownsa

originatingMeta := backend.NewBlockMeta(testTenantID, uuid.New(), "should_be_ignored", backend.EncGZIP)
cb, err := NewCompleteBlock(&BlockConfig{
IndexDownsample: indexDownsample,
BloomFP: .05,
Encoding: encoding,
IndexDownsampleBytes: indexDownsample,
BloomFP: .05,
Encoding: encoding,
}, originatingMeta, iterator, 10000, tempDir, "")
require.NoError(b, err, "error creating block")

Expand Down
10 changes: 5 additions & 5 deletions tempodb/encoding/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (

// BlockConfig holds configuration options for newly created blocks
type BlockConfig struct {
IndexDownsample int `yaml:"index_downsample"`
BloomFP float64 `yaml:"bloom_filter_false_positive"`
Encoding backend.Encoding `yaml:"encoding"`
IndexDownsampleBytes int `yaml:"index_downsample_bytes"`
BloomFP float64 `yaml:"bloom_filter_false_positive"`
Encoding backend.Encoding `yaml:"encoding"`
}

// ValidateConfig returns true if the config is valid
func ValidateConfig(b *BlockConfig) error {
if b.IndexDownsample == 0 {
return fmt.Errorf("Non-zero index downsample required")
if b.IndexDownsampleBytes <= 0 {
return fmt.Errorf("Positive index downsample required")
}

if b.BloomFP <= 0.0 {
Expand Down
16 changes: 8 additions & 8 deletions tempodb/encoding/v0/appender_buffered.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ type bufferedAppender struct {
writer io.Writer
records []*common.Record

totalObjects int
currentOffset uint64
currentRecord *common.Record
indexDownsample int
totalObjects int
currentOffset uint64
currentRecord *common.Record
indexDownsampleBytes int
}

// NewBufferedAppender returns an bufferedAppender. This appender builds a writes to
// the provided writer and also builds a downsampled records slice.
func NewBufferedAppender(writer io.Writer, indexDownsample int, totalObjectsEstimate int) common.Appender {
return &bufferedAppender{
writer: writer,
records: make([]*common.Record, 0, totalObjectsEstimate/indexDownsample+1),
indexDownsample: indexDownsample,
writer: writer,
records: make([]*common.Record, 0, totalObjectsEstimate/indexDownsample+1),
indexDownsampleBytes: indexDownsample,
}
}

Expand All @@ -45,7 +45,7 @@ func (a *bufferedAppender) Append(id common.ID, b []byte) error {
a.currentRecord.ID = id
a.currentRecord.Length += uint32(length)

if a.totalObjects%a.indexDownsample == 0 {
if int(a.currentRecord.Length) > a.indexDownsampleBytes {
a.records = append(a.records, a.currentRecord)
a.currentRecord = nil
}
Expand Down
Loading

0 comments on commit 42bd3bd

Please sign in to comment.