Block v2: Paged indexes/data #577

Merged: 28 commits into master from v2-for-real, Mar 10, 2021

Commits (28)
6de5821
Added v2
joe-elliott Mar 3, 2021
585c268
Added v2 page reader
joe-elliott Mar 3, 2021
b334dce
simplified bytes counting test
joe-elliott Mar 3, 2021
ba1842f
possibly cleaned up or made pagereaders more complicated
joe-elliott Mar 3, 2021
5feac5e
paged index writer/reader:at
joe-elliott Mar 4, 2021
c0376a2
Cleaned up tests
joe-elliott Mar 4, 2021
647faf3
moved 'base' objects to a 'base' folder
joe-elliott Mar 4, 2021
124ff51
tests cleanup
joe-elliott Mar 5, 2021
2ebca6a
clean up
joe-elliott Mar 5, 2021
c17df00
Added header logic
joe-elliott Mar 5, 2021
9f39296
added page header
joe-elliott Mar 5, 2021
26d4f87
Added min/max ids and checksums to index header
joe-elliott Mar 5, 2021
5ebab60
cleanup
joe-elliott Mar 5, 2021
d8f84e4
note
joe-elliott Mar 5, 2021
1b9263a
Merge branch 'master' into v2-for-real
joe-elliott Mar 5, 2021
6ab23dc
Added binary record search
joe-elliott Mar 8, 2021
efa6db0
cleanup
joe-elliott Mar 8, 2021
ed5cf77
switched to xxhash
joe-elliott Mar 8, 2021
ad0ddcf
removed min/max ids
joe-elliott Mar 8, 2021
401145e
Merge branch 'master' into v2-for-real
joe-elliott Mar 8, 2021
ee94f9e
all tests pass
joe-elliott Mar 8, 2021
8c3cdd4
Added require
joe-elliott Mar 8, 2021
b455070
Added/adjusted defaults
joe-elliott Mar 8, 2021
04022e8
lint/casting
joe-elliott Mar 8, 2021
7e90318
changelog
joe-elliott Mar 8, 2021
1a90dd8
go mod
joe-elliott Mar 8, 2021
d5b7728
pageReader/Writer => dataReader/Writer
joe-elliott Mar 9, 2021
2448339
Added search with errors
joe-elliott Mar 10, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
## master / unreleased

* [CHANGE] Update to Go 1.16, latest OpenTelemetry proto definition and collector [#546](https://github.com/grafana/tempo/pull/546)
* [FEATURE] Add page based access to the index file. [#557](https://github.com/grafana/tempo/pull/557)
* [ENHANCEMENT] Add a Shutdown handler to flush data to backend, at "/shutdown". [#526](https://github.com/grafana/tempo/pull/526)
* [ENHANCEMENT] Queriers now query all (healthy) ingesters for a trace to mitigate 404s on ingester rollouts/scaleups.
This is a **breaking change** and will likely result in query errors on rollout as the query signature b/n QueryFrontend & Querier has changed. [#557](https://github.com/grafana/tempo/pull/557)
1 change: 1 addition & 0 deletions go.mod
@@ -7,6 +7,7 @@ require (
contrib.go.opencensus.io/exporter/prometheus v0.2.0
github.com/Azure/azure-storage-blob-go v0.8.0
github.com/alecthomas/kong v0.2.11
github.com/cespare/xxhash v1.1.0
github.com/cortexproject/cortex v1.6.1-0.20210205171041-527f9b58b93c
github.com/dustin/go-humanize v1.0.0
github.com/go-kit/kit v0.10.0
1 change: 1 addition & 0 deletions modules/ingester/ingester_test.go
@@ -194,6 +194,7 @@ func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace,
IndexDownsampleBytes: 2,
BloomFP: .01,
Encoding: backend.EncLZ4_1M,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: tmpDir,
1 change: 1 addition & 0 deletions modules/ingester/instance_test.go
@@ -474,6 +474,7 @@ func defaultInstance(t assert.TestingT, tmpDir string) *instance {
IndexDownsampleBytes: 2,
BloomFP: .01,
Encoding: backend.EncLZ4_1M,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: tmpDir,
1 change: 1 addition & 0 deletions modules/querier/querier_test.go
@@ -57,6 +57,7 @@ func TestReturnAllHits(t *testing.T) {
Encoding: backend.EncNone,
IndexDownsampleBytes: 10,
BloomFP: .05,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
3 changes: 2 additions & 1 deletion modules/storage/config.go
@@ -36,7 +36,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)

cfg.Trace.Block = &encoding.BlockConfig{}
f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom False Positive.")
f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.index-downsample-bytes"), 2*1024*1024, "Number of bytes (before compression) per index record.")
f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.index-downsample-bytes"), 1024*1024, "Number of bytes (before compression) per index record.")
f.IntVar(&cfg.Trace.Block.IndexPageSizeBytes, util.PrefixConfig(prefix, "trace.block.index-page-size-bytes"), 250*1024, "Number of bytes per index page.")
cfg.Trace.Block.Encoding = backend.EncZstd

cfg.Trace.Azure = &azure.Config{}
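Worth noting in this hunk: the index-downsample default drops from 2 MiB to 1 MiB of data per index record, and the new index-page-size flag defaults to 250 KiB. A back-of-the-envelope sketch of how the two knobs interact follows; the 28-byte marshaled record size (16-byte ID, 8-byte offset, 4-byte length) and the 10 GiB block size are assumptions for illustration, not values taken from this diff.

```go
package main

import "fmt"

// Rough sizing sketch (not Tempo code): estimate how many index records
// and index pages a block produces under the new defaults.
func main() {
	const (
		blockBytes           = 10 * 1024 * 1024 * 1024 // hypothetical 10 GiB block
		indexDownsampleBytes = 1024 * 1024             // one index record per ~1 MiB of data
		indexPageSizeBytes   = 250 * 1024              // target size of one index page
		recordBytes          = 28                      // assumed marshaled record size
	)

	records := blockBytes / indexDownsampleBytes
	recordsPerPage := indexPageSizeBytes / recordBytes
	pages := (records + recordsPerPage - 1) / recordsPerPage // ceiling division

	fmt.Printf("records=%d recordsPerPage=%d pages=%d\n", records, recordsPerPage, pages)
}
```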
24 changes: 24 additions & 0 deletions pkg/sort/search.go
@@ -0,0 +1,24 @@
package sort

// SearchWithErrors is forked from https://golang.org/src/sort/search.go
// but with added support for errors
func SearchWithErrors(n int, f func(int) (bool, error)) (int, error) {
// Define f(-1) == false and f(n) == true.
// Invariant: f(i-1) == false, f(j) == true.
i, j := 0, n
for i < j {
h := int(uint(i+j) >> 1) // avoid overflow when computing h
// i ≤ h < j
b, e := f(h)
if e != nil {
return -1, e
}
if !b {
i = h + 1 // preserves f(i-1) == false
} else {
j = h // preserves f(j) == true
}
}
// i == j, f(i-1) == false, and f(j) (= f(i)) == true => answer is i.
return i, nil
}
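For context, this behaves like the standard library's sort.Search, except the predicate can report a failure (say, a page read from the backend going wrong) and the search aborts with that error. A minimal usage sketch, assuming only the pkg/sort import path added in this PR:

```go
package main

import (
	"fmt"

	"github.com/grafana/tempo/pkg/sort"
)

// Binary-search a sorted slice through a predicate that is allowed to
// fail, as a stand-in for a predicate that reads pages from storage.
func main() {
	ids := []int{2, 3, 5, 7, 11, 13}

	idx, err := sort.SearchWithErrors(len(ids), func(i int) (bool, error) {
		// A real caller might return an I/O error here; this sketch never fails.
		return ids[i] >= 7, nil
	})
	if err != nil {
		fmt.Println("search failed:", err)
		return
	}
	fmt.Println("first index with ids[i] >= 7:", idx) // prints 3
}
```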
2 changes: 2 additions & 0 deletions tempodb/backend/block_meta.go
@@ -25,6 +25,8 @@ type BlockMeta struct {
Size uint64 `json:"size"`
CompactionLevel uint8 `json:"compactionLevel"`
Encoding Encoding `json:"encoding"`
IndexPageSize uint32 `json:"indexPageSize"`
TotalRecords uint32 `json:"totalRecords"`
}

func NewBlockMeta(tenantID string, blockID uuid.UUID, version string, encoding Encoding) *BlockMeta {
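Since BlockMeta serializes to the block's meta.json, the two new fields travel with the block, letting a reader plan its index-page fetches before touching the index itself. A quick sketch of the round trip; the field values are invented for illustration:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/google/uuid"

	"github.com/grafana/tempo/tempodb/backend"
)

// The new indexPageSize/totalRecords fields ride along in meta.json.
func main() {
	meta := backend.NewBlockMeta("single-tenant", uuid.New(), "v2", backend.EncZstd)
	meta.IndexPageSize = 250 * 1024 // hypothetical values
	meta.TotalRecords = 4096

	b, _ := json.Marshal(meta)
	fmt.Println(string(b)) // ..."indexPageSize":256000,"totalRecords":4096...
}
```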
10 changes: 6 additions & 4 deletions tempodb/compactor_bookmark_test.go
@@ -19,12 +19,13 @@ import (
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/wal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCurrentClear(t *testing.T) {
tempDir, err := ioutil.TempDir("/tmp", "")
defer os.RemoveAll(tempDir)
assert.NoError(t, err, "unexpected error creating temp dir")
require.NoError(t, err, "unexpected error creating temp dir")

r, w, c, err := New(&Config{
Backend: "local",
@@ -35,13 +36,14 @@ func TestCurrentClear(t *testing.T) {
IndexDownsampleBytes: 17,
BloomFP: .01,
Encoding: backend.EncGZIP,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
},
BlocklistPoll: 0,
}, log.NewNopLogger())
assert.NoError(t, err)
require.NoError(t, err)

c.EnableCompaction(&CompactorConfig{
ChunkSizeBytes: 10,
@@ -51,12 +53,12 @@ }, &mockSharder{}, &mockOverrides{})
}, &mockSharder{}, &mockOverrides{})

wal := w.WAL()
assert.NoError(t, err)
require.NoError(t, err)

recordCount := 10
blockID := uuid.New()
head, err := wal.NewBlock(blockID, testTenantID)
assert.NoError(t, err)
require.NoError(t, err)

for i := 0; i < recordCount; i++ {
id := make([]byte, 16)
14 changes: 8 additions & 6 deletions tempodb/compactor_test.go
@@ -13,6 +13,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util/test"
@@ -64,12 +65,14 @@ func TestCompaction(t *testing.T) {
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncLZ4_4M,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
},
BlocklistPoll: 0,
}, log.NewNopLogger())
require.NoError(t, err)

c.EnableCompaction(&CompactorConfig{
ChunkSizeBytes: 10,
@@ -190,6 +193,7 @@ func TestSameIDCompaction(t *testing.T) {
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncSnappy,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
@@ -277,6 +281,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) {
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncNone,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
@@ -342,6 +347,7 @@ func TestCompactionMetrics(t *testing.T) {
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncNone,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
@@ -390,12 +396,7 @@

bytesEnd, err := test.GetCounterVecValue(metricCompactionBytesWritten, "0")
assert.NoError(t, err)
bytesPerRecord :=
4 /* total length */ +
4 /* id length */ +
16 /* id */ +
3 /* test record length */
assert.Equal(t, float64(blockCount*recordCount*bytesPerRecord), bytesEnd-bytesStart)
assert.Greater(t, bytesEnd, bytesStart) // calculating the exact bytes requires knowledge of the bytes as written in the blocks. just make sure it goes up
}

func TestCompactionIteratesThroughTenants(t *testing.T) {
@@ -416,6 +417,7 @@ func TestCompactionIteratesThroughTenants(t *testing.T) {
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncLZ4_64k,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
12 changes: 6 additions & 6 deletions tempodb/encoding/appender.go
@@ -17,23 +17,23 @@ }
}

type appender struct {
pageWriter common.PageWriter
dataWriter common.DataWriter
records []*common.Record
currentOffset uint64
}

// NewAppender returns an appender. This appender simply appends new objects
// to the provided pageWriter.
func NewAppender(pageWriter common.PageWriter) Appender {
// to the provided dataWriter.
func NewAppender(dataWriter common.DataWriter) Appender {
return &appender{
pageWriter: pageWriter,
dataWriter: dataWriter,
}
}

// Append appends the id/object to the writer. Note that the caller is giving up ownership of the two byte arrays backing the slices.
// Copies should be made and passed in if this is a problem
func (a *appender) Append(id common.ID, b []byte) error {
length, err := a.pageWriter.Write(id, b)
length, err := a.dataWriter.Write(id, b)
if err != nil {
return err
}
@@ -66,5 +66,5 @@ func (a *appender) DataLength() uint64 {
}

func (a *appender) Complete() error {
return a.pageWriter.Complete()
return a.dataWriter.Complete()
}
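The rename from pageWriter to dataWriter is mechanical, but the appender's contract is worth spelling out: Write returns how many bytes were laid down, and the appender turns that into an ID → (offset, length) record. A self-contained sketch of that bookkeeping, using simplified stand-in types rather than Tempo's actual common.Record and common.DataWriter:

```go
package main

import (
	"bytes"
	"fmt"
)

type record struct {
	id     []byte
	start  uint64
	length uint32
}

type appender struct {
	buf           bytes.Buffer // stands in for the DataWriter destination
	records       []record
	currentOffset uint64
}

// Append writes the object and tracks where it landed: Write reports the
// number of bytes written, which feeds the running offset, mirroring the
// contract the diff shows for dataWriter.Write.
func (a *appender) Append(id, obj []byte) error {
	n, err := a.buf.Write(obj)
	if err != nil {
		return err
	}
	a.records = append(a.records, record{id: id, start: a.currentOffset, length: uint32(n)})
	a.currentOffset += uint64(n)
	return nil
}

func main() {
	a := &appender{}
	_ = a.Append([]byte{0x01}, []byte("first object"))
	_ = a.Append([]byte{0x02}, []byte("second"))
	for _, r := range a.records {
		fmt.Printf("id=%x start=%d length=%d\n", r.id, r.start, r.length)
	}
}
```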
4 changes: 2 additions & 2 deletions tempodb/encoding/appender_buffered.go
@@ -8,7 +8,7 @@ import (
// index
type bufferedAppender struct {
// output writer
writer common.PageWriter
writer common.DataWriter

// record keeping
records []*common.Record
@@ -23,7 +23,7 @@ type bufferedAppender struct {

// NewBufferedAppender returns a bufferedAppender. This appender writes to
// the provided writer and also builds a downsampled records slice.
func NewBufferedAppender(writer common.PageWriter, indexDownsample int, totalObjectsEstimate int) (Appender, error) {
func NewBufferedAppender(writer common.DataWriter, indexDownsample int, totalObjectsEstimate int) (Appender, error) {
return &bufferedAppender{
writer: writer,
indexDownsampleBytes: indexDownsample,
18 changes: 10 additions & 8 deletions tempodb/encoding/backend_block.go
@@ -30,6 +30,8 @@ func NewBackendBlock(meta *backend.BlockMeta, r backend.Reader) (*BackendBlock,
encoding = v0Encoding{}
case "v1":
encoding = v1Encoding{}
case "v2":
encoding = v2Encoding{}
default:
return nil, fmt.Errorf("%s is not a valid block version", meta.Version)
}
@@ -74,20 +76,20 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) {
}

indexReaderAt := backend.NewContextReader(b.meta, nameIndex, b.reader)
indexReader, err := b.encoding.newIndexReader(indexReaderAt)
indexReader, err := b.encoding.newIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords))
if err != nil {
return nil, fmt.Errorf("error building index reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
}

ra := backend.NewContextReader(b.meta, nameObjects, b.reader)
pageReader, err := b.encoding.newPageReader(ra, b.meta.Encoding)
dataReader, err := b.encoding.newDataReader(ra, b.meta.Encoding)
if err != nil {
return nil, fmt.Errorf("error building page reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
}
defer pageReader.Close()
defer dataReader.Close()

// passing nil for objectCombiner here. this is fine b/c a backend block should never have dupes
finder := NewPagedFinder(indexReader, pageReader, nil)
finder := NewPagedFinder(indexReader, dataReader, nil)
objectBytes, err := finder.Find(ctx, id)

if err != nil {
@@ -101,16 +103,16 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) {
func (b *BackendBlock) Iterator(chunkSizeBytes uint32) (Iterator, error) {
// read index
ra := backend.NewContextReader(b.meta, nameObjects, b.reader)
pageReader, err := b.encoding.newPageReader(ra, b.meta.Encoding)
dataReader, err := b.encoding.newDataReader(ra, b.meta.Encoding)
if err != nil {
return nil, fmt.Errorf("failed to create pageReader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
return nil, fmt.Errorf("failed to create dataReader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
}

indexReaderAt := backend.NewContextReader(b.meta, nameIndex, b.reader)
reader, err := b.encoding.newIndexReader(indexReaderAt)
reader, err := b.encoding.newIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords))
if err != nil {
return nil, fmt.Errorf("failed to create index reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
}

return newPagedIterator(chunkSizeBytes, reader, pageReader), nil
return newPagedIterator(chunkSizeBytes, reader, dataReader), nil
}
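The version switch is the extension point here: each block version supplies its own index/data reader constructors behind a small interface, and meta.Version picks the implementation. A stripped-down sketch of that dispatch pattern; the interface and method names are illustrative, not Tempo's exact ones:

```go
package main

import "fmt"

// Each block version implements one interface; the block constructor
// selects the implementation from the version string in the block meta.
type versionedEncoding interface {
	name() string
}

type v0Encoding struct{}
type v1Encoding struct{}
type v2Encoding struct{}

func (v0Encoding) name() string { return "v0" }
func (v1Encoding) name() string { return "v1" }
func (v2Encoding) name() string { return "v2" }

func encodingForVersion(version string) (versionedEncoding, error) {
	switch version {
	case "v0":
		return v0Encoding{}, nil
	case "v1":
		return v1Encoding{}, nil
	case "v2":
		return v2Encoding{}, nil
	default:
		return nil, fmt.Errorf("%s is not a valid block version", version)
	}
}

func main() {
	e, err := encodingForVersion("v2")
	if err != nil {
		panic(err)
	}
	fmt.Println("selected encoding:", e.name())
}
```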
@@ -1,4 +1,4 @@
package v0
package base

import (
"encoding/binary"
@@ -1,4 +1,4 @@
package v0
package base

import (
"bytes"