Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(wal): Benchmark and improve WAL writes using Reset. #13272

Merged
merged 34 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
0e5aa15
wip
cyriltovena May 16, 2024
2ef5c3c
wip
cyriltovena May 16, 2024
d68a08d
wip
cyriltovena May 28, 2024
144bb9c
add some doc and vision
cyriltovena May 30, 2024
5f8cf08
move compressed len to chunk
cyriltovena May 30, 2024
f32c755
work on the chunk encoding
cyriltovena May 31, 2024
19bbd76
missing changes
cyriltovena May 31, 2024
9e1d5b1
working on fixes and tests
cyriltovena May 31, 2024
c8b792f
add more tests and found a bug with dod
cyriltovena Jun 2, 2024
749acf7
fix(wal): Use varint encoding for ts_2_dod in WAL format
cyriltovena Jun 3, 2024
7590f55
refactor(wal): Remove unnecessary code in writeChunk function
cyriltovena Jun 3, 2024
7991408
chore: Refactor ChunkReader to improve performance and memory usage
cyriltovena Jun 3, 2024
38fcad4
chore: Add more realistic tests and benchmarks
cyriltovena Jun 3, 2024
bdf389f
refactor: Update index writer to support in memory buffer.
cyriltovena Jun 5, 2024
296daee
pausing work I need a new index different than the current work
cyriltovena Jun 7, 2024
d1cfcae
Add a special in memory index for the wal package
cyriltovena Jun 10, 2024
37ea6d6
Finalize writing and start reading index
cyriltovena Jun 10, 2024
d649646
Add offset/start to chunk ref
cyriltovena Jun 10, 2024
fd1dbd8
wip
cyriltovena Jun 13, 2024
b49d2ba
refactor(wal): Implement SeriesIter.
cyriltovena Jun 16, 2024
f575efb
fix(wal): Fixes snappy block offsets counting.
cyriltovena Jun 17, 2024
071ee04
chore: update format doc to reflect latest changes
cyriltovena Jun 18, 2024
6227361
chore: lint
cyriltovena Jun 18, 2024
f625252
refactor: Removes changes not required.
cyriltovena Jun 18, 2024
32b1d2c
chore: format
cyriltovena Jun 18, 2024
d3f179e
feat(wal): Add sizing information to writer and reader.
cyriltovena Jun 19, 2024
57ad53b
Merge remote-tracking branch 'upstream/main' into wal-sizing
cyriltovena Jun 19, 2024
d7dc2b1
Merge remote-tracking branch 'upstream/main' into wal-sizing
cyriltovena Jun 19, 2024
95c1015
lint
cyriltovena Jun 20, 2024
b6ba673
ensure stable test
cyriltovena Jun 20, 2024
3700f3e
feat(wal): Benchmark and improve WAL writes using Reset.
cyriltovena Jun 20, 2024
ad52803
lint
cyriltovena Jun 20, 2024
6f9489c
Review feedback
cyriltovena Jun 20, 2024
b6263ef
Merge remote-tracking branch 'origin/main' into wal-write-benchmark
benclive Jun 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/pattern/iter/batch_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package iter

import (
"sort"
"testing"

"github.com/go-kit/log"
Expand Down Expand Up @@ -211,6 +212,12 @@ func TestReadMetricsBatch(t *testing.T) {
it := NewSumMergeSampleIterator(tt.seriesIter)
got, err := ReadMetricsBatch(it, tt.batchSize, log.NewNopLogger())
require.NoError(t, err)
sort.Slice(tt.expected.Series, func(i, j int) bool {
return tt.expected.Series[i].Labels < tt.expected.Series[j].Labels
})
sort.Slice(got.Series, func(i, j int) bool {
return got.Series[i].Labels < got.Series[j].Labels
})
require.Equal(t, tt.expected.Series, got.Series)
})
}
Expand Down
70 changes: 7 additions & 63 deletions pkg/storage/wal/chunks/chunks_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package chunks

import (
"bufio"
"bytes"
"fmt"
"os"
"path/filepath"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/wal/testdata"
)

func TestChunkReaderWriter(t *testing.T) {
Expand Down Expand Up @@ -121,11 +119,11 @@ func TestChunkReaderWriter(t *testing.T) {
}

func TestChunkReaderWriterWithLogGenerator(t *testing.T) {
filenames := testDataFile()
filenames := testdata.Files()

for _, filename := range filenames {
t.Run(filename, func(t *testing.T) {
gen := newLogGenerator(t, filename)
gen := testdata.NewLogGenerator(t, filename)
defer gen.Close()

var entries []*logproto.Entry
Expand Down Expand Up @@ -196,10 +194,10 @@ var (

// Benchmark reads with log generator
func BenchmarkReadChunkWithLogGenerator(b *testing.B) {
filenames := testDataFile()
filenames := testdata.Files()
for _, filename := range filenames {
b.Run(filename, func(b *testing.B) {
gen := newLogGenerator(b, filename)
gen := testdata.NewLogGenerator(b, filename)
defer gen.Close()

var entries []*logproto.Entry
Expand Down Expand Up @@ -239,12 +237,12 @@ func BenchmarkReadChunkWithLogGenerator(b *testing.B) {

// Benchmark with log generator
func BenchmarkWriteChunkWithLogGenerator(b *testing.B) {
filenames := testDataFile()
filenames := testdata.Files()

for _, filename := range filenames {
for _, count := range []int{1000, 10000, 100000} {
b.Run(fmt.Sprintf("%s-%d", filename, count), func(b *testing.B) {
gen := newLogGenerator(b, filename)
gen := testdata.NewLogGenerator(b, filename)
defer gen.Close()

var entries []*logproto.Entry
Expand Down Expand Up @@ -278,24 +276,6 @@ func BenchmarkWriteChunkWithLogGenerator(b *testing.B) {
}
}

// testDataFile returns the paths of all regular files found in the shared
// ../testdata directory, for use as benchmark/test inputs. It panics if the
// directory cannot be read, since the tests cannot proceed without it.
func testDataFile() []string {
	const testdataDir = "../testdata"

	entries, err := os.ReadDir(testdataDir)
	if err != nil {
		panic(err)
	}

	var paths []string
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		paths = append(paths, filepath.Join(testdataDir, entry.Name()))
	}

	return paths
}

// generateLogEntries generates a slice of logproto.Entry with the given count.
func generateLogEntries(count int) []*logproto.Entry {
entries := make([]*logproto.Entry, count)
Expand All @@ -307,39 +287,3 @@ func generateLogEntries(count int) []*logproto.Entry {
}
return entries
}

// logGenerator yields log lines by scanning a file on disk, rewinding to the
// start whenever the end of the file is reached (see Next/reset).
type logGenerator struct {
	f *os.File       // backing file; set to nil by Close
	s *bufio.Scanner // line scanner over f; replaced on each rewind
}

// Next returns the next line from the underlying file. When the scanner is
// exhausted it rewinds to the beginning and scans again, so a non-empty file
// yields lines indefinitely. The returned byte slice is only valid until the
// next call (it aliases the scanner's buffer).
func (g *logGenerator) Next() (bool, []byte) {
	if ok := g.s.Scan(); ok {
		return true, g.s.Bytes()
	}
	// End of file: rewind and attempt one more scan from the top.
	g.reset()
	ok := g.s.Scan()
	return ok, g.s.Bytes()
}

// Close releases the underlying file handle. It is safe to call multiple
// times; calls after the first are no-ops.
func (g *logGenerator) Close() {
	if g.f == nil {
		return
	}
	g.f.Close()
	g.f = nil
}

// reset rewinds the file to the beginning and installs a fresh scanner so
// iteration restarts from the first line. The Seek error is deliberately
// ignored: if the rewind fails, the new scanner simply reports EOF.
func (g *logGenerator) reset() {
	_, _ = g.f.Seek(0, 0)
	g.s = bufio.NewScanner(g.f)
}

// newLogGenerator opens filename and returns a generator that yields its
// lines one at a time, wrapping around at EOF. The test fails immediately if
// the file cannot be opened; callers are expected to defer Close.
func newLogGenerator(t testing.TB, filename string) *logGenerator {
	t.Helper()

	f, err := os.Open(filename)
	require.NoError(t, err)

	g := &logGenerator{f: f}
	g.s = bufio.NewScanner(f)
	return g
}
5 changes: 5 additions & 0 deletions pkg/storage/wal/index/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func (fw *BufferWriter) Close() error {
return nil
}

// Reset discards any buffered contents and rewinds the write position,
// allowing the writer to be reused without reallocating its buffer.
func (fw *BufferWriter) Reset() {
	fw.buf.Reset()
	fw.pos = 0
}

// Remove is a no-op: the writer is backed by an in-memory buffer, so there is
// no on-disk file to delete. NOTE(review): presumably present to satisfy a
// file-oriented writer interface — confirm against the interface definition.
func (fw *BufferWriter) Remove() error {
	return nil
}
50 changes: 31 additions & 19 deletions pkg/storage/wal/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ type PostingsEncoder func(*encoding.Encbuf, []uint32) error
// Writer implements the IndexWriter interface for the standard
// serialization format.
type Writer struct {
ctx context.Context

// For the main index file.
f *BufferWriter

Expand Down Expand Up @@ -197,9 +195,8 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {

// NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
// It uses the given encoder to encode each postings list.
func NewWriterWithEncoder(ctx context.Context, encoder PostingsEncoder) (*Writer, error) {
func NewWriterWithEncoder(encoder PostingsEncoder) (*Writer, error) {
iw := &Writer{
ctx: ctx,
f: NewBufferWriter(),
fP: NewBufferWriter(),
fPO: NewBufferWriter(),
Expand All @@ -222,8 +219,8 @@ func NewWriterWithEncoder(ctx context.Context, encoder PostingsEncoder) (*Writer

// NewWriter creates a new index writer using the default encoder. See
// NewWriterWithEncoder.
func NewWriter(ctx context.Context) (*Writer, error) {
return NewWriterWithEncoder(ctx, EncodePostingsRaw)
func NewWriter() (*Writer, error) {
return NewWriterWithEncoder(EncodePostingsRaw)
}

func (w *Writer) write(bufs ...[]byte) error {
Expand All @@ -242,15 +239,36 @@ func (w *Writer) Buffer() ([]byte, io.Closer, error) {
return w.f.Buffer()
}

// Reset restores the Writer to its initial state so it can be reused to build
// a fresh index without reallocating the underlying buffer writers. All
// accumulated symbols, label indexes, series/chunk bookkeeping, postings
// counters and CRC state are discarded, and the index meta header is written
// again so the buffer starts as a valid index prefix.
func (w *Writer) Reset() error {
	// Rewind all backing buffers (main index, postings, postings offsets)
	// and the scratch encoding buffers; capacity is retained for reuse.
	w.f.Reset()
	w.fP.Reset()
	w.fPO.Reset()
	w.buf1.Reset()
	w.buf2.Reset()
	w.stage = idxStageNone
	w.toc = TOC{}
	w.postingsStart = 0
	w.numSymbols = 0
	w.symbols = nil
	w.symbolFile = nil
	w.lastSymbol = ""
	w.symbolCache = make(map[string]symbolCacheEntry, 1<<8)
	w.labelIndexes = w.labelIndexes[:0]
	w.labelNames = make(map[string]uint64, 1<<8)
	w.lastSeries = nil
	w.lastSeriesRef = 0
	w.lastChunkRef = 0
	w.cntPO = 0
	w.crc32.Reset()

	// Re-emit the magic/version meta header, exactly as NewWriter does.
	return w.writeMeta()
}

// ensureStage handles transitions between write stages and ensures that IndexWriter
// methods are called in an order valid for the implementation.
func (w *Writer) ensureStage(s indexWriterStage) error {
select {
case <-w.ctx.Done():
return w.ctx.Err()
default:
}

if w.stage == s {
return nil
}
Expand Down Expand Up @@ -691,7 +709,6 @@ func (w *Writer) writePostingsOffsetTable() error {
if err := w.fPO.Remove(); err != nil {
return err
}
w.fPO = nil

err = w.writeLengthAndHash(startPos)
if err != nil {
Expand Down Expand Up @@ -854,11 +871,7 @@ func (w *Writer) writePostingsToTmpFiles() error {
}
}
}
select {
case <-w.ctx.Done():
return w.ctx.Err()
default:
}

}
return nil
}
Expand Down Expand Up @@ -936,7 +949,6 @@ func (w *Writer) writePostings() error {
if err := w.fP.Remove(); err != nil {
return err
}
w.fP = nil
return nil
}

Expand Down
20 changes: 10 additions & 10 deletions pkg/storage/wal/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (m mockIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder,

func TestIndexRW_Create_Open(t *testing.T) {
// An empty index must still result in a readable file.
iw, err := NewWriter(context.Background())
iw, err := NewWriter()
require.NoError(t, err)
require.NoError(t, iw.Close())

Expand All @@ -160,7 +160,7 @@ func TestIndexRW_Postings(t *testing.T) {
labels: labels.FromStrings("a", "1", "b", strconv.Itoa(i)),
})
}
ir, buf, _ := createReader(ctx, t, input)
ir, buf, _ := createReader(t, input)

p, err := ir.Postings(ctx, "a", "1")
require.NoError(t, err)
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestPostingsMany(t *testing.T) {
labels: labels.FromStrings("i", v, "foo", "bar"),
})
}
ir, _, symbols := createReader(ctx, t, input)
ir, _, symbols := createReader(t, input)

cases := []struct {
in []string
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestPersistence_index_e2e(t *testing.T) {
})
}

ir, _, _ := createReader(ctx, t, input)
ir, _, _ := createReader(t, input)

// Population procedure as done by compaction.
var (
Expand Down Expand Up @@ -435,7 +435,7 @@ func TestPersistence_index_e2e(t *testing.T) {
}

func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) {
w, err := NewWriter(context.Background())
w, err := NewWriter()
require.NoError(t, err)

require.NoError(t, w.AddSymbol("__name__"))
Expand Down Expand Up @@ -523,7 +523,7 @@ func BenchmarkReader_ShardedPostings(b *testing.B) {
labels: labels.FromStrings("const", fmt.Sprintf("%10d", 1), "unique", fmt.Sprintf("%10d", i)),
})
}
ir, _, _ := createReader(ctx, b, input)
ir, _, _ := createReader(b, input)
b.ResetTimer()

for n := 0; n < b.N; n++ {
Expand All @@ -540,7 +540,7 @@ func TestDecoder_Postings_WrongInput(t *testing.T) {
}

func TestChunksRefOrdering(t *testing.T) {
idx, err := NewWriter(context.Background())
idx, err := NewWriter()
require.NoError(t, err)

require.NoError(t, idx.AddSymbol("1"))
Expand All @@ -558,7 +558,7 @@ func TestChunksRefOrdering(t *testing.T) {
}

func TestChunksTimeOrdering(t *testing.T) {
idx, err := NewWriter(context.Background())
idx, err := NewWriter()
require.NoError(t, err)

require.NoError(t, idx.AddSymbol("1"))
Expand All @@ -585,10 +585,10 @@ func TestChunksTimeOrdering(t *testing.T) {

// createFileReader creates a temporary index file. It writes the provided input to this file.
// It returns a Reader for this file, the file's name, and the symbol map.
func createReader(ctx context.Context, tb testing.TB, input indexWriterSeriesSlice) (*Reader, []byte, map[string]struct{}) {
func createReader(tb testing.TB, input indexWriterSeriesSlice) (*Reader, []byte, map[string]struct{}) {
tb.Helper()

iw, err := NewWriter(ctx)
iw, err := NewWriter()
require.NoError(tb, err)

symbols := map[string]struct{}{}
Expand Down
Loading
Loading