From e8c13e02e0e3ae95a0b408b5446497f6f5affd1f Mon Sep 17 00:00:00 2001
From: Aleksei Semiglazov
Date: Tue, 18 Sep 2018 23:35:03 +0100
Subject: [PATCH] compact: avoid memory leak while downsampling

Add an instant writer implementation to shrink memory consumption
during the downsampling stage. Encoded chunks are written to chunk
blob files right away, as soon as each series has been handled. The
Flush method closes the chunk writer and syncs all symbols, series,
labels, postings and meta data to files.

It still works in one thread, hence operates only on one core.
Estimated memory consumption is unlikely to exceed 1GB, but it depends
on the data set, label sizes and series density: chunk data size
(512MB) + encoded buffers + index data.

Fixes #297
---
 .errcheck_excludes.txt                    |   2 +-
 pkg/compact/downsample/downsample.go      | 142 ++-------
 pkg/compact/downsample/downsample_test.go | 122 +++++++-
 pkg/compact/downsample/writer.go          | 342 ++++++++++++++++++++++
 4 files changed, 469 insertions(+), 139 deletions(-)
 create mode 100644 pkg/compact/downsample/writer.go

diff --git a/.errcheck_excludes.txt b/.errcheck_excludes.txt
index 5fad7c252ea..9e2e3a71e1c 100644
--- a/.errcheck_excludes.txt
+++ b/.errcheck_excludes.txt
@@ -1,3 +1,3 @@
 (github.com/improbable-eng/thanos/vendor/github.com/go-kit/kit/log.Logger).Log
 fmt.Fprintln
-fmt.Fprint
\ No newline at end of file
+fmt.Fprint
diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go
index 305f72021c0..1628cd3999c 100644
--- a/pkg/compact/downsample/downsample.go
+++ b/pkg/compact/downsample/downsample.go
@@ -2,15 +2,11 @@ package downsample
 
 import (
 	"math"
-	"path/filepath"
-	"sort"
 
 	"github.com/improbable-eng/thanos/pkg/block"
 	"github.com/prometheus/prometheus/pkg/value"
 	"github.com/prometheus/tsdb/chunkenc"
-	"os"
-
 	"github.com/go-kit/kit/log"
 	"github.com/improbable-eng/thanos/pkg/runutil"
 	"github.com/oklog/ulid"
@@ -52,13 +48,16 @@ func Downsample(
 	}
 	defer runutil.CloseWithErrCapture(logger, &err, chunkr, "downsample chunk reader")
 
-	rng := origMeta.MaxTime - origMeta.MinTime
-
-	// Write downsampled data in a custom memory block where we have fine-grained control
-	// over created chunks.
+	// The writer puts downsampled chunks into files immediately as series are handled,
+	// which saves a lot of memory; index and meta data are flushed after all series.
 	// This is necessary since we need to inject special values at the end of chunks for
 	// some aggregations.
-	newb := newMemBlock()
+	writer, err := NewWriter(dir, logger, *origMeta, resolution)
+	if err != nil {
+		return id, errors.Wrap(err, "create instant writer")
+	}
+	// Closing after a successful Flush is a no-op, so the defer is safe on all paths.
+	defer runutil.CloseWithErrCapture(logger, &err, writer, "downsample instant writer")
 
 	pall, err := indexr.Postings(index.AllPostingsKey())
 	if err != nil {
@@ -85,7 +84,7 @@ func Downsample(
 		for i, c := range chks {
 			chk, err := chunkr.Chunk(c.Ref)
 			if err != nil {
-				return id, errors.Wrapf(err, "get chunk %d", c.Ref)
+				return id, errors.Wrapf(err, "get chunk %d, series %d", c.Ref, pall.At())
 			}
 			chks[i].Chunk = chk
 		}
@@ -94,10 +93,12 @@
 		if origMeta.Thanos.Downsample.Resolution == 0 {
 			for _, c := range chks {
 				if err := expandChunkIterator(c.Chunk.Iterator(), &all); err != nil {
-					return id, errors.Wrapf(err, "expand chunk %d", c.Ref)
+					return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, pall.At())
 				}
 			}
-			newb.addSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)})
+			if err := writer.AddSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)}); err != nil {
+				return id, errors.Wrapf(err, "downsample raw data, series: %d", pall.At())
+			}
 			continue
 		}
 
@@ -114,127 +115,24 @@
 			resolution,
 		)
 		if err != nil {
-			return id, errors.Wrap(err, "downsample aggregate block")
+			return id, errors.Wrapf(err, "downsample aggregate block, series: %d", pall.At())
+		}
+		if err := writer.AddSeries(&series{lset: lset, chunks: res}); err != nil {
+			return id, errors.Wrapf(err, "downsample aggregated block, series: %d", pall.At())
 		}
-		newb.addSeries(&series{lset: lset, chunks: res})
 	}
 	if pall.Err() != nil {
 		return id, errors.Wrap(pall.Err(), "iterate series set")
 	}
 
-	comp, err := tsdb.NewLeveledCompactor(nil, log.NewNopLogger(), []int64{rng}, NewPool())
-	if err != nil {
-		return id, errors.Wrap(err, "create compactor")
-	}
-	id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime)
-	if err != nil {
-		return id, errors.Wrap(err, "compact head")
-	}
-	bdir := filepath.Join(dir, id.String())
-
-	var tmeta block.ThanosMeta
-	tmeta = origMeta.Thanos
-	tmeta.Source = block.CompactorSource
-	tmeta.Downsample.Resolution = resolution
-	_, err = block.InjectThanosMeta(logger, bdir, tmeta, &origMeta.BlockMeta)
+	id, err = writer.Flush()
 	if err != nil {
-		return id, errors.Wrapf(err, "failed to finalize the block %s", bdir)
+		return id, errors.Wrap(err, "flush instant writer")
 	}
 
-	if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
-		return id, errors.Wrap(err, "remove tombstones")
-	}
 	return id, nil
 }
 
-// memBlock is an in-memory block that implements a subset of the tsdb.BlockReader interface
-// to allow tsdb.LeveledCompactor to persist the data as a block.
-type memBlock struct {
-	// Dummies to implement unused methods.
- tsdb.IndexReader - - symbols map[string]struct{} - postings []uint64 - series []*series - chunks []chunkenc.Chunk -} - -func newMemBlock() *memBlock { - return &memBlock{symbols: map[string]struct{}{}} -} - -func (b *memBlock) addSeries(s *series) { - sid := uint64(len(b.series)) - b.postings = append(b.postings, sid) - b.series = append(b.series, s) - - for _, l := range s.lset { - b.symbols[l.Name] = struct{}{} - b.symbols[l.Value] = struct{}{} - } - - for i, cm := range s.chunks { - cid := uint64(len(b.chunks)) - s.chunks[i].Ref = cid - b.chunks = append(b.chunks, cm.Chunk) - } -} - -func (b *memBlock) Postings(name, val string) (index.Postings, error) { - allName, allVal := index.AllPostingsKey() - - if name != allName || val != allVal { - return nil, errors.New("unsupported call to Postings()") - } - sort.Slice(b.postings, func(i, j int) bool { - return labels.Compare(b.series[b.postings[i]].lset, b.series[b.postings[j]].lset) < 0 - }) - return index.NewListPostings(b.postings), nil -} - -func (b *memBlock) Series(id uint64, lset *labels.Labels, chks *[]chunks.Meta) error { - if id >= uint64(len(b.series)) { - return errors.Wrapf(tsdb.ErrNotFound, "series with ID %d does not exist", id) - } - s := b.series[id] - - *lset = append((*lset)[:0], s.lset...) - *chks = append((*chks)[:0], s.chunks...) - - return nil -} - -func (b *memBlock) Chunk(id uint64) (chunkenc.Chunk, error) { - if id >= uint64(len(b.chunks)) { - return nil, errors.Wrapf(tsdb.ErrNotFound, "chunk with ID %d does not exist", id) - } - return b.chunks[id], nil -} - -func (b *memBlock) Symbols() (map[string]struct{}, error) { - return b.symbols, nil -} - -func (b *memBlock) SortedPostings(p index.Postings) index.Postings { - return p -} - -func (b *memBlock) Index() (tsdb.IndexReader, error) { - return b, nil -} - -func (b *memBlock) Chunks() (tsdb.ChunkReader, error) { - return b, nil -} - -func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) { - return tsdb.EmptyTombstoneReader(), nil -} - -func (b *memBlock) Close() error { - return nil -} - // currentWindow returns the end timestamp of the window that t falls into. func currentWindow(t, r int64) int64 { // The next timestamp is the next number after s.t that's aligned with window. @@ -482,7 +380,7 @@ func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, inRes, outRes return res, nil } -// expandChunkIterator reads all samples from the iterater and appends them to buf. +// expandChunkIterator reads all samples from the iterator and appends them to buf. // Stale markers and out of order samples are skipped. func expandChunkIterator(it chunkenc.Iterator, buf *[]sample) error { // For safety reasons, we check for each sample that it does not go back in time. 
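For reviewers skimming the diff above: the writer's lifecycle boils down to create, add each downsampled series, then flush once. The following is a minimal sketch of that usage, assuming the API introduced in writer.go below; downsampleSeriesSet is a hypothetical helper name used for illustration only, not part of the patch.

package downsample

import (
	"github.com/go-kit/kit/log"
	"github.com/improbable-eng/thanos/pkg/block"
	"github.com/improbable-eng/thanos/pkg/runutil"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
)

// downsampleSeriesSet (hypothetical) shows how the instant writer is driven:
// one AddSeries call per downsampled series, whose chunks go to disk right
// away, followed by a single Flush that finalizes index and meta data.
func downsampleSeriesSet(logger log.Logger, origMeta block.Meta, dir string, resolution int64, input []*series) (id ulid.ULID, err error) {
	writer, err := NewWriter(dir, logger, origMeta, resolution)
	if err != nil {
		return id, errors.Wrap(err, "create instant writer")
	}
	// Close cleans up the tmp dir on error paths; after a successful Flush it is a no-op.
	defer runutil.CloseWithErrCapture(logger, &err, writer, "downsample instant writer")

	for _, s := range input {
		// Chunk data is encoded and written here, so series samples can be dropped immediately.
		if err := writer.AddSeries(s); err != nil {
			return id, errors.Wrap(err, "add series")
		}
	}
	// Flush writes the index and meta files and renames the tmp dir into place.
	return writer.Flush()
}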
diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go
index d3844784162..f550d1191ed 100644
--- a/pkg/compact/downsample/downsample_test.go
+++ b/pkg/compact/downsample/downsample_test.go
@@ -5,19 +5,19 @@ import (
 	"math"
 	"os"
 	"path/filepath"
+	"sort"
 	"testing"
-
-	"github.com/prometheus/prometheus/pkg/value"
-
-	"github.com/prometheus/tsdb/chunks"
-
 	"time"
 
 	"github.com/fortytw2/leaktest"
 	"github.com/go-kit/kit/log"
 	"github.com/improbable-eng/thanos/pkg/block"
 	"github.com/improbable-eng/thanos/pkg/testutil"
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/pkg/value"
+	"github.com/prometheus/tsdb"
 	"github.com/prometheus/tsdb/chunkenc"
+	"github.com/prometheus/tsdb/chunks"
 	"github.com/prometheus/tsdb/index"
 	"github.com/prometheus/tsdb/labels"
 )
@@ -69,30 +69,30 @@ func TestDownsampleAggr(t *testing.T) {
 		{
 			lset: labels.FromStrings("__name__", "a"),
 			inAggr: map[AggrType][]sample{
-				AggrCount: []sample{
+				AggrCount: {
 					{199, 5}, {299, 1}, {399, 10}, {400, 3}, {499, 10}, {699, 0}, {999, 100},
 				},
-				AggrSum: []sample{
+				AggrSum: {
 					{199, 5}, {299, 1}, {399, 10}, {400, 3}, {499, 10}, {699, 0}, {999, 100},
 				},
-				AggrMin: []sample{
+				AggrMin: {
 					{199, 5}, {299, 1}, {399, 10}, {400, -3}, {499, 10}, {699, 0}, {999, 100},
 				},
-				AggrMax: []sample{
+				AggrMax: {
 					{199, 5}, {299, 1}, {399, 10}, {400, -3}, {499, 10}, {699, 0}, {999, 100},
 				},
-				AggrCounter: []sample{
+				AggrCounter: {
 					{99, 100}, {299, 150}, {499, 210}, {499, 10}, // chunk 1
 					{599, 20}, {799, 50}, {999, 120}, {999, 50}, // chunk 2, no reset
 					{1099, 40}, {1199, 80}, {1299, 110}, // chunk 3, reset
 				},
 			},
 			output: map[AggrType][]sample{
-				AggrCount:   []sample{{499, 29}, {999, 100}},
-				AggrSum:     []sample{{499, 29}, {999, 100}},
-				AggrMin:     []sample{{499, -3}, {999, 0}},
-				AggrMax:     []sample{{499, 10}, {999, 100}},
-				AggrCounter: []sample{{499, 210}, {999, 320}, {1299, 430}, {1299, 110}},
+				AggrCount:   {{499, 29}, {999, 100}},
+				AggrSum:     {{499, 29}, {999, 100}},
+				AggrMin:     {{499, -3}, {999, 0}},
+				AggrMax:     {{499, 10}, {999, 100}},
+				AggrCounter: {{499, 210}, {999, 320}, {1299, 430}, {1299, 110}},
 			},
 		},
 	}
@@ -157,7 +157,6 @@ func testDownsample(t *testing.T, data []*downsampleTestSet, meta *block.Meta, r
 		}
 		mb.addSeries(ser)
 	}
-
 	id, err := Downsample(log.NewNopLogger(), meta, mb, dir, resolution)
 	testutil.Ok(t, err)
 
@@ -375,3 +374,94 @@ func (it *sampleIterator) Seek(int64) bool {
 func (it *sampleIterator) At() (t int64, v float64) {
 	return it.l[it.i].t, it.l[it.i].v
 }
+
+// memBlock is an in-memory block that implements a subset of the tsdb.BlockReader interface
+// so that the tests can feed in-memory data into Downsample as a block.
+type memBlock struct {
+	// Dummies to implement unused methods.
+	tsdb.IndexReader
+
+	symbols  map[string]struct{}
+	postings []uint64
+	series   []*series
+	chunks   []chunkenc.Chunk
+
+	numberOfChunks uint64
+}
+
+func newMemBlock() *memBlock {
+	return &memBlock{symbols: map[string]struct{}{}}
+}
+
+func (b *memBlock) addSeries(s *series) {
+	sid := uint64(len(b.series))
+	b.postings = append(b.postings, sid)
+	b.series = append(b.series, s)
+
+	for _, l := range s.lset {
+		b.symbols[l.Name] = struct{}{}
+		b.symbols[l.Value] = struct{}{}
+	}
+
+	for i, cm := range s.chunks {
+		s.chunks[i].Ref = b.numberOfChunks
+		b.chunks = append(b.chunks, cm.Chunk)
+		b.numberOfChunks++
+	}
+}
+
+func (b *memBlock) Postings(name, val string) (index.Postings, error) {
+	allName, allVal := index.AllPostingsKey()
+
+	if name != allName || val != allVal {
+		return nil, errors.New("unsupported call to Postings()")
+	}
+	sort.Slice(b.postings, func(i, j int) bool {
+		return labels.Compare(b.series[b.postings[i]].lset, b.series[b.postings[j]].lset) < 0
+	})
+	return index.NewListPostings(b.postings), nil
+}
+
+func (b *memBlock) Series(id uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
+	if id >= uint64(len(b.series)) {
+		return errors.Wrapf(tsdb.ErrNotFound, "series with ID %d does not exist", id)
+	}
+	s := b.series[id]
+
+	*lset = append((*lset)[:0], s.lset...)
+	*chks = append((*chks)[:0], s.chunks...)
+
+	return nil
+}
+
+func (b *memBlock) Chunk(id uint64) (chunkenc.Chunk, error) {
+	if id >= uint64(b.numberOfChunks) {
+		return nil, errors.Wrapf(tsdb.ErrNotFound, "chunk with ID %d does not exist", id)
+	}
+
+	return b.chunks[id], nil
+}
+
+func (b *memBlock) Symbols() (map[string]struct{}, error) {
+	return b.symbols, nil
+}
+
+func (b *memBlock) SortedPostings(p index.Postings) index.Postings {
+	return p
+}
+
+func (b *memBlock) Index() (tsdb.IndexReader, error) {
+	return b, nil
+}
+
+func (b *memBlock) Chunks() (tsdb.ChunkReader, error) {
+	return b, nil
+}
+
+func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) {
+	return tsdb.EmptyTombstoneReader(), nil
+}
+
+func (b *memBlock) Close() error {
+	return nil
+}
diff --git a/pkg/compact/downsample/writer.go b/pkg/compact/downsample/writer.go
new file mode 100644
index 00000000000..b7c13469f9e
--- /dev/null
+++ b/pkg/compact/downsample/writer.go
@@ -0,0 +1,342 @@
+package downsample
+
+import (
+	"encoding/json"
+	"math/rand"
+	"os"
+	"path/filepath"
+	"sort"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/improbable-eng/thanos/pkg/block"
+	"github.com/oklog/ulid"
+	"github.com/pkg/errors"
+	"github.com/prometheus/tsdb"
+	"github.com/prometheus/tsdb/chunks"
+	"github.com/prometheus/tsdb/fileutil"
+	"github.com/prometheus/tsdb/index"
+	"github.com/prometheus/tsdb/labels"
+)
+
+type symbols map[string]struct{}
+type labelValues map[string]struct{}
+
+func (lv labelValues) add(value string) {
+	lv[value] = struct{}{}
+}
+
+func (lv labelValues) get(set *[]string) {
+	for value := range lv {
+		*set = append(*set, value)
+	}
+}
+
+type labelsValues map[string]labelValues
+
+func (lv labelsValues) add(labelSet labels.Labels) {
+	for _, label := range labelSet {
+		values, ok := lv[label.Name]
+		if !ok {
+			// Add new label.
+			values = labelValues{}
+			lv[label.Name] = values
+		}
+		values.add(label.Value)
+	}
+}
+
+// instantWriter writes a downsampled block into a new data block. Chunks are
+// written to disk immediately in order to keep memory consumption low.
+type instantWriter struct {
+	dir        string
+	tmpDir     string
+	logger     log.Logger
+	uid        ulid.ULID
+	resolution int64
+
+	symbols  symbols
+	postings []uint64
+	series   []*series
+
+	chunkWriter  tsdb.ChunkWriter
+	meta         block.Meta
+	totalChunks  uint64
+	totalSamples uint64
+}
+
+func NewWriter(dir string, l log.Logger, originMeta block.Meta, resolution int64) (*instantWriter, error) {
+	var err error
+	var chunkWriter tsdb.ChunkWriter
+
+	// Generate a new block id.
+	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
+	uid := ulid.MustNew(ulid.Now(), entropy)
+
+	// Populate chunk, meta and index files into a temporary directory that is
+	// atomically renamed into place on Flush.
+	dir = filepath.Join(dir, uid.String())
+	tmpDir, err := createTmpDir(dir)
+	if err != nil {
+		return nil, err
+	}
+
+	chunkDir := func(dir string) string {
+		return filepath.Join(dir, block.ChunksDirname)
+	}
+
+	chunkWriter, err = chunks.NewWriter(chunkDir(tmpDir))
+	if err != nil {
+		return nil, errors.Wrap(err, "create tmp chunk writer")
+	}
+
+	return &instantWriter{
+		logger:      l,
+		dir:         dir,
+		tmpDir:      tmpDir,
+		symbols:     symbols{},
+		chunkWriter: chunkWriter,
+		uid:         uid,
+		meta:        originMeta,
+		resolution:  resolution,
+	}, nil
+}
+
+func (w *instantWriter) AddSeries(s *series) error {
+	if len(s.chunks) == 0 {
+		level.Warn(w.logger).Log("msg", "adding series with no chunks", "lset", s.lset)
+	}
+
+	if err := w.chunkWriter.WriteChunks(s.chunks...); err != nil {
+		return errors.Wrap(err, "add series")
+	}
+
+	w.postings = append(w.postings, uint64(len(w.series)))
+	w.series = append(w.series, s)
+
+	for _, l := range s.lset {
+		w.symbols[l.Name] = struct{}{}
+		w.symbols[l.Value] = struct{}{}
+	}
+
+	w.totalChunks += uint64(len(s.chunks))
+	for i := range s.chunks {
+		chk := &s.chunks[i]
+		w.totalSamples += uint64(chk.Chunk.NumSamples())
+		// Drop the chunk data; only the on-disk reference is needed from here on.
+		chk.Chunk = nil
+	}
+
+	return nil
+}
+
+func (w *instantWriter) Flush() (ulid.ULID, error) {
+	var err error
+
+	// All chunks have been written by this point, so the chunk writer can be closed.
+	if err := w.chunkWriter.Close(); err != nil {
+		return w.uid, errors.Wrap(err, "close chunk writer")
+	}
+	w.chunkWriter = nil
+
+	indexw, err := index.NewWriter(filepath.Join(w.tmpDir, block.IndexFilename))
+	if err != nil {
+		return w.uid, errors.Wrap(err, "open index writer")
+	}
+
+	if err := w.populateBlock(indexw); err != nil {
+		return w.uid, errors.Wrap(err, "write index")
+	}
+
+	if err = w.writeMetaFile(w.tmpDir); err != nil {
+		return w.uid, errors.Wrap(err, "write merged meta")
+	}
+
+	if err = indexw.Close(); err != nil {
+		return w.uid, errors.Wrap(err, "close index writer")
+	}
+
+	df, err := fileutil.OpenDir(w.tmpDir)
+	if err != nil {
+		return w.uid, errors.Wrap(err, "open temporary block dir")
+	}
+	defer func() {
+		if df != nil {
+			if err := df.Close(); err != nil {
+				level.Warn(w.logger).Log("msg", "close temporary block dir", "err", err)
+			}
+		}
+	}()
+
+	if err := fileutil.Fsync(df); err != nil {
+		return w.uid, errors.Wrap(err, "sync temporary dir")
+	}
+
+	// Close tmp dir before renaming block dir (for windows platform).
+	if err = df.Close(); err != nil {
+		return w.uid, errors.Wrap(err, "close temporary dir")
+	}
+	df = nil
+
+	// Block successfully written; make it visible by renaming the tmp dir into place.
+	err = renameFile(w.tmpDir, w.dir)
+	if err != nil {
+		return w.uid, errors.Wrap(err, "rename block dir")
+	}
+	// The tmp dir was renamed into place, so there is nothing left for Close to clean up.
+	w.tmpDir = ""
+
+	level.Info(w.logger).Log(
+		"msg", "write downsampled block",
+		"mint", w.meta.MinTime,
+		"maxt", w.meta.MaxTime,
+		"ulid", w.meta.ULID,
+		"resolution", w.meta.Thanos.Downsample.Resolution,
+	)
+	return w.uid, nil
+}
+
+// populateBlock fills the index writer with the data gathered from the added series.
+// All chunks have already been written, so only symbols, series, label indices and
+// postings are produced here.
+func (w *instantWriter) populateBlock(indexWriter tsdb.IndexWriter) error {
+	var (
+		i            = uint64(0)
+		labelsValues = labelsValues{}
+		memPostings  = index.NewUnorderedMemPostings()
+	)
+
+	if err := indexWriter.AddSymbols(w.symbols); err != nil {
+		return errors.Wrap(err, "add symbols")
+	}
+
+	sort.Slice(w.postings, func(i, j int) bool {
+		return labels.Compare(w.series[w.postings[i]].lset, w.series[w.postings[j]].lset) < 0
+	})
+
+	all := index.NewListPostings(w.postings)
+	for all.Next() {
+		// Postings were sorted above, so series are added in label order as the
+		// index writer requires.
+		id := all.At()
+		s := w.series[id]
+		// Skip series whose chunks were all deleted.
+		if len(s.chunks) == 0 {
+			level.Warn(w.logger).Log("msg", "skip series with no chunks", "series", id, "lset", s.lset)
+			continue
+		}
+
+		if err := indexWriter.AddSeries(i, s.lset, s.chunks...); err != nil {
+			return errors.Wrap(err, "add series")
+		}
+
+		labelsValues.add(s.lset)
+		memPostings.Add(i, s.lset)
+		i++
+	}
+
+	s := make([]string, 0, 256)
+	for n, v := range labelsValues {
+		s = s[:0]
+		v.get(&s)
+		if err := indexWriter.WriteLabelIndex([]string{n}, s); err != nil {
+			return errors.Wrap(err, "write label index")
+		}
+	}
+
+	memPostings.EnsureOrder()
+
+	for _, l := range memPostings.SortedKeys() {
+		if err := indexWriter.WritePostings(l.Name, l.Value, memPostings.Get(l.Name, l.Value)); err != nil {
+			return errors.Wrap(err, "write postings")
+		}
+	}
+	return nil
+}
+
+// TODO: tsdb.BlockMeta should probably expose a writeToFile method with encoding.
+func (w *instantWriter) writeMetaFile(dest string) error {
+	w.meta.ULID = w.uid
+	w.meta.Version = 1
+	w.meta.Thanos.Source = block.CompactorSource
+	w.meta.Thanos.Downsample.Resolution = w.resolution
+	w.meta.Stats.NumChunks = w.totalChunks
+	w.meta.Stats.NumSamples = w.totalSamples
+	w.meta.Stats.NumSeries = uint64(len(w.series))
+
+	// Make any changes to the file appear atomic.
+	path := filepath.Join(dest, block.MetaFilename)
+	tmp := path + ".tmp"
+
+	f, err := os.Create(tmp)
+	if err != nil {
+		return errors.Wrapf(err, "create tmp meta file %s", tmp)
+	}
+
+	enc := json.NewEncoder(f)
+	enc.SetIndent("", "\t")
+
+	var merr tsdb.MultiError
+
+	if merr.Add(enc.Encode(w.meta)); merr.Err() != nil {
+		merr.Add(f.Close())
+		return errors.Wrapf(merr.Err(), "encoding meta file to json %s", tmp)
+	}
+	if err := f.Close(); err != nil {
+		return errors.Wrapf(err, "close tmp meta file %s", tmp)
+	}
+
+	if err := renameFile(tmp, path); err != nil {
+		return errors.Wrapf(err, "rename tmp meta file %s", tmp)
+	}
+
+	return nil
+}
+
+func (w *instantWriter) Close() error {
+	var merr tsdb.MultiError
+
+	if w.tmpDir != "" {
+		merr.Add(os.RemoveAll(w.tmpDir))
+	}
+
+	if w.chunkWriter != nil {
+		merr.Add(w.chunkWriter.Close())
+	}
+
+	if merr.Err() != nil {
+		return errors.Wrap(merr.Err(), "close instant writer")
+	}
+	return nil
+}
+
+func renameFile(from, to string) error {
+	if err := os.RemoveAll(to); err != nil {
+		return err
+	}
+	if err := os.Rename(from, to); err != nil {
+		return err
+	}
+
+	// Directory was renamed; sync parent dir to persist rename.
+	pdir, err := fileutil.OpenDir(filepath.Dir(to))
+	if err != nil {
+		return err
+	}
+
+	var merr tsdb.MultiError
+	merr.Add(fileutil.Fsync(pdir))
+	merr.Add(pdir.Close())
+	return merr.Err()
+}
+
+func createTmpDir(parent string) (string, error) {
+	tmp := parent + ".tmp"
+
+	if err := os.RemoveAll(tmp); err != nil {
+		return "", errors.Wrap(err, "removing tmp dir")
+	}
+
+	if err := os.MkdirAll(tmp, 0777); err != nil {
+		return "", errors.Wrap(err, "mkdir tmp dir")
+	}
+
+	return tmp, nil
+}
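The meta file handling above relies on the classic write-to-tmp-then-rename pattern, with an fsync of the parent directory so the rename itself survives a crash. For readers unfamiliar with the idiom, here is a minimal, self-contained sketch using only the Go standard library; writeJSONAtomic and the file names are illustrative, not part of the patch.

package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// writeJSONAtomic demonstrates the pattern used by writeMetaFile: encode into
// a temporary file, rename it over the destination (rename is atomic on POSIX
// filesystems, so readers never observe a partially written file), then sync
// the parent directory so the rename is durable.
func writeJSONAtomic(path string, v interface{}) error {
	tmp := path + ".tmp"

	f, err := os.Create(tmp)
	if err != nil {
		return err
	}

	enc := json.NewEncoder(f)
	enc.SetIndent("", "\t")
	if err := enc.Encode(v); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}

	if err := os.Rename(tmp, path); err != nil {
		return err
	}

	// Sync the parent directory so the rename itself survives a crash.
	dir, err := os.Open(filepath.Dir(path))
	if err != nil {
		return err
	}
	defer dir.Close()
	return dir.Sync()
}

func main() {
	meta := map[string]interface{}{"version": 1, "ulid": "01ARZ3NDEKTSV4RRFFQ69G5FAV"}
	if err := writeJSONAtomic(filepath.Join(os.TempDir(), "meta.json"), meta); err != nil {
		fmt.Println("write meta:", err)
	}
}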