From fdba0b64a057adec803a84a5ef0679b2f5d1b427 Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Sun, 21 Jul 2024 09:36:15 +0800 Subject: [PATCH 1/3] experiment: stream compaction --- api/metastore/v1/metastore.proto | 7 +- pkg/compactionworker/compaction_worker.go | 609 +----------------- .../compaction_worker_test.go | 24 - pkg/compactionworker/storage_object_reader.go | 117 ---- pkg/compactionworker/symbol_compactor.go | 136 ---- .../metastore_compaction_queue_test.go | 8 +- pkg/metastore/metastore_state_add_block.go | 3 +- pkg/objstore/reader.go | 7 +- pkg/phlare/phlare.go | 2 +- pkg/phlaredb/symdb/block_reader.go | 41 +- .../block/{merge.go => compaction.go} | 364 ++++++++--- pkg/querybackend/block/compaction_test.go | 30 + pkg/querybackend/block/constants.go | 78 +++ pkg/querybackend/block/object.go | 139 ++-- pkg/querybackend/block/section_profiles.go | 108 +--- pkg/querybackend/block/section_symbols.go | 9 +- pkg/querybackend/block/section_tsdb.go | 34 +- pkg/querybackend/block/tenant_service.go | 90 ++- pkg/querybackend/block/testdata/.gitignore | 1 + .../block}/testdata/block-metas.json | 0 .../anon/01J2VJQPYDC160REPAD2VN88XN/block.bin | Bin .../anon/01J2VJQRGBK8YFWVV8K1MPRRWM/block.bin | Bin .../anon/01J2VJQRTMSCY4VDYBP5N4N5JK/block.bin | Bin .../anon/01J2VJQTJ3PGF7KB39ARR1BX3Y/block.bin | Bin .../anon/01J2VJQV544TF571FDSK2H692P/block.bin | Bin .../anon/01J2VJQX8DYHSEBK7BAQSCJBMG/block.bin | Bin .../anon/01J2VJQYQVZTPZMMJKE7F2XC47/block.bin | Bin .../anon/01J2VJQZPARDJQ779S1JMV0XQA/block.bin | Bin .../anon/01J2VJR0R3NQS23SDADNA6XHCM/block.bin | Bin .../anon/01J2VJR31PT3X4NDJC4Q2BHWQ1/block.bin | Bin pkg/querybackend/block/writer.go | 94 +++ pkg/querybackend/query.go | 13 +- pkg/util/bufferpool/pool.go | 98 +++ pkg/util/bufferpool/pool_test.go | 22 + pkg/util/refctr/refctr.go | 34 +- 35 files changed, 835 insertions(+), 1233 deletions(-) delete mode 100644 pkg/compactionworker/compaction_worker_test.go delete mode 100644 pkg/compactionworker/storage_object_reader.go delete mode 100644 pkg/compactionworker/symbol_compactor.go rename pkg/querybackend/block/{merge.go => compaction.go} (53%) create mode 100644 pkg/querybackend/block/compaction_test.go create mode 100644 pkg/querybackend/block/constants.go create mode 100644 pkg/querybackend/block/testdata/.gitignore rename pkg/{compactionworker => querybackend/block}/testdata/block-metas.json (100%) rename pkg/{compactionworker => querybackend/block}/testdata/segments/1/anon/01J2VJQPYDC160REPAD2VN88XN/block.bin (100%) rename pkg/{compactionworker => querybackend/block}/testdata/segments/1/anon/01J2VJQRGBK8YFWVV8K1MPRRWM/block.bin (100%) rename pkg/{compactionworker => querybackend/block}/testdata/segments/1/anon/01J2VJQRTMSCY4VDYBP5N4N5JK/block.bin (100%) rename pkg/{compactionworker => querybackend/block}/testdata/segments/1/anon/01J2VJQTJ3PGF7KB39ARR1BX3Y/block.bin (100%) rename pkg/{compactionworker => querybackend/block}/testdata/segments/1/anon/01J2VJQV544TF571FDSK2H692P/block.bin (100%) rename pkg/{compactionworker => querybackend/block}/testdata/segments/1/anon/01J2VJQX8DYHSEBK7BAQSCJBMG/block.bin (100%) rename pkg/{compactionworker => querybackend/block}/testdata/segments/1/anon/01J2VJQYQVZTPZMMJKE7F2XC47/block.bin (100%) rename pkg/{compactionworker => querybackend/block}/testdata/segments/1/anon/01J2VJQZPARDJQ779S1JMV0XQA/block.bin (100%) rename pkg/{compactionworker => querybackend/block}/testdata/segments/1/anon/01J2VJR0R3NQS23SDADNA6XHCM/block.bin (100%) rename pkg/{compactionworker => querybackend/block}/testdata/segments/1/anon/01J2VJR31PT3X4NDJC4Q2BHWQ1/block.bin (100%) create mode 100644 pkg/querybackend/block/writer.go create mode 100644 pkg/util/bufferpool/pool.go create mode 100644 pkg/util/bufferpool/pool_test.go diff --git a/api/metastore/v1/metastore.proto b/api/metastore/v1/metastore.proto index fc68fa5472..7d398da60f 100644 --- a/api/metastore/v1/metastore.proto +++ b/api/metastore/v1/metastore.proto @@ -37,9 +37,10 @@ message TenantService { int64 max_time = 4; // Table of contents lists data sections within the tenant - // service region. The interpretation of the table of contents - // is specific to the metadata format version. By default, the - // sections are: + // service region. The offsets are absolute. + // + // The interpretation of the table of contents is specific + // to the metadata format version. By default, the sections are: // - 0: profiles.parquet // - 1: index.tsdb // - 2: symbols.symdb diff --git a/pkg/compactionworker/compaction_worker.go b/pkg/compactionworker/compaction_worker.go index ac6718b931..894dcf95be 100644 --- a/pkg/compactionworker/compaction_worker.go +++ b/pkg/compactionworker/compaction_worker.go @@ -2,39 +2,18 @@ package compactionworker import ( "context" - "crypto/rand" "flag" - "io" - "math" - "os" - "path/filepath" - "sort" - "strconv" "sync" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/services" - "github.com/oklog/ulid" - "github.com/parquet-go/parquet-go" - "github.com/pkg/errors" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/storage" compactorv1 "github.com/grafana/pyroscope/api/gen/proto/go/compactor/v1" - metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" - "github.com/grafana/pyroscope/pkg/iter" metastoreclient "github.com/grafana/pyroscope/pkg/metastore/client" - phlaremodel "github.com/grafana/pyroscope/pkg/model" "github.com/grafana/pyroscope/pkg/objstore" - phlareparquet "github.com/grafana/pyroscope/pkg/parquet" - "github.com/grafana/pyroscope/pkg/phlaredb" - "github.com/grafana/pyroscope/pkg/phlaredb/block" - schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" - "github.com/grafana/pyroscope/pkg/phlaredb/symdb" - "github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index" - "github.com/grafana/pyroscope/pkg/util/build" + "github.com/grafana/pyroscope/pkg/querybackend/block" ) type Worker struct { @@ -192,7 +171,7 @@ func (w *Worker) startJob(ctx context.Context, job *compactorv1.CompactionJob) * "job", job.Name, "blocks", len(job.Blocks)) - compactedBlockMetas, err := w.compactBlocks(ctx, job.Name, job.Blocks) + compactedBlockMetas, err := block.Compact(ctx, job.Blocks, w.storage) if err != nil { level.Error(w.logger).Log("msg", "failed to run block compaction", "err", err, "job", job.Name) jobStatus.Status = compactorv1.CompactionStatus_COMPACTION_STATUS_FAILURE @@ -210,587 +189,3 @@ func (w *Worker) startJob(ctx context.Context, job *compactorv1.CompactionJob) * return jobStatus } - -type serviceReader struct { - meta *metastorev1.BlockMeta - svcMeta *metastorev1.TenantService - indexReader *index.Reader - symbolsReader *symdb.Reader - profilesReader *parquet.File -} - -func newServiceReader(meta *metastorev1.BlockMeta, svcMeta *metastorev1.TenantService, indexReader *index.Reader, symbolsReader *symdb.Reader, profilesReader *parquet.File) *serviceReader { - return &serviceReader{ - meta: meta, - svcMeta: svcMeta, - indexReader: indexReader, - symbolsReader: symbolsReader, - profilesReader: profilesReader, - } -} - -type ProfileReader interface { - io.ReaderAt - Schema() *parquet.Schema - Root() *parquet.Column - RowGroups() []parquet.RowGroup -} - -func (w *Worker) compactBlocks(ctx context.Context, jobName string, blocks []*metastorev1.BlockMeta) ([]*metastorev1.BlockMeta, error) { - // download blocks - shard := blocks[0].Shard - compactionLevel := blocks[0].CompactionLevel + 1 - - // create queriers (block readers) from blocks - svcReaders := make(map[string][]*serviceReader) - for _, b := range blocks { - o := newObject(w.storage, b) - for _, svc := range b.TenantServices { - i, err := o.openTsdb(ctx, svc) - if err != nil { - return nil, errors.Wrap(err, "failed to open tsdb index") - } - sdb, err := o.openSymdb(ctx, svc) - if err != nil { - return nil, errors.Wrap(err, "failed to open symbol db") - } - profiles, err := o.openProfileTable(ctx, svc) - if err != nil { - return nil, errors.Wrap(err, "failed to open profile table") - } - svcReaders[svc.TenantId] = append(svcReaders[svc.TenantId], newServiceReader(b, svc, i, sdb, profiles)) - } - } - - writers := make(map[string]*blockWriter) - for tenant, _ := range svcReaders { - dest := filepath.Join("data-compactor", tenant, jobName) - writer, err := w.createBlockWriter(dest, tenant, shard, compactionLevel) - if err != nil { - return nil, errors.Wrap(err, "failed to create block writer") - } - writers[tenant] = writer - } - - // compact - metas := make([]*metastorev1.BlockMeta, 0, len(svcReaders)) - for tenant, readers := range svcReaders { - meta, err := writers[tenant].compact(ctx, readers) - if err != nil { - return nil, err - } - metas = append(metas, meta) - } - - // upload blocks - for _, meta := range metas { - if err := w.uploadBlock(ctx, jobName, meta); err != nil { - return nil, err - } - } - - return metas, nil -} - -func (w *Worker) uploadBlock(ctx context.Context, jobName string, meta *metastorev1.BlockMeta) error { - blockPath := filepath.Join("data-compactor", meta.TenantId, jobName, "block.bin") - file, err := os.Open(blockPath) - if err != nil { - return errors.Wrap(err, "failed to open compacted block") - } - defer file.Close() - - o := newObject(w.storage, meta) - return w.storage.Upload(ctx, o.path, file) -} - -type serviceReaderGroup struct { - svc string - readers []*serviceReader -} - -func (bw *blockWriter) compact(ctx context.Context, readers []*serviceReader) (*metastorev1.BlockMeta, error) { - // group by tenant service - readersByService := make(map[string][]*serviceReader) - for _, reader := range readers { - readersByService[reader.svcMeta.Name] = append(readersByService[reader.svcMeta.Name], reader) - } - - // sort by tenant service - serviceGroups := make([]*serviceReaderGroup, 0, len(readersByService)) - for svc, r := range readersByService { - serviceGroups = append(serviceGroups, &serviceReaderGroup{ - svc: svc, - readers: r, - }) - } - - // prepare output file - blockFile, err := os.Create(filepath.Join(bw.path, "block.bin")) - if err != nil { - return nil, errors.Wrap(err, "failed to open block file") - } - defer blockFile.Close() - offset := uint64(0) - - minTime := int64(math.MaxInt64) - maxTime := int64(math.MinInt64) - for _, group := range serviceGroups { - minTimeSvc := int64(math.MaxInt64) - maxTimeSvc := int64(math.MinInt64) - for _, reader := range group.readers { - it, err := newProfileRowIterator(reader) // TODO aleks: we probably want to sort profiles here - if err != nil { - return nil, err - } - for it.Next() { - p := it.At() - - err := bw.WriteRow(group.svc, p) - if err != nil { - return nil, err - } - } - - if reader.svcMeta.MinTime < minTimeSvc { - minTimeSvc = reader.svcMeta.MinTime - } - if reader.svcMeta.MaxTime > maxTimeSvc { - maxTimeSvc = reader.svcMeta.MaxTime - } - } - if minTimeSvc < minTime { - minTime = minTimeSvc - } - if maxTimeSvc > maxTime { - maxTime = maxTimeSvc - } - tenantServiceMeta, err := bw.Flush(ctx, group.svc) - if err != nil { - return nil, err - } - tenantServiceMeta.Name = group.svc - tenantServiceMeta.MinTime = minTimeSvc - tenantServiceMeta.MaxTime = maxTimeSvc - bw.meta.TenantServices = append(bw.meta.TenantServices, tenantServiceMeta) - - sWriter, _ := bw.getOrCreateService(group.svc) - err = createBlockFile(sWriter.path) - if err != nil { - return nil, err - } - - err = appendFileContent(blockFile, filepath.Join(sWriter.path, "block.bin")) - if err != nil { - return nil, err - } - for i := range tenantServiceMeta.TableOfContents { - tenantServiceMeta.TableOfContents[i] += offset - } - offset += tenantServiceMeta.Size - } - - meta := bw.meta - meta.MinTime = minTime - meta.MaxTime = maxTime - meta.Size = offset // it already holds the sum of sizes for all tenant services - - return meta, nil -} - -func createBlockFile(path string) error { - file, err := os.Create(filepath.Join(path, "block.bin")) - if err != nil { - return err - } - defer file.Close() - - for _, sourceFile := range []string{"profiles.parquet", "index.tsdb", "symbols.symdb"} { - err := appendFileContent(file, filepath.Join(path, sourceFile)) - if err != nil { - return err - } - } - - return nil -} - -func appendFileContent(dst *os.File, srcPath string) error { - src, err := os.Open(srcPath) - if err != nil { - return err - } - defer src.Close() - - _, err = io.Copy(dst, src) - return err -} - -type profileRowIterator struct { - profiles iter.Iterator[parquet.Row] - blockReader *serviceReader - closer io.Closer - index phlaredb.IndexReader - allPostings index.Postings - err error - - currentRow profileRow - currentSeriesIdx uint32 - chunks []index.ChunkMeta -} - -func newProfileRowIterator(s *serviceReader) (*profileRowIterator, error) { - k, v := index.AllPostingsKey() - allPostings, err := s.indexReader.Postings(k, nil, v) - if err != nil { - return nil, err - } - // todo close once https://github.com/grafana/pyroscope/issues/2172 is done. - reader := parquet.NewReader(s.profilesReader, schemav1.ProfilesSchema) - return &profileRowIterator{ - profiles: phlareparquet.NewBufferedRowReaderIterator(reader, 32), - blockReader: s, - closer: reader, - index: s.indexReader, - allPostings: allPostings, - currentSeriesIdx: math.MaxUint32, - chunks: make([]index.ChunkMeta, 1), - }, nil -} - -func (p *profileRowIterator) At() profileRow { - return p.currentRow -} - -func (p *profileRowIterator) Next() bool { - if !p.profiles.Next() { - return false - } - p.currentRow.serviceReader = p.blockReader - p.currentRow.row = schemav1.ProfileRow(p.profiles.At()) - seriesIndex := p.currentRow.row.SeriesIndex() - p.currentRow.timeNanos = p.currentRow.row.TimeNanos() - // do we have a new series? - if seriesIndex == p.currentSeriesIdx { - return true - } - p.currentSeriesIdx = seriesIndex - if !p.allPostings.Next() { - if err := p.allPostings.Err(); err != nil { - p.err = err - return false - } - p.err = errors.New("unexpected end of postings") - return false - } - - fp, err := p.index.Series(p.allPostings.At(), &p.currentRow.labels, &p.chunks) - if err != nil { - p.err = err - return false - } - p.currentRow.fp = model.Fingerprint(fp) - return true -} - -func (p *profileRowIterator) Err() error { - if p.err != nil { - return p.err - } - return p.profiles.Err() -} - -func (p *profileRowIterator) Close() error { - err := p.profiles.Close() - if p.closer != nil { - if err := p.closer.Close(); err != nil { - return err - } - } - return err -} - -type dedupeProfileRowIterator struct { - iter.Iterator[profileRow] - - prevFP model.Fingerprint - prevTimeNanos int64 -} - -func (it *dedupeProfileRowIterator) Next() bool { - for { - if !it.Iterator.Next() { - return false - } - currentProfile := it.Iterator.At() - if it.prevFP == currentProfile.fp && it.prevTimeNanos == currentProfile.timeNanos { - // skip duplicate profile - continue - } - it.prevFP = currentProfile.fp - it.prevTimeNanos = currentProfile.timeNanos - return true - } -} - -func (w *Worker) createBlockWriter(dest, tenant string, shard, compactionLevel uint32) (*blockWriter, error) { - meta := &metastorev1.BlockMeta{} - meta.Id = ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String() - meta.TenantId = tenant - meta.Shard = shard - meta.CompactionLevel = compactionLevel - return newBlockWriter(meta, dest) -} - -func newBlockWriter(meta *metastorev1.BlockMeta, dest string) (*blockWriter, error) { - blockPath := filepath.Join(dest) - - err := os.RemoveAll(blockPath) - if err != nil { - return nil, err - } - - if err := os.MkdirAll(blockPath, 0o777); err != nil { - return nil, err - } - - return &blockWriter{ - serviceWriters: make(map[string]*serviceWriter), - path: blockPath, - meta: meta, - totalProfiles: 0, - }, nil -} - -type serviceWriter struct { - indexRewriter *indexRewriter - symbolsRewriter SymbolsRewriter - profilesWriter *profilesWriter - path string -} - -type blockWriter struct { - serviceWriters map[string]*serviceWriter - - path string - meta *metastorev1.BlockMeta - totalProfiles uint64 -} - -func (bw *blockWriter) getOrCreateService(svc string) (*serviceWriter, error) { - sw, ok := bw.serviceWriters[svc] - if !ok { - path := filepath.Join(bw.path, strconv.Itoa(len(bw.serviceWriters))) - if err := os.MkdirAll(path, 0o777); err != nil { - return nil, err - } - profileWriter, err := newProfileWriter(path) - if err != nil { - return nil, err - } - symbolsCompactor := newSymbolsCompactor(path, symdb.FormatV3) - sw = &serviceWriter{ - indexRewriter: newIndexRewriter(path), - symbolsRewriter: symbolsCompactor.Rewriter(path), - profilesWriter: profileWriter, - path: path, - } - bw.serviceWriters[svc] = sw - } - return sw, nil -} - -func (bw *blockWriter) WriteRow(svc string, r profileRow) error { - sw, err := bw.getOrCreateService(svc) - if err != nil { - return err - } - err = sw.indexRewriter.ReWriteRow(r) - if err != nil { - return err - } - err = sw.symbolsRewriter.ReWriteRow(r) - if err != nil { - return err - } - - if err := sw.profilesWriter.WriteRow(r); err != nil { - return err - } - bw.totalProfiles++ - return nil -} - -func (bw *blockWriter) Flush(ctx context.Context, service string) (*metastorev1.TenantService, error) { - sw, err := bw.getOrCreateService(service) - if err != nil { - return nil, err - } - offsets, totalSize, err := sw.Close(ctx) - if err != nil { - return nil, err - } - tenantService := &metastorev1.TenantService{ - TenantId: bw.meta.TenantId, - Name: service, - TableOfContents: offsets, - Size: totalSize, - ProfileTypes: nil, // TODO - } - return tenantService, nil -} - -func (sw *serviceWriter) Close(ctx context.Context) (offsets []uint64, totalSize uint64, err error) { - if err := sw.profilesWriter.Close(); err != nil { - return nil, 0, err - } - profilesInfo, err := os.Stat(filepath.Join(sw.path, "profiles.parquet")) - if err != nil { - return nil, 0, err - } - if err := sw.indexRewriter.Close(ctx); err != nil { - return nil, 0, err - } - indexInfo, err := os.Stat(filepath.Join(sw.path, "index.tsdb")) - if err != nil { - return nil, 0, err - } - if err := sw.symbolsRewriter.Close(); err != nil { - return nil, 0, err - } - symbolsInfo, err := os.Stat(filepath.Join(sw.path, "symbols.symdb")) - if err != nil { - return nil, 0, err - } - offsets = []uint64{0, uint64(profilesInfo.Size()), uint64(profilesInfo.Size() + indexInfo.Size())} - totalSize = uint64(indexInfo.Size()) + uint64(symbolsInfo.Size()) + uint64(profilesInfo.Size()) - return offsets, totalSize, nil -} - -type profilesWriter struct { - *parquet.GenericWriter[*schemav1.Profile] - file *os.File - - buf []parquet.Row -} - -func newProfileWriter(path string) (*profilesWriter, error) { - profilePath := filepath.Join(path, (&schemav1.ProfilePersister{}).Name()+block.ParquetSuffix) - profileFile, err := os.OpenFile(profilePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644) - if err != nil { - return nil, err - } - return &profilesWriter{ - GenericWriter: newParquetProfileWriter(profileFile, parquet.MaxRowsPerRowGroup(100_000)), - file: profileFile, - buf: make([]parquet.Row, 1), - }, nil -} - -func newParquetProfileWriter(writer io.Writer, options ...parquet.WriterOption) *parquet.GenericWriter[*schemav1.Profile] { - options = append(options, parquet.PageBufferSize(32*1024)) - options = append(options, parquet.CreatedBy("github.com/grafana/pyroscope/", build.Version, build.Revision)) - options = append(options, schemav1.ProfilesSchema) - return parquet.NewGenericWriter[*schemav1.Profile]( - writer, options..., - ) -} - -func (p *profilesWriter) WriteRow(r profileRow) error { - p.buf[0] = parquet.Row(r.row) - _, err := p.GenericWriter.WriteRows(p.buf) - if err != nil { - return err - } - - return nil -} - -func (p *profilesWriter) Close() error { - err := p.GenericWriter.Close() - if err != nil { - return err - } - return p.file.Close() -} - -func newIndexRewriter(path string) *indexRewriter { - return &indexRewriter{ - symbols: make(map[string]struct{}), - path: path, - } -} - -type indexRewriter struct { - series []struct { - labels phlaremodel.Labels - fp model.Fingerprint - } - symbols map[string]struct{} - chunks []index.ChunkMeta // one chunk per series - - previousFp model.Fingerprint - - path string -} - -func (idxRw *indexRewriter) ReWriteRow(r profileRow) error { - if idxRw.previousFp != r.fp || len(idxRw.series) == 0 { - series := r.labels.Clone() - for _, l := range series { - idxRw.symbols[l.Name] = struct{}{} - idxRw.symbols[l.Value] = struct{}{} - } - idxRw.series = append(idxRw.series, struct { - labels phlaremodel.Labels - fp model.Fingerprint - }{ - labels: series, - fp: r.fp, - }) - idxRw.chunks = append(idxRw.chunks, index.ChunkMeta{ - MinTime: r.timeNanos, - MaxTime: r.timeNanos, - SeriesIndex: uint32(len(idxRw.series) - 1), - }) - idxRw.previousFp = r.fp - } - idxRw.chunks[len(idxRw.chunks)-1].MaxTime = r.timeNanos - r.row.SetSeriesIndex(idxRw.chunks[len(idxRw.chunks)-1].SeriesIndex) - return nil -} - -func (idxRw *indexRewriter) NumSeries() uint64 { - return uint64(len(idxRw.series)) -} - -// Close writes the index to given folder. -func (idxRw *indexRewriter) Close(ctx context.Context) error { - indexw, err := index.NewWriter(ctx, filepath.Join(idxRw.path, block.IndexFilename), 1<<18) - if err != nil { - return err - } - - // Sort symbols - symbols := make([]string, 0, len(idxRw.symbols)) - for s := range idxRw.symbols { - symbols = append(symbols, s) - } - sort.Strings(symbols) - - // Add symbols - for _, symbol := range symbols { - if err := indexw.AddSymbol(symbol); err != nil { - return err - } - } - - // Add Series - for i, series := range idxRw.series { - if err := indexw.AddSeries(storage.SeriesRef(i), series.labels, series.fp, idxRw.chunks[i]); err != nil { - return err - } - } - - return indexw.Close() -} diff --git a/pkg/compactionworker/compaction_worker_test.go b/pkg/compactionworker/compaction_worker_test.go deleted file mode 100644 index cfdf485ecf..0000000000 --- a/pkg/compactionworker/compaction_worker_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package compactionworker - -/* -func TestCompactBlocks(t *testing.T) { - worker, err := New(util.TestLogger(t), nil, nil) - require.NoError(t, err) - - ctx := context.Background() - worker.storage, _ = testutil.NewFilesystemBucket(t, ctx, "testdata") - - var blockMetas compactorv1.CompletedJob // same contract, can break in the future - blockMetasData, err := os.ReadFile("testdata/block-metas.json") - require.NoError(t, err) - err = protojson.Unmarshal(blockMetasData, &blockMetas) - require.NoError(t, err) - - compactedBlocks, err := worker.compactBlocks(ctx, "job-123", blockMetas.Blocks) - require.NoError(t, err) - - require.Len(t, compactedBlocks, 1) - - _ = worker.storage.Delete(ctx, "blocks") -} -*/ diff --git a/pkg/compactionworker/storage_object_reader.go b/pkg/compactionworker/storage_object_reader.go deleted file mode 100644 index c5c129f684..0000000000 --- a/pkg/compactionworker/storage_object_reader.go +++ /dev/null @@ -1,117 +0,0 @@ -package compactionworker - -import ( - "context" - "io" - "strconv" - - "github.com/parquet-go/parquet-go" - - metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" - "github.com/grafana/pyroscope/pkg/objstore" - "github.com/grafana/pyroscope/pkg/phlaredb/symdb" - "github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index" -) - -const ( - segmentDirPath = "segments/" - blockDirPath = "blocks/" - anonTenantDirName = "anon" -) - -const ( - // Table of contents sections. - // Version-specific. - - profileTableSectionIdx = iota - tsdbSectionIdx - symdbSectionIdx -) - -// Object represents a block or a segment in the object storage. -// TODO: Better naming? -// TODO: Prefetch small objects into memory. -type storageObject struct { - storage objstore.Bucket - path string - meta *metastorev1.BlockMeta -} - -func newObject(storage objstore.Bucket, meta *metastorev1.BlockMeta) storageObject { - return storageObject{ - storage: storage, - path: objectPath(meta), - meta: meta, - } -} - -func objectPath(md *metastorev1.BlockMeta) string { - topLevel := blockDirPath - tenantDirName := md.TenantId - if md.CompactionLevel == 0 { - topLevel = segmentDirPath - tenantDirName = anonTenantDirName - } - return topLevel + strconv.Itoa(int(md.Shard)) + "/" + tenantDirName + "/" + md.Id + "/block.bin" -} - -func (storageObject) sectionOffsetByIndex(svc *metastorev1.TenantService, idx int) int64 { - return int64(svc.TableOfContents[idx]) -} - -func (b storageObject) sectionSizeByIndex(svc *metastorev1.TenantService, idx int) int64 { - off := b.sectionOffsetByIndex(svc, idx) - var next uint64 - if idx == len(svc.TableOfContents)-1 { - next = svc.Size + svc.TableOfContents[0] - } else { - next = svc.TableOfContents[idx+1] - } - return int64(next) - off -} - -func (b storageObject) sectionReaderByIndex( - ctx context.Context, - svc *metastorev1.TenantService, - idx int, -) (io.ReadCloser, error) { - return b.storage.GetRange(ctx, b.path, - b.sectionOffsetByIndex(svc, idx), - b.sectionSizeByIndex(svc, idx)) -} - -func (b storageObject) openTsdb(ctx context.Context, svc *metastorev1.TenantService) (*index.Reader, error) { - r, err := b.sectionReaderByIndex(ctx, svc, tsdbSectionIdx) - if err != nil { - return nil, err - } - buf := make([]byte, b.sectionSizeByIndex(svc, tsdbSectionIdx)) - if _, err = io.ReadFull(r, buf); err != nil { - return nil, err - } - return index.NewReader(index.RealByteSlice(buf)) -} - -func (b storageObject) openSymdb(ctx context.Context, svc *metastorev1.TenantService) (*symdb.Reader, error) { - offset := b.sectionOffsetByIndex(svc, symdbSectionIdx) - size := b.sectionSizeByIndex(svc, symdbSectionIdx) - reader := objstore.NewBucketReaderWithOffset(b.storage, offset) - symbols, err := symdb.OpenObject(ctx, reader, b.path, size, - symdb.WithPrefetchSize(32<<10)) - if err != nil { - return nil, err - } - return symbols, nil -} - -func (b storageObject) openProfileTable(ctx context.Context, svc *metastorev1.TenantService) (*parquet.File, error) { - offset := b.sectionOffsetByIndex(svc, profileTableSectionIdx) - size := b.sectionSizeByIndex(svc, profileTableSectionIdx) - rat := &objstore.ReaderAt{ - Context: ctx, - GetRangeReader: b.storage, - Name: b.path, - Offset: offset, - } - return parquet.OpenFile(rat, size) // TODO: options, etc. -} diff --git a/pkg/compactionworker/symbol_compactor.go b/pkg/compactionworker/symbol_compactor.go deleted file mode 100644 index fdfe7bbb7e..0000000000 --- a/pkg/compactionworker/symbol_compactor.go +++ /dev/null @@ -1,136 +0,0 @@ -package compactionworker - -import ( - "os" - "path/filepath" - - "github.com/parquet-go/parquet-go" - "github.com/prometheus/common/model" - - phlaremodel "github.com/grafana/pyroscope/pkg/model" - schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" - "github.com/grafana/pyroscope/pkg/phlaredb/symdb" -) - -type SymbolsRewriter interface { - ReWriteRow(profile profileRow) error - Close() error -} - -type SymbolsRewriterFn func(blockPath string) SymbolsRewriter - -type profileRow struct { - timeNanos int64 - - labels phlaremodel.Labels - fp model.Fingerprint - row schemav1.ProfileRow - - serviceReader *serviceReader -} - -type symbolsCompactor struct { - version symdb.FormatVersion - rewriters map[*serviceReader]*symdb.Rewriter - w *symdb.SymDB - stacktraces []uint32 - - dst string - flushed bool -} - -func newSymbolsCompactor(path string, version symdb.FormatVersion) *symbolsCompactor { - return &symbolsCompactor{ - version: version, - w: symdb.NewSymDB(symdb.DefaultConfig(). - WithVersion(symdb.FormatV3). - WithDirectory(path)), - dst: path, - rewriters: make(map[*serviceReader]*symdb.Rewriter), - } -} - -func (s *symbolsCompactor) Rewriter(dst string) SymbolsRewriter { - return &symbolsRewriter{ - symbolsCompactor: s, - dst: dst, - } -} - -type symbolsRewriter struct { - *symbolsCompactor - - numSamples uint64 - dst string -} - -func (s *symbolsRewriter) NumSamples() uint64 { return s.numSamples } - -func (s *symbolsRewriter) ReWriteRow(profile profileRow) error { - total, err := s.symbolsCompactor.ReWriteRow(profile) - s.numSamples += total - return err -} - -func (s *symbolsRewriter) Close() error { - return s.symbolsCompactor.Flush() -} - -func (s *symbolsCompactor) ReWriteRow(profile profileRow) (uint64, error) { - var ( - err error - rewrittenSamples uint64 - ) - profile.row.ForStacktraceIDsValues(func(values []parquet.Value) { - s.loadStacktraceIDs(values) - r, ok := s.rewriters[profile.serviceReader] - if !ok { - r = symdb.NewRewriter(s.w, profile.serviceReader.symbolsReader) - s.rewriters[profile.serviceReader] = r - } - if err = r.Rewrite(profile.row.StacktracePartitionID(), s.stacktraces); err != nil { - return - } - rewrittenSamples += uint64(len(values)) - for i, v := range values { - // FIXME: the original order is not preserved, which will affect encoding. - values[i] = parquet.Int64Value(int64(s.stacktraces[i])).Level(v.RepetitionLevel(), v.DefinitionLevel(), v.Column()) - } - }) - if err != nil { - return rewrittenSamples, err - } - return rewrittenSamples, nil -} - -func (s *symbolsCompactor) Flush() error { - if s.flushed { - return nil - } - if err := s.w.Flush(); err != nil { - return err - } - s.flushed = true - return nil -} - -func (s *symbolsCompactor) Close() error { - if s.version == symdb.FormatV3 { - return os.RemoveAll(filepath.Join(s.dst, symdb.DefaultFileName)) - } - return os.RemoveAll(s.dst) -} - -func (s *symbolsCompactor) loadStacktraceIDs(values []parquet.Value) { - s.stacktraces = grow(s.stacktraces, len(values)) - for i := range values { - s.stacktraces[i] = values[i].Uint32() - } -} - -func grow[T any](s []T, n int) []T { - if cap(s) < n { - return make([]T, n, 2*n) - } - return s[:n] -} diff --git a/pkg/metastore/metastore_compaction_queue_test.go b/pkg/metastore/metastore_compaction_queue_test.go index 6407c9ed8f..2f21ae81ee 100644 --- a/pkg/metastore/metastore_compaction_queue_test.go +++ b/pkg/metastore/metastore_compaction_queue_test.go @@ -16,17 +16,17 @@ func Test_compactionJobQueue(t *testing.T) { assert.True(t, q.enqueue(&compactionpb.CompactionJob{ Name: "job1", - CommitIndex: 1, + RaftLogIndex: 1, CompactionLevel: 0, })) assert.True(t, q.enqueue(&compactionpb.CompactionJob{ Name: "job2", - CommitIndex: 2, + RaftLogIndex: 2, CompactionLevel: 1, })) assert.True(t, q.enqueue(&compactionpb.CompactionJob{ Name: "job3", - CommitIndex: 3, + RaftLogIndex: 3, CompactionLevel: 0, })) @@ -67,5 +67,5 @@ func Test_compactionJobQueue(t *testing.T) { func assertJob(t *testing.T, j *compactionpb.CompactionJob, name string, commitIndex uint64) { require.NotNil(t, j) assert.Equal(t, name, j.Name) - assert.Equal(t, commitIndex, j.CommitIndex) + assert.Equal(t, commitIndex, j.RaftLogIndex) } diff --git a/pkg/metastore/metastore_state_add_block.go b/pkg/metastore/metastore_state_add_block.go index 8e3d659c59..2cc65c6a93 100644 --- a/pkg/metastore/metastore_state_add_block.go +++ b/pkg/metastore/metastore_state_add_block.go @@ -14,6 +14,7 @@ import ( ) func (m *Metastore) AddBlock(_ context.Context, req *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) { + _ = level.Info(m.logger).Log("msg", "adding block", "block_id", req.Block.Id, "shard", req.Block.Shard) t1 := time.Now() defer func() { m.metrics.raftAddBlockDuration.Observe(time.Since(t1).Seconds()) @@ -23,8 +24,6 @@ func (m *Metastore) AddBlock(_ context.Context, req *metastorev1.AddBlockRequest } func (m *metastoreState) applyAddBlock(_ *raft.Log, request *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) { - _ = level.Info(m.logger).Log("msg", "adding block", "block_id", request.Block.Id) - name, key := keyForBlockMeta(request.Block.Shard, "", request.Block.Id) value, err := request.Block.MarshalVT() if err != nil { diff --git a/pkg/objstore/reader.go b/pkg/objstore/reader.go index 493c0ac84d..880c53ed1b 100644 --- a/pkg/objstore/reader.go +++ b/pkg/objstore/reader.go @@ -1,7 +1,6 @@ package objstore import ( - "bytes" "context" "fmt" "io" @@ -95,7 +94,7 @@ func (b *ReaderAt) Close() error { return nil } -func FetchRange(ctx context.Context, dst *bytes.Buffer, name string, storage objstore.BucketReader, off, size int64) error { +func ReadRange(ctx context.Context, reader io.ReaderFrom, name string, storage objstore.BucketReader, off, size int64) error { if size == 0 { attrs, err := storage.Attributes(ctx, name) if err != nil { @@ -113,9 +112,7 @@ func FetchRange(ctx context.Context, dst *bytes.Buffer, name string, storage obj defer func() { _ = rc.Close() }() - dst.Reset() - dst.Grow(int(size) + bytes.MinRead) - n, err := dst.ReadFrom(rc) + n, err := reader.ReadFrom(rc) if err != nil { return err } diff --git a/pkg/phlare/phlare.go b/pkg/phlare/phlare.go index 34f79e29c5..01a6e87ee3 100644 --- a/pkg/phlare/phlare.go +++ b/pkg/phlare/phlare.go @@ -334,7 +334,7 @@ func (f *Phlare) setupModuleManager() error { // Add dependencies deps := map[string][]string{ - All: {Ingester, Distributor, QueryScheduler, QueryFrontend, Querier, StoreGateway, Admin, TenantSettings, Compactor, AdHocProfiles, Metastore, MetastoreClient, QueryBackend, QueryBackendClient}, + All: {Ingester, Distributor, QueryScheduler, QueryFrontend, Querier, StoreGateway, Admin, TenantSettings, Compactor, AdHocProfiles, Metastore, MetastoreClient, QueryBackend, QueryBackendClient, CompactionWorker}, Server: {GRPCGateway}, API: {Server}, diff --git a/pkg/phlaredb/symdb/block_reader.go b/pkg/phlaredb/symdb/block_reader.go index d38e04e6c7..861a063f82 100644 --- a/pkg/phlaredb/symdb/block_reader.go +++ b/pkg/phlaredb/symdb/block_reader.go @@ -3,7 +3,6 @@ package symdb import ( "bufio" - "bytes" "context" "fmt" "hash/crc32" @@ -15,13 +14,13 @@ import ( "github.com/grafana/dskit/multierror" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" - "github.com/valyala/bytebufferpool" "golang.org/x/sync/errgroup" "github.com/grafana/pyroscope/pkg/iter" "github.com/grafana/pyroscope/pkg/objstore" "github.com/grafana/pyroscope/pkg/phlaredb/block" schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" + "github.com/grafana/pyroscope/pkg/util/bufferpool" "github.com/grafana/pyroscope/pkg/util/refctr" ) @@ -40,7 +39,6 @@ type Reader struct { parquetFiles *parquetFiles prefetchSize uint64 - buf *bytebufferpool.ByteBuffer } type Option func(*Reader) @@ -51,13 +49,13 @@ func WithPrefetchSize(size uint64) Option { } } -func OpenObject(ctx context.Context, b objstore.BucketReader, name string, size int64, options ...Option) (*Reader, error) { +func OpenObject(ctx context.Context, b objstore.BucketReader, name string, offset, size int64, options ...Option) (*Reader, error) { f := block.File{ RelPath: name, SizeBytes: uint64(size), } r := &Reader{ - bucket: b, + bucket: objstore.NewBucketReaderWithOffset(b, offset), file: f, } for _, opt := range options { @@ -81,46 +79,30 @@ func OpenObject(ctx context.Context, b objstore.BucketReader, name string, size return r, nil } -var footerPool bytebufferpool.Pool - -func (r *Reader) freeBuf() { - if r.buf != nil { - footerPool.Put(r.buf) - r.buf = nil - } -} - func (r *Reader) openIndexWithPrefetch(ctx context.Context) (err error) { - r.buf = footerPool.Get() - buf := bytes.NewBuffer(r.buf.B) - defer func() { - r.buf.B = buf.Bytes() - if err != nil { - r.freeBuf() - } - }() prefetchSize := r.prefetchSize if prefetchSize > r.file.SizeBytes { prefetchSize = r.file.SizeBytes } - n, err := r.prefetchIndex(ctx, buf, prefetchSize) + n, err := r.prefetchIndex(ctx, prefetchSize) if err == nil && n != 0 { - _, err = r.prefetchIndex(ctx, buf, prefetchSize) + _, err = r.prefetchIndex(ctx, prefetchSize) } return err } -func (r *Reader) prefetchIndex(ctx context.Context, buf *bytes.Buffer, size uint64) (n uint64, err error) { +func (r *Reader) prefetchIndex(ctx context.Context, size uint64) (n uint64, err error) { if size < uint64(FooterSize) { size = uint64(FooterSize) } prefetchOffset := r.file.SizeBytes - size - if err = objstore.FetchRange(ctx, buf, r.file.RelPath, r.bucket, int64(prefetchOffset), int64(size)); err != nil { + buf := bufferpool.GetBuffer(int(size)) + defer bufferpool.Put(buf) + if err = objstore.ReadRange(ctx, buf, r.file.RelPath, r.bucket, int64(prefetchOffset), int64(size)); err != nil { return 0, fmt.Errorf("fetching index: %w", err) } - b := buf.Bytes() footerOffset := size - uint64(FooterSize) - if err = r.footer.UnmarshalBinary(b[footerOffset:]); err != nil { + if err = r.footer.UnmarshalBinary(buf.B[footerOffset:]); err != nil { return 0, fmt.Errorf("unmarshaling footer: %w", err) } if prefetchOffset > (r.footer.IndexOffset) { @@ -128,7 +110,7 @@ func (r *Reader) prefetchIndex(ctx context.Context, buf *bytes.Buffer, size uint } // prefetch offset is less that or equal to the index offset. indexOffset := r.footer.IndexOffset - prefetchOffset - if r.index, err = OpenIndex(b[indexOffset:footerOffset]); err != nil { + if r.index, err = OpenIndex(buf.B[indexOffset:footerOffset]); err != nil { return 0, fmt.Errorf("opening index: %w", err) } return 0, nil @@ -301,7 +283,6 @@ func (r *Reader) Close() error { if r.parquetFiles != nil { return r.parquetFiles.Close() } - r.freeBuf() return nil } diff --git a/pkg/querybackend/block/merge.go b/pkg/querybackend/block/compaction.go similarity index 53% rename from pkg/querybackend/block/merge.go rename to pkg/querybackend/block/compaction.go index 8083ed7d22..7bc92dcfc5 100644 --- a/pkg/querybackend/block/merge.go +++ b/pkg/querybackend/block/compaction.go @@ -2,129 +2,185 @@ package block import ( "context" + "crypto/rand" "fmt" "os" "path/filepath" "slices" "sort" "strings" + "sync" + "time" - "github.com/go-kit/log" - "github.com/grafana/dskit/runutil" + "github.com/grafana/dskit/multierror" + "github.com/oklog/ulid" "github.com/parquet-go/parquet-go" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage" + "golang.org/x/sync/errgroup" metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" - "github.com/grafana/pyroscope/pkg/iter" phlaremodel "github.com/grafana/pyroscope/pkg/model" + "github.com/grafana/pyroscope/pkg/objstore" "github.com/grafana/pyroscope/pkg/phlaredb/block" schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" "github.com/grafana/pyroscope/pkg/phlaredb/symdb" "github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index" + "github.com/grafana/pyroscope/pkg/util" ) var ( - ErrNoBlocksToMerge = fmt.Errorf("no blocks to merge") - ErrNoTenantServicesToMerge = fmt.Errorf("no tenant services to merge") - ErrShardMergeMismatch = fmt.Errorf("only blocks from the same shard can be merged") - ErrLevelMergeMismatch = fmt.Errorf("only blocks os the same compaction level can be merged") + ErrNoBlocksToMerge = fmt.Errorf("no blocks to merge") + ErrShardMergeMismatch = fmt.Errorf("only blocks from the same shard can be merged") ) -func EstimateMergeMemoryFootprint(objects []*Object) (int64, error) { - groups, err := tenantServiceMergeIterator(objects) +func Compact( + ctx context.Context, + blocks []*metastorev1.BlockMeta, + storage objstore.Bucket, +) (m []*metastorev1.BlockMeta, err error) { + objects := ObjectsFromMetas(storage, blocks) + plan, err := PlanCompaction(objects) if err != nil { - return 0, err + return nil, err + } + + if err = objects.Open(ctx); err != nil { + return nil, err + } + defer func() { + err = objects.Close() + }() + + compacted := make([]*metastorev1.BlockMeta, len(plan)) + for i, p := range plan { + compacted[i], err = p.Compact(ctx, storage) + if err != nil { + return compacted, err + } } - var m int64 - for groups.Next() { - g := groups.At() - g.estimate() - m = max(m, g.estimates.inMemorySizeTotal()) + + return compacted, nil +} + +// ObjectsFromMetas binds block metas to corresponding objects in the storage. +func ObjectsFromMetas(storage objstore.Bucket, blocks []*metastorev1.BlockMeta) Objects { + objects := make([]*Object, len(blocks)) + for i, m := range blocks { + objects[i] = NewObject(storage, m) } - return m, nil + return objects } -func tenantServiceMergeIterator(objects []*Object) (iter.Iterator[*tenantServiceMerge], error) { +func PlanCompaction(objects Objects) ([]*CompactionPlan, error) { if len(objects) == 0 { // Even if there's just a single object, we still need to rewrite it. return nil, ErrNoBlocksToMerge } + r := objects[0] + var c uint32 for _, obj := range objects { if r.meta.Shard != obj.meta.Shard { return nil, ErrShardMergeMismatch } - // This is not strictly necessary, but it's a good sanity check. - if r.meta.CompactionLevel != obj.meta.CompactionLevel { - return nil, ErrLevelMergeMismatch - } + c = max(c, obj.meta.CompactionLevel) } + c++ - services := make(map[tenantServiceKey]*tenantServiceMerge) + m := make(map[string]*CompactionPlan) for _, obj := range objects { for _, s := range obj.meta.TenantServices { - k := tenantServiceKey{tenantID: s.TenantId, name: s.Name} - m, ok := services[k] + tm, ok := m[s.TenantId] if !ok { - m = newTenantServiceMerge(k) + tm = newBlockCompaction(s.TenantId, r.meta.Shard, c) + m[s.TenantId] = tm } - m.append(NewTenantService(s, obj)) + sm := tm.addTenantService(s) + // Bind objects to services. + sm.append(NewTenantService(s, obj)) } } - if len(services) == 0 { - return nil, ErrNoTenantServicesToMerge - } - for _, group := range services { - slices.SortFunc(group.services, func(a, b *TenantService) int { - return strings.Compare(a.obj.path, b.obj.path) + ordered := make([]*CompactionPlan, 0, len(m)) + for _, tm := range m { + ordered = append(ordered, tm) + slices.SortFunc(tm.services, func(a, b *tenantServiceCompaction) int { + return strings.Compare(a.meta.Name, b.meta.Name) }) } - - groups := make([]*tenantServiceMerge, 0, len(services)) - for _, g := range services { - groups = append(groups, g) - } - slices.SortFunc(groups, func(a, b *tenantServiceMerge) int { - return a.tenantServiceKey.compare(b.tenantServiceKey) + slices.SortFunc(ordered, func(a, b *CompactionPlan) int { + return strings.Compare(a.tenantID, b.tenantID) }) - return iter.NewSliceIterator(groups), nil + return ordered, nil } -type tenantServiceKey struct { - tenantID string - name string +type CompactionPlan struct { + tenantID string + serviceMap map[string]*tenantServiceCompaction + services []*tenantServiceCompaction + meta *metastorev1.BlockMeta } -func (k tenantServiceKey) compare(x tenantServiceKey) int { - if k.tenantID != x.tenantID { - return strings.Compare(k.tenantID, x.tenantID) +func newBlockCompaction(tenantID string, shard uint32, compactionLevel uint32) *CompactionPlan { + return &CompactionPlan{ + tenantID: tenantID, + serviceMap: make(map[string]*tenantServiceCompaction), + meta: &metastorev1.BlockMeta{ + FormatVersion: 1, + Id: ulid.MustNew(uint64(time.Now().UnixMilli()), rand.Reader).String(), + TenantId: tenantID, + Shard: shard, + CompactionLevel: compactionLevel, + TenantServices: nil, + MinTime: 0, + MaxTime: 0, + Size: 0, + }, } - return strings.Compare(k.name, x.name) } -type tenantServiceMerge struct { - tenantServiceKey - log log.Logger - path string - - meta *metastorev1.TenantService - services []*TenantService - ptypes map[string]struct{} +func (b *CompactionPlan) Estimate() { + // TODO(kolesnikovae): Implement. +} - indexRewriter *indexRewriter - symbolsRewriter *symbolsRewriter - profilesWriter *profilesWriter +func (b *CompactionPlan) Compact(ctx context.Context, storage objstore.Bucket) (m *metastorev1.BlockMeta, err error) { + dir := filepath.Join(os.TempDir(), "pyroscope-compactor", b.meta.Id) + w := NewBlockWriter(ctx, storage, ObjectPath(b.meta), dir) + defer func() { + err = multierror.New(err, w.Close()).Err() + }() + // Services are compacted in strict order. + for _, s := range b.services { + s.estimate() + // TODO(kolesnikovae): Wait until the required resources are available? + if err = s.compact(ctx, w); err != nil { + return nil, fmt.Errorf("compacting block: %w", err) + } + b.meta.TenantServices = append(b.meta.TenantServices, s.meta) + } + b.meta.Size = w.Offset() + return b.meta, nil +} - estimates mergeEstimates - samples uint64 - series uint64 - profiles uint64 +func (b *CompactionPlan) addTenantService(s *metastorev1.TenantService) *tenantServiceCompaction { + sm, ok := b.serviceMap[s.Name] + if !ok { + sm = newTenantServiceCompaction(s.TenantId, s.Name) + b.serviceMap[s.Name] = sm + b.services = append(b.services, sm) + } + if b.meta.MinTime == 0 || s.MinTime < b.meta.MinTime { + b.meta.MinTime = s.MinTime + } + if s.MaxTime > b.meta.MaxTime { + b.meta.MaxTime = s.MaxTime + } + return sm } -type mergeEstimates struct { +type compactionEstimates struct { inMemorySizeInputSymbols int64 inMemorySizeInputIndex int64 inMemorySizeInputProfiles int64 @@ -138,7 +194,7 @@ type mergeEstimates struct { outputSizeProfiles int64 } -func (m *mergeEstimates) inMemorySizeTotal() int64 { +func (m *compactionEstimates) inMemorySizeTotal() int64 { return m.inMemorySizeInputSymbols + m.inMemorySizeInputIndex + m.inMemorySizeInputProfiles + @@ -147,17 +203,43 @@ func (m *mergeEstimates) inMemorySizeTotal() int64 { m.inMemorySizeOutputProfiles } -func newTenantServiceMerge(k tenantServiceKey) *tenantServiceMerge { - return &tenantServiceMerge{ - tenantServiceKey: k, +type tenantServiceCompaction struct { + meta *metastorev1.TenantService + ptypes map[string]struct{} + path string // Set at open. + + services []*TenantService + + indexRewriter *indexRewriter + symbolsRewriter *symbolsRewriter + profilesWriter *profilesWriter + + estimates compactionEstimates + samples uint64 + series uint64 + profiles uint64 + + flushOnce sync.Once +} + +func newTenantServiceCompaction(tenantID, name string) *tenantServiceCompaction { + return &tenantServiceCompaction{ + ptypes: make(map[string]struct{}, 10), meta: &metastorev1.TenantService{ - TenantId: k.tenantID, - Name: k.name, + TenantId: tenantID, + Name: name, + // Updated at append. + MinTime: 0, + MaxTime: 0, + // Updated at writeTo. + TableOfContents: nil, + Size: 0, + ProfileTypes: nil, }, } } -func (m *tenantServiceMerge) append(s *TenantService) { +func (m *tenantServiceCompaction) append(s *TenantService) { m.services = append(m.services, s) if m.meta.MinTime == 0 || s.meta.MinTime < m.meta.MinTime { m.meta.MinTime = s.meta.MinTime @@ -170,20 +252,26 @@ func (m *tenantServiceMerge) append(s *TenantService) { } } -func (m *tenantServiceMerge) build() *metastorev1.TenantService { - m.meta.ProfileTypes = make([]string, 0, len(m.ptypes)) - for pt := range m.ptypes { - m.meta.ProfileTypes = append(m.meta.ProfileTypes, pt) +func (m *tenantServiceCompaction) compact(ctx context.Context, w *Writer) (err error) { + if err = m.open(ctx, w.Dir()); err != nil { + return fmt.Errorf("failed to open sections for compaction: %w", err) } - sort.Strings(m.meta.ProfileTypes) - // TODO: Collect files, fill TOC, Size - return m.meta + defer func() { + err = multierror.New(err, m.cleanup()).Err() + }() + if err = m.mergeAndClose(ctx); err != nil { + return fmt.Errorf("failed to merge profiles: %w", err) + } + if err = m.writeTo(w); err != nil { + return fmt.Errorf("failed to write sections: %w", err) + } + return nil } // TODO(kolesnikovae): // - Add statistics to the block meta. // - Measure. Ideally, we should track statistics. -func (m *tenantServiceMerge) estimate() { +func (m *tenantServiceCompaction) estimate() { columns := len(schemav1.ProfilesSchema.Columns()) // Services are to be opened concurrently. for _, s := range m.services { @@ -230,30 +318,65 @@ func (m *tenantServiceMerge) estimate() { m.estimates.inMemorySizeOutputProfiles += columnBuffers + pageBuffers } -func (m *tenantServiceMerge) open() (rows iter.Iterator[ProfileEntry], err error) { +func (m *tenantServiceCompaction) open(ctx context.Context, path string) (err error) { + m.path = path + defer func() { + if err != nil { + err = multierror.New(err, m.cleanup()).Err() + } + }() + if err = os.MkdirAll(m.path, 0o777); err != nil { - return nil, err + return err } + m.profilesWriter, err = newProfileWriter(m.path, m.estimates.outputSizeProfiles) if err != nil { - return nil, err + return err } + m.indexRewriter = newIndexRewriter(m.path) m.symbolsRewriter = newSymbolsRewriter(m.path) - return NewMergeRowProfileIterator(m.services) + + g, ctx := errgroup.WithContext(ctx) + for _, s := range m.services { + s := s + g.Go(util.RecoverPanic(func() error { + if err = s.Open(ctx, allSections...); err != nil { + return fmt.Errorf("opening tenant service (block %s): %w", s.obj.path, err) + } + return nil + })) + } + if err = g.Wait(); err != nil { + merr := multierror.New(err) + for _, s := range m.services { + merr.Add(s.Close()) + } + return merr.Err() + } + + return nil } -func (m *tenantServiceMerge) merge(ctx context.Context, path string) (err error) { - m.path = path - m.estimate() - rows, err := m.open() +func (m *tenantServiceCompaction) mergeAndClose(ctx context.Context) (err error) { + defer func() { + err = multierror.New(err, m.close()).Err() + }() + return m.merge(ctx) +} + +func (m *tenantServiceCompaction) merge(ctx context.Context) (err error) { + rows, err := NewMergeRowProfileIterator(m.services) if err != nil { return err } - defer runutil.CloseWithLogOnErr(m.log, rows, "closing rows iterator") + defer func() { + err = multierror.New(err, rows.Close()).Err() + }() var i int for rows.Next() { - if i++; i%10000 == 0 { + if i++; i%1000 == 0 { if err = ctx.Err(); err != nil { return err } @@ -262,13 +385,10 @@ func (m *tenantServiceMerge) merge(ctx context.Context, path string) (err error) return err } } - if err = rows.Err(); err != nil { - return err - } - return m.Close() + return rows.Err() } -func (m *tenantServiceMerge) writeRow(r ProfileEntry) (err error) { +func (m *tenantServiceCompaction) writeRow(r ProfileEntry) (err error) { if err = m.indexRewriter.rewriteRow(r); err != nil { return err } @@ -278,22 +398,51 @@ func (m *tenantServiceMerge) writeRow(r ProfileEntry) (err error) { return m.profilesWriter.writeRow(r) } -func (m *tenantServiceMerge) Close() (err error) { - if err = m.symbolsRewriter.Flush(); err != nil { - return err - } - m.samples = m.symbolsRewriter.samples - if err = m.indexRewriter.Flush(); err != nil { +func (m *tenantServiceCompaction) close() (err error) { + m.flushOnce.Do(func() { + merr := multierror.New() + merr.Add(m.symbolsRewriter.Flush()) + merr.Add(m.indexRewriter.Flush()) + merr.Add(m.profilesWriter.Close()) + m.samples = m.symbolsRewriter.samples + m.series = m.indexRewriter.NumSeries() + m.profiles = m.profilesWriter.profiles + m.symbolsRewriter = nil + m.indexRewriter = nil + m.profilesWriter = nil + // Note that m.services are closed by merge + // iterator as they reach the end of the profile + // table. We do it here again just in case. + // TODO(kolesnikovae): Double check error handling. + m.services = nil + err = merr.Err() + }) + return err +} + +func (m *tenantServiceCompaction) writeTo(w *Writer) (err error) { + off := w.Offset() + m.meta.TableOfContents, err = w.ReadFromFiles( + FileNameProfilesParquet, + block.IndexFilename, + symdb.DefaultFileName, + ) + if err != nil { return err } - m.series = m.indexRewriter.NumSeries() - if err = m.profilesWriter.Close(); err != nil { - return err + m.meta.Size = w.Offset() - off + m.meta.ProfileTypes = make([]string, 0, len(m.ptypes)) + for pt := range m.ptypes { + m.meta.ProfileTypes = append(m.meta.ProfileTypes, pt) } - m.profiles = m.profilesWriter.profiles + sort.Strings(m.meta.ProfileTypes) return nil } +func (m *tenantServiceCompaction) cleanup() error { + return os.RemoveAll(m.path) +} + func newIndexRewriter(path string) *indexRewriter { return &indexRewriter{ symbols: make(map[string]struct{}), @@ -343,7 +492,9 @@ func (rw *indexRewriter) NumSeries() uint64 { return uint64(len(rw.series)) } func (rw *indexRewriter) Flush() error { w, err := index.NewWriter(context.Background(), filepath.Join(rw.path, block.IndexFilename), - index.SegmentsIndexWriterBufSize) + // There is no particular reason to use a buffer (bufio.Writer) + // larger than the default one when writing on disk + 4<<10) if err != nil { return err } @@ -357,7 +508,7 @@ func (rw *indexRewriter) Flush() error { // Add symbols for _, symbol := range symbols { - if err := w.AddSymbol(symbol); err != nil { + if err = w.AddSymbol(symbol); err != nil { return err } } @@ -398,7 +549,6 @@ func (s *symbolsRewriter) rewriteRow(e ProfileEntry) (err error) { } s.samples += uint64(len(values)) for i, v := range values { - // FIXME: the original order is not preserved, which will affect encoding. values[i] = parquet.Int64Value(int64(s.stacktraces[i])).Level(v.RepetitionLevel(), v.DefinitionLevel(), v.Column()) } }) @@ -415,7 +565,7 @@ func (s *symbolsRewriter) rewriterFor(x *TenantService) *symdb.Rewriter { } func (s *symbolsRewriter) loadStacktraceIDs(values []parquet.Value) { - s.stacktraces = slices.Grow(s.stacktraces[0:], len(values)) + s.stacktraces = slices.Grow(s.stacktraces[0:], len(values))[:len(values)] for i := range values { s.stacktraces[i] = values[i].Uint32() } diff --git a/pkg/querybackend/block/compaction_test.go b/pkg/querybackend/block/compaction_test.go new file mode 100644 index 0000000000..2304c937ec --- /dev/null +++ b/pkg/querybackend/block/compaction_test.go @@ -0,0 +1,30 @@ +package block + +import ( + "context" + "os" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/protojson" + + compactorv1 "github.com/grafana/pyroscope/api/gen/proto/go/compactor/v1" + "github.com/grafana/pyroscope/pkg/objstore/testutil" +) + +func Test_CompactBlocks(t *testing.T) { + ctx := context.Background() + bucket, _ := testutil.NewFilesystemBucket(t, ctx, "testdata") + + var blockMetas compactorv1.CompletedJob // same contract, can break in the future + blockMetasData, err := os.ReadFile("testdata/block-metas.json") + require.NoError(t, err) + err = protojson.Unmarshal(blockMetasData, &blockMetas) + require.NoError(t, err) + + compactedBlocks, err := Compact(ctx, blockMetas.Blocks, bucket) + require.NoError(t, err) + + // TODO: Assertions. + require.Len(t, compactedBlocks, 1) +} diff --git a/pkg/querybackend/block/constants.go b/pkg/querybackend/block/constants.go new file mode 100644 index 0000000000..9d5b064f8a --- /dev/null +++ b/pkg/querybackend/block/constants.go @@ -0,0 +1,78 @@ +package block + +const ( + DirPathSegment = "segments/" + DirPathBlock = "blocks/" + DirNameAnonTenant = "anon" + + FileNameProfilesParquet = "profiles.parquet" + FileNameDataObject = "block.bin" +) + +const ( + defaultObjectSizeLoadInMemory = 1 << 20 + defaultTenantServiceSizeLoadInMemory = 1 << 20 + + maxRowsPerRowGroup = 10 << 10 + symbolsPrefetchSize = 32 << 10 + compactionCopyBufferSize = 32 << 10 +) + +func estimateReadBufferSize(s int64) int { + const minSize = 64 << 10 + const maxSize = 1 << 20 + // Parquet has global buffer map, where buffer size is key, + // so we want a low cardinality here. + e := nextPowerOfTwo(uint32(s / 10)) + if e < minSize { + return minSize + } + return int(min(e, maxSize)) +} + +// This is a verbatim copy of estimateReadBufferSize. +// It's kept for the sake of clarity and to avoid confusion. +func estimatePageBufferSize(s int64) int { + const minSize = 64 << 10 + const maxSize = 1 << 20 + e := nextPowerOfTwo(uint32(s / 10)) + if e < minSize { + return minSize + } + return int(min(e, maxSize)) +} + +func estimateFooterSize(size int64) int64 { + var s int64 + // as long as we don't keep the exact footer sizes in the meta estimate it + if size > 0 { + s = size / 10000 + } + // set a minimum footer size of 32KiB + if s < 32<<10 { + s = 32 << 10 + } + // set a maximum footer size of 512KiB + if s > 512<<10 { + s = 512 << 10 + } + // now check clamp it to the actual size of the whole object + if s > size { + s = size + } + return s +} + +func nextPowerOfTwo(n uint32) uint32 { + if n == 0 { + return 1 + } + n-- + n |= n >> 1 + n |= n >> 2 + n |= n >> 4 + n |= n >> 8 + n |= n >> 16 + n++ + return n +} diff --git a/pkg/querybackend/block/object.go b/pkg/querybackend/block/object.go index 2e699f4672..474be569b0 100644 --- a/pkg/querybackend/block/object.go +++ b/pkg/querybackend/block/object.go @@ -1,13 +1,17 @@ package block import ( - "bytes" "context" "fmt" "strconv" + "strings" + + "github.com/grafana/dskit/multierror" + "golang.org/x/sync/errgroup" metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" "github.com/grafana/pyroscope/pkg/objstore" + "github.com/grafana/pyroscope/pkg/util/bufferpool" "github.com/grafana/pyroscope/pkg/util/refctr" ) @@ -19,12 +23,6 @@ import ( // - Local cache? Useful for all-in-one deployments. // - Distributed cache. -const ( - segmentDirPath = "segments/" - blockDirPath = "blocks/" - anonTenantDirName = "anon" -) - type Section uint32 const ( @@ -35,6 +33,12 @@ const ( SectionSymbols ) +var allSections = []Section{ + SectionProfiles, + SectionTSDB, + SectionSymbols, +} + var ( // Version-specific. sectionNames = [...][]string{1: {"invalid", "profiles", "tsdb", "symbols"}} @@ -54,8 +58,6 @@ func (sc Section) open(ctx context.Context, s *TenantService) (err error) { } } -const loadInMemorySizeThreshold = 1 << 20 - // Object represents a block or a segment in the object storage. type Object struct { storage objstore.Bucket @@ -63,94 +65,141 @@ type Object struct { meta *metastorev1.BlockMeta refs refctr.Counter - buf *bytes.Buffer + buf *bufferpool.Buffer err error + + memSize int } -type Option func(*Object) +type ObjectOption func(*Object) -func WithPath(path string) Option { +func WithPath(path string) ObjectOption { return func(obj *Object) { obj.path = path } } -func NewObject(storage objstore.Bucket, meta *metastorev1.BlockMeta, opts ...Option) *Object { +func WithObjectMaxSizeLoadInMemory(size int) ObjectOption { + return func(obj *Object) { + obj.memSize = size + } +} + +func NewObject(storage objstore.Bucket, meta *metastorev1.BlockMeta, opts ...ObjectOption) *Object { o := &Object{ storage: storage, meta: meta, + path: ObjectPath(meta), + memSize: defaultObjectSizeLoadInMemory, } for _, opt := range opts { opt(o) } - if o.path == "" { - o.path = ObjectPath(meta) - } return o } func ObjectPath(md *metastorev1.BlockMeta) string { - topLevel := blockDirPath + topLevel := DirPathBlock tenantDirName := md.TenantId if md.CompactionLevel == 0 { - topLevel = segmentDirPath - tenantDirName = anonTenantDirName + topLevel = DirPathSegment + tenantDirName = DirNameAnonTenant } - return topLevel + strconv.Itoa(int(md.Shard)) + "/" + tenantDirName + "/" + md.Id + "/block.bin" + var b strings.Builder + b.WriteString(topLevel) + b.WriteString(strconv.Itoa(int(md.Shard))) + b.WriteByte('/') + b.WriteString(tenantDirName) + b.WriteByte('/') + b.WriteString(md.Id) + b.WriteByte('/') + b.WriteString(FileNameDataObject) + return b.String() } -// OpenShared opens the object, loading the data into memory -// if it's small enough. +// Open opens the object, loading the data into memory if it's small enough. // -// OpenShared may be called multiple times concurrently, but the +// Open may be called multiple times concurrently, but the // object is only initialized once. While it is possible to open // the object repeatedly after close, the caller must pass the -// failure reason to the "CloseShared" call, preventing further -// use, if applicable. -func (obj *Object) OpenShared(ctx context.Context) error { - obj.err = obj.refs.Inc(func() error { - return obj.Open(ctx) +// failure reason to the "CloseWithError" call, preventing further +// use, if applicable. +func (obj *Object) Open(ctx context.Context) error { + return obj.refs.IncErr(func() error { + return obj.open(ctx) }) - return obj.err } -func (obj *Object) Open(ctx context.Context) error { +func (obj *Object) open(ctx context.Context) (err error) { if obj.err != nil { // In case if the object has been already closed with an error, // and then released, return the error immediately. return obj.err } - // Estimate the size of the sections to process, and load the - // data into memory, if it's small enough. if len(obj.meta.TenantServices) == 0 { panic("bug: invalid block meta: at least one section is expected") } - obj.buf = new(bytes.Buffer) // TODO: Take from pool. - if err := objstore.FetchRange(ctx, obj.buf, obj.path, obj.storage, 0, int64(obj.meta.Size)); err != nil { + // Estimate the size of the sections to process, and load the + // data into memory, if it's small enough. + if obj.meta.Size > uint64(obj.memSize) { + return nil + } + defer func() { + if err != nil { + _ = obj.closeErr(err) + } + }() + obj.buf = bufferpool.GetBuffer(int(obj.meta.Size)) + if err = objstore.ReadRange(ctx, obj.buf, obj.path, obj.storage, 0, int64(obj.meta.Size)); err != nil { return fmt.Errorf("loading object into memory: %w", err) } return nil } -// CloseShared closes the object, releasing all the acquired resources, +func (obj *Object) Close() error { + return obj.CloseWithError(nil) +} + +// CloseWithError closes the object, releasing all the acquired resources, // once the last reference is released. If the provided error is not nil, // the object will be marked as failed, preventing any further use. -func (obj *Object) CloseShared(err error) { +func (obj *Object) CloseWithError(err error) (closeErr error) { obj.refs.Dec(func() { - obj.closeErr(err) + closeErr = obj.closeErr(err) }) + return closeErr } -func (obj *Object) Close() error { - obj.closeErr(nil) - return obj.err +func (obj *Object) closeErr(err error) error { + obj.err = err + if obj.buf != nil { + bufferpool.Put(obj.buf) + obj.buf = nil + } + return nil +} + +func (obj *Object) Meta() *metastorev1.BlockMeta { + return obj.meta } -func (obj *Object) closeErr(err error) { - if obj.err == nil { - obj.err = err +type Objects []*Object + +func (s Objects) Open(ctx context.Context) error { + g, ctx := errgroup.WithContext(ctx) + for i := range s { + i := i + g.Go(func() error { + return s[i].Open(ctx) + }) } - obj.buf = nil // TODO: Release. + return g.Wait() } -func (obj *Object) Meta() *metastorev1.BlockMeta { return obj.meta } +func (s Objects) Close() error { + var m multierror.MultiError + for i := range s { + m.Add(s[i].Close()) + } + return m.Err() +} diff --git a/pkg/querybackend/block/section_profiles.go b/pkg/querybackend/block/section_profiles.go index 3d62a4bcfb..f563654348 100644 --- a/pkg/querybackend/block/section_profiles.go +++ b/pkg/querybackend/block/section_profiles.go @@ -1,7 +1,6 @@ package block import ( - "bytes" "context" "encoding/binary" "fmt" @@ -13,7 +12,6 @@ import ( "github.com/parquet-go/parquet-go" "github.com/pkg/errors" "github.com/prometheus/common/model" - "github.com/valyala/bytebufferpool" "github.com/grafana/pyroscope/pkg/iter" phlaremodel "github.com/grafana/pyroscope/pkg/model" @@ -23,12 +21,11 @@ import ( "github.com/grafana/pyroscope/pkg/phlaredb/query" schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" "github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index" + "github.com/grafana/pyroscope/pkg/util/bufferpool" "github.com/grafana/pyroscope/pkg/util/build" "github.com/grafana/pyroscope/pkg/util/loser" ) -const maxRowsPerRowGroup = 10 << 10 - func openProfileTable(_ context.Context, s *TenantService) (err error) { offset := s.sectionOffset(SectionProfiles) size := s.sectionSize(SectionProfiles) @@ -43,7 +40,6 @@ func openProfileTable(_ context.Context, s *TenantService) (err error) { } else { s.profiles, err = openParquetFile( s.obj.storage, s.obj.path, offset, size, - // TODO(kolesnikovae): Store in TOC. estimateFooterSize(size), parquet.SkipBloomFilters(true), parquet.FileReadMode(parquet.ReadModeAsync), @@ -65,8 +61,6 @@ type ParquetFile struct { path string off int64 size int64 - - footer *bytebufferpool.ByteBuffer } func openParquetFile( @@ -100,19 +94,16 @@ func openParquetFile( var ra io.ReaderAt ra = io.NewSectionReader(r, offset, size) if footerSize > 0 { - p.footer = parquetFooterPool.Get() + buf := bufferpool.GetBuffer(int(footerSize)) defer func() { - // Footer is not accessed after the file is opened. - parquetFooterPool.Put(p.footer) - p.footer = nil + // Footer is not used after the file was opened. + bufferpool.Put(buf) }() - if err = p.fetchFooter(ctx, footerSize); err != nil { + if err = p.fetchFooter(ctx, buf, footerSize); err != nil { return nil, err } - rf := newReaderWithFooter(ra, p.footer.B, size) - defer func() { - rf.free() - }() + rf := newReaderWithFooter(ra, buf.B, size) + defer rf.free() ra = rf } @@ -130,27 +121,20 @@ func (f *ParquetFile) RowReader() *parquet.Reader { return parquet.NewReader(f.File, schemav1.ProfilesSchema) } -var parquetFooterPool bytebufferpool.Pool - -func (f *ParquetFile) fetchFooter(ctx context.Context, estimatedSize int64) error { +func (f *ParquetFile) fetchFooter(ctx context.Context, buf *bufferpool.Buffer, estimatedSize int64) error { // Fetch the footer of estimated size. - buf := bytes.NewBuffer(f.footer.B) // Will be grown if needed. - defer func() { - f.footer.B = buf.Bytes() - }() - if err := objstore.FetchRange(ctx, buf, f.path, f.storage, f.off+f.size-estimatedSize, estimatedSize); err != nil { + if err := objstore.ReadRange(ctx, buf, f.path, f.storage, f.off+f.size-estimatedSize, estimatedSize); err != nil { return err } // Footer size is an uint32 located at size-8. - b := buf.Bytes() - sb := b[f.size-8 : f.size-4] + sb := buf.B[f.size-8 : f.size-4] s := int64(binary.LittleEndian.Uint32(sb)) s += 8 // Include the footer size itself and the magic signature. if estimatedSize >= s { // The footer has been fetched. return nil } - return objstore.FetchRange(ctx, buf, f.path, f.storage, f.off+f.size-s, s) + return objstore.ReadRange(ctx, buf, f.path, f.storage, f.off+f.size-s, s) } func (f *ParquetFile) Close() error { @@ -179,7 +163,7 @@ type profilesWriter struct { } func newProfileWriter(dst string, sizeTotal int64) (*profilesWriter, error) { - f, err := os.Create(filepath.Join(dst, "profiles.parquet")) + f, err := os.Create(filepath.Join(dst, FileNameProfilesParquet)) if err != nil { return nil, err } @@ -212,68 +196,6 @@ func (p *profilesWriter) Close() error { return p.file.Close() } -// TODO(kolesnikovae): Figure out optimal thresholds. -// It'd better if it was stored in the metadata. - -func estimateFooterSize(size int64) int64 { - var s int64 - // as long as we don't keep the exact footer sizes in the meta estimate it - if size > 0 { - s = size / 10000 - } - // set a minimum footer size of 32KiB - if s < 32<<10 { - s = 32 << 10 - } - // set a maximum footer size of 512KiB - if s > 512<<10 { - s = 512 << 10 - } - // now check clamp it to the actual size of the whole object - if s > size { - s = size - } - return s -} - -func estimateReadBufferSize(s int64) int { - const minSize = 64 << 10 - const maxSize = 2 << 20 - // Parquet has global buffer map, where buffer size is key, - // so we want a low cardinality here. - e := nextPowerOfTwo(uint32(s / 10)) - if e < minSize { - return minSize - } - return int(min(e, maxSize)) -} - -// This is a verbatim copy of estimateReadBufferSize. -// It's kept for the sake of clarity and to avoid confusion. -func estimatePageBufferSize(s int64) int { - const minSize = 64 << 10 - const maxSize = 2 << 20 - e := nextPowerOfTwo(uint32(s / 10)) - if e < minSize { - return minSize - } - return int(min(e, maxSize)) -} - -func nextPowerOfTwo(n uint32) uint32 { - if n == 0 { - return 1 - } - n-- - n |= n >> 1 - n |= n >> 2 - n |= n >> 4 - n |= n >> 8 - n |= n >> 16 - n++ - return n -} - type readerWithFooter struct { reader io.ReaderAt footer []byte @@ -340,7 +262,7 @@ func NewMergeRowProfileIterator(src []*TenantService) (iter.Iterator[ProfileEntr if len(its) == 1 { return its[0], nil } - return &dedupeProfileRowIterator{ + return &DedupeProfileRowIterator{ Iterator: iter.NewTreeIterator(loser.New( its, ProfileEntry{ @@ -366,14 +288,14 @@ func NewMergeRowProfileIterator(src []*TenantService) (iter.Iterator[ProfileEntr }, nil } -type dedupeProfileRowIterator struct { +type DedupeProfileRowIterator struct { iter.Iterator[ProfileEntry] prevFP model.Fingerprint prevTimeNanos int64 } -func (it *dedupeProfileRowIterator) Next() bool { +func (it *DedupeProfileRowIterator) Next() bool { for { if !it.Iterator.Next() { return false diff --git a/pkg/querybackend/block/section_symbols.go b/pkg/querybackend/block/section_symbols.go index 95e33135ec..0b8701ef2d 100644 --- a/pkg/querybackend/block/section_symbols.go +++ b/pkg/querybackend/block/section_symbols.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/grafana/pyroscope/pkg/objstore" "github.com/grafana/pyroscope/pkg/phlaredb/symdb" ) @@ -13,12 +12,10 @@ func openSymbols(ctx context.Context, s *TenantService) (err error) { size := s.sectionSize(SectionSymbols) if buf := s.inMemoryBuffer(); buf != nil { offset -= int64(s.offset()) - reader := objstore.NewBucketReaderWithOffset(s.inMemoryBucket(buf), offset) - s.symbols, err = symdb.OpenObject(ctx, reader, s.obj.path, size) + s.symbols, err = symdb.OpenObject(ctx, s.inMemoryBucket(buf), s.obj.path, offset, size) } else { - reader := objstore.NewBucketReaderWithOffset(s.obj.storage, offset) - s.symbols, err = symdb.OpenObject(ctx, reader, s.obj.path, size, - symdb.WithPrefetchSize(32<<10)) + s.symbols, err = symdb.OpenObject(ctx, s.obj.storage, s.obj.path, offset, size, + symdb.WithPrefetchSize(symbolsPrefetchSize)) } if err != nil { return fmt.Errorf("opening symbols: %w", err) diff --git a/pkg/querybackend/block/section_tsdb.go b/pkg/querybackend/block/section_tsdb.go index e279ea1ae3..cc365a48b2 100644 --- a/pkg/querybackend/block/section_tsdb.go +++ b/pkg/querybackend/block/section_tsdb.go @@ -1,27 +1,30 @@ package block import ( - "bytes" "context" "fmt" "github.com/grafana/pyroscope/pkg/objstore" "github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index" + "github.com/grafana/pyroscope/pkg/util/bufferpool" ) func openTSDB(ctx context.Context, s *TenantService) (err error) { offset := s.sectionOffset(SectionTSDB) size := s.sectionSize(SectionTSDB) + s.tsdb = new(tsdbBuffer) + defer func() { + if err != nil { + _ = s.tsdb.Close() + } + }() if buf := s.inMemoryBuffer(); buf != nil { offset -= int64(s.offset()) - s.tsdb, err = index.NewReader(index.RealByteSlice(buf[offset : offset+size])) + s.tsdb.index, err = index.NewReader(index.RealByteSlice(buf[offset : offset+size])) } else { - // TODO(kolesnikovae): This buffer should be reused. - // Caveat: objects returned by tsdb may reference the buffer - // and be still in use after the object is closed. - var dst bytes.Buffer - if err = objstore.FetchRange(ctx, &dst, s.obj.path, s.obj.storage, offset, size); err == nil { - s.tsdb, err = index.NewReader(index.RealByteSlice(dst.Bytes())) + s.tsdb.buf = bufferpool.GetBuffer(int(size)) + if err = objstore.ReadRange(ctx, s.tsdb.buf, s.obj.path, s.obj.storage, offset, size); err == nil { + s.tsdb.index, err = index.NewReader(index.RealByteSlice(s.tsdb.buf.B)) } } if err != nil { @@ -29,3 +32,18 @@ func openTSDB(ctx context.Context, s *TenantService) (err error) { } return nil } + +type tsdbBuffer struct { + index *index.Reader + buf *bufferpool.Buffer +} + +func (b *tsdbBuffer) Close() (err error) { + if b.buf != nil { + bufferpool.Put(b.buf) + } + if b.index != nil { + err = b.index.Close() + } + return err +} diff --git a/pkg/querybackend/block/tenant_service.go b/pkg/querybackend/block/tenant_service.go index 27599e9445..bbf4244d4a 100644 --- a/pkg/querybackend/block/tenant_service.go +++ b/pkg/querybackend/block/tenant_service.go @@ -1,7 +1,6 @@ package block import ( - "bytes" "context" "fmt" @@ -14,8 +13,8 @@ import ( "github.com/grafana/pyroscope/pkg/objstore/providers/memory" "github.com/grafana/pyroscope/pkg/phlaredb" "github.com/grafana/pyroscope/pkg/phlaredb/symdb" - "github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index" "github.com/grafana/pyroscope/pkg/util" + "github.com/grafana/pyroscope/pkg/util/bufferpool" "github.com/grafana/pyroscope/pkg/util/refctr" ) @@ -24,47 +23,63 @@ type TenantService struct { obj *Object refs refctr.Counter - buf *bytes.Buffer + buf *bufferpool.Buffer err error - tsdb *index.Reader + tsdb *tsdbBuffer symbols *symdb.Reader profiles *ParquetFile + + memSize int } func NewTenantService(meta *metastorev1.TenantService, obj *Object) *TenantService { return &TenantService{ - meta: meta, - obj: obj, + meta: meta, + obj: obj, + memSize: defaultTenantServiceSizeLoadInMemory, } } -func (s *TenantService) OpenShared(ctx context.Context, sections ...Section) (err error) { - s.err = s.refs.Inc(func() error { - return s.Open(ctx, sections...) - }) - return s.err +type TenantServiceOption func(*TenantService) + +func WithTenantServiceMaxSizeLoadInMemory(size int) TenantServiceOption { + return func(s *TenantService) { + s.memSize = size + } } +// Open opens the service, initializing the sections specified. +// +// Open may be called multiple times concurrently, but the service +// is only initialized once. While it is possible to open the service +// repeatedly after close, the caller must pass the failure reason to +// the CloseWithError call, preventing further use, if applicable. func (s *TenantService) Open(ctx context.Context, sections ...Section) (err error) { + return s.refs.IncErr(func() error { + return s.open(ctx, sections...) + }) +} + +func (s *TenantService) open(ctx context.Context, sections ...Section) (err error) { if s.err != nil { // The tenant service has been already closed with an error. return s.err } - if err = s.obj.OpenShared(ctx); err != nil { + if err = s.obj.Open(ctx); err != nil { return fmt.Errorf("failed to open object: %w", err) } defer func() { // Close the object here because the tenant service won't be // closed if it fails to open. if err != nil { - s.obj.CloseShared(err) + _ = s.closeErr(err) } }() - if s.obj.buf == nil && s.meta.Size < loadInMemorySizeThreshold { - s.buf = new(bytes.Buffer) // TODO: Pool. + if s.obj.buf == nil && s.meta.Size < uint64(s.memSize) { + s.buf = bufferpool.GetBuffer(int(s.meta.Size)) off, size := int64(s.offset()), int64(s.meta.Size) - if err = objstore.FetchRange(ctx, s.buf, s.obj.path, s.obj.storage, off, size); err != nil { + if err = objstore.ReadRange(ctx, s.buf, s.obj.path, s.obj.storage, off, size); err != nil { return fmt.Errorf("loading sections into memory: %w", err) } } @@ -81,34 +96,39 @@ func (s *TenantService) Open(ctx context.Context, sections ...Section) (err erro return g.Wait() } -func (s *TenantService) CloseShared(err error) { +func (s *TenantService) Close() error { return s.CloseWithError(nil) } + +// CloseWithError closes the tenant service and disposes all the resources +// associated with it. +// +// Any further attempts to open the service will return the provided error. +func (s *TenantService) CloseWithError(err error) (closeErr error) { s.refs.Dec(func() { - s.closeErr(err) + closeErr = s.closeErr(err) }) + return closeErr } -func (s *TenantService) Close() error { - s.closeErr(nil) - return s.err -} - -func (s *TenantService) closeErr(err error) { +func (s *TenantService) closeErr(err error) error { + s.err = err if s.buf != nil { - s.buf = nil // TODO: Release. + bufferpool.Put(s.buf) + s.buf = nil } - var m multierror.MultiError - m.Add(s.err) // Preserve the existing error - m.Add(err) // Add the new error, if any. + var merr multierror.MultiError if s.tsdb != nil { - m.Add(s.tsdb.Close()) + merr.Add(s.tsdb.Close()) } if s.symbols != nil { - m.Add(s.symbols.Close()) + merr.Add(s.symbols.Close()) } if s.profiles != nil { - m.Add(s.profiles.Close()) + merr.Add(s.profiles.Close()) + } + if s.obj != nil { + merr.Add(s.obj.CloseWithError(err)) } - s.err = m.Err() + return merr.Err() } func (s *TenantService) Meta() *metastorev1.TenantService { return s.meta } @@ -119,7 +139,7 @@ func (s *TenantService) ProfileRowReader() parquet.RowReader { return s.profiles func (s *TenantService) Symbols() symdb.SymbolsReader { return s.symbols } -func (s *TenantService) Index() phlaredb.IndexReader { return s.tsdb } +func (s *TenantService) Index() phlaredb.IndexReader { return s.tsdb.index } // Offset of the tenant service section within the object. func (s *TenantService) offset() uint64 { return s.meta.TableOfContents[0] } @@ -170,13 +190,13 @@ func (s *TenantService) inMemoryBuffer() []byte { // return the tenant service sub-slice. lo := s.offset() hi := lo + s.meta.Size - buf := s.obj.buf.Bytes() + buf := s.obj.buf.B return buf[lo:hi] } if s.buf != nil { // Otherwise, if the tenant service is loaded into memory // individually, return the buffer. - return s.buf.Bytes() + return s.buf.B } // Otherwise, the tenant service is not loaded into memory. return nil diff --git a/pkg/querybackend/block/testdata/.gitignore b/pkg/querybackend/block/testdata/.gitignore new file mode 100644 index 0000000000..7ad764b60f --- /dev/null +++ b/pkg/querybackend/block/testdata/.gitignore @@ -0,0 +1 @@ +blocks/ diff --git a/pkg/compactionworker/testdata/block-metas.json b/pkg/querybackend/block/testdata/block-metas.json similarity index 100% rename from pkg/compactionworker/testdata/block-metas.json rename to pkg/querybackend/block/testdata/block-metas.json diff --git a/pkg/compactionworker/testdata/segments/1/anon/01J2VJQPYDC160REPAD2VN88XN/block.bin b/pkg/querybackend/block/testdata/segments/1/anon/01J2VJQPYDC160REPAD2VN88XN/block.bin similarity index 100% rename from pkg/compactionworker/testdata/segments/1/anon/01J2VJQPYDC160REPAD2VN88XN/block.bin rename to pkg/querybackend/block/testdata/segments/1/anon/01J2VJQPYDC160REPAD2VN88XN/block.bin diff --git a/pkg/compactionworker/testdata/segments/1/anon/01J2VJQRGBK8YFWVV8K1MPRRWM/block.bin b/pkg/querybackend/block/testdata/segments/1/anon/01J2VJQRGBK8YFWVV8K1MPRRWM/block.bin similarity index 100% rename from pkg/compactionworker/testdata/segments/1/anon/01J2VJQRGBK8YFWVV8K1MPRRWM/block.bin rename to pkg/querybackend/block/testdata/segments/1/anon/01J2VJQRGBK8YFWVV8K1MPRRWM/block.bin diff --git a/pkg/compactionworker/testdata/segments/1/anon/01J2VJQRTMSCY4VDYBP5N4N5JK/block.bin b/pkg/querybackend/block/testdata/segments/1/anon/01J2VJQRTMSCY4VDYBP5N4N5JK/block.bin similarity index 100% rename from pkg/compactionworker/testdata/segments/1/anon/01J2VJQRTMSCY4VDYBP5N4N5JK/block.bin rename to pkg/querybackend/block/testdata/segments/1/anon/01J2VJQRTMSCY4VDYBP5N4N5JK/block.bin diff --git a/pkg/compactionworker/testdata/segments/1/anon/01J2VJQTJ3PGF7KB39ARR1BX3Y/block.bin b/pkg/querybackend/block/testdata/segments/1/anon/01J2VJQTJ3PGF7KB39ARR1BX3Y/block.bin similarity index 100% rename from pkg/compactionworker/testdata/segments/1/anon/01J2VJQTJ3PGF7KB39ARR1BX3Y/block.bin rename to pkg/querybackend/block/testdata/segments/1/anon/01J2VJQTJ3PGF7KB39ARR1BX3Y/block.bin diff --git a/pkg/compactionworker/testdata/segments/1/anon/01J2VJQV544TF571FDSK2H692P/block.bin b/pkg/querybackend/block/testdata/segments/1/anon/01J2VJQV544TF571FDSK2H692P/block.bin similarity index 100% rename from pkg/compactionworker/testdata/segments/1/anon/01J2VJQV544TF571FDSK2H692P/block.bin rename to pkg/querybackend/block/testdata/segments/1/anon/01J2VJQV544TF571FDSK2H692P/block.bin diff --git a/pkg/compactionworker/testdata/segments/1/anon/01J2VJQX8DYHSEBK7BAQSCJBMG/block.bin b/pkg/querybackend/block/testdata/segments/1/anon/01J2VJQX8DYHSEBK7BAQSCJBMG/block.bin similarity index 100% rename from pkg/compactionworker/testdata/segments/1/anon/01J2VJQX8DYHSEBK7BAQSCJBMG/block.bin rename to pkg/querybackend/block/testdata/segments/1/anon/01J2VJQX8DYHSEBK7BAQSCJBMG/block.bin diff --git a/pkg/compactionworker/testdata/segments/1/anon/01J2VJQYQVZTPZMMJKE7F2XC47/block.bin b/pkg/querybackend/block/testdata/segments/1/anon/01J2VJQYQVZTPZMMJKE7F2XC47/block.bin similarity index 100% rename from pkg/compactionworker/testdata/segments/1/anon/01J2VJQYQVZTPZMMJKE7F2XC47/block.bin rename to pkg/querybackend/block/testdata/segments/1/anon/01J2VJQYQVZTPZMMJKE7F2XC47/block.bin diff --git a/pkg/compactionworker/testdata/segments/1/anon/01J2VJQZPARDJQ779S1JMV0XQA/block.bin b/pkg/querybackend/block/testdata/segments/1/anon/01J2VJQZPARDJQ779S1JMV0XQA/block.bin similarity index 100% rename from pkg/compactionworker/testdata/segments/1/anon/01J2VJQZPARDJQ779S1JMV0XQA/block.bin rename to pkg/querybackend/block/testdata/segments/1/anon/01J2VJQZPARDJQ779S1JMV0XQA/block.bin diff --git a/pkg/compactionworker/testdata/segments/1/anon/01J2VJR0R3NQS23SDADNA6XHCM/block.bin b/pkg/querybackend/block/testdata/segments/1/anon/01J2VJR0R3NQS23SDADNA6XHCM/block.bin similarity index 100% rename from pkg/compactionworker/testdata/segments/1/anon/01J2VJR0R3NQS23SDADNA6XHCM/block.bin rename to pkg/querybackend/block/testdata/segments/1/anon/01J2VJR0R3NQS23SDADNA6XHCM/block.bin diff --git a/pkg/compactionworker/testdata/segments/1/anon/01J2VJR31PT3X4NDJC4Q2BHWQ1/block.bin b/pkg/querybackend/block/testdata/segments/1/anon/01J2VJR31PT3X4NDJC4Q2BHWQ1/block.bin similarity index 100% rename from pkg/compactionworker/testdata/segments/1/anon/01J2VJR31PT3X4NDJC4Q2BHWQ1/block.bin rename to pkg/querybackend/block/testdata/segments/1/anon/01J2VJR31PT3X4NDJC4Q2BHWQ1/block.bin diff --git a/pkg/querybackend/block/writer.go b/pkg/querybackend/block/writer.go new file mode 100644 index 0000000000..2fa893aa79 --- /dev/null +++ b/pkg/querybackend/block/writer.go @@ -0,0 +1,94 @@ +package block + +import ( + "context" + "io" + "os" + "path/filepath" + "strconv" + + "github.com/grafana/pyroscope/pkg/objstore" + "github.com/grafana/pyroscope/pkg/util/bufferpool" +) + +// TODO(kolesnikovae): +// - Avoid staging files where possible. +// - If stage files are required, at least avoid +// recreating them for each tenant service. +// - objstore.Bucket should provide object writer. + +type Writer struct { + storage objstore.Bucket + + tmp string + n int + cur string + buf *bufferpool.Buffer + off uint64 + + r *io.PipeReader + w *io.PipeWriter + ctx context.Context + cancel context.CancelFunc + done chan struct{} +} + +func NewBlockWriter(ctx context.Context, storage objstore.Bucket, path string, tmp string) *Writer { + b := &Writer{ + storage: storage, + tmp: tmp, + done: make(chan struct{}), + buf: bufferpool.GetBuffer(compactionCopyBufferSize), + } + b.r, b.w = io.Pipe() + b.ctx, b.cancel = context.WithCancel(ctx) + go func() { + defer close(b.done) + _ = b.w.CloseWithError(storage.Upload(b.ctx, path, b.r)) + }() + return b +} + +// Dir returns path to the new temp directory. +func (b *Writer) Dir() string { + b.n++ + b.cur = filepath.Join(b.tmp, strconv.Itoa(b.n)) + return b.cur +} + +// ReadFromFiles located in the directory Dir. +func (b *Writer) ReadFromFiles(files ...string) (toc []uint64, err error) { + toc = make([]uint64, len(files)) + for i := range files { + toc[i] = b.off + if err = b.ReadFromFile(files[i]); err != nil { + break + } + } + return toc, err +} + +// ReadFromFile located in the directory Dir. +func (b *Writer) ReadFromFile(file string) error { + f, err := os.Open(filepath.Join(b.cur, file)) + if err != nil { + return err + } + defer func() { + _ = f.Close() + }() + b.buf.B = b.buf.B[:cap(b.buf.B)] + n, err := io.CopyBuffer(b.w, f, b.buf.B) + b.off += uint64(n) + return err +} + +func (b *Writer) Offset() uint64 { return b.off } + +func (b *Writer) Close() error { + _ = b.r.Close() + b.cancel() + <-b.done + // b.w is closed before close(d.done). + return os.RemoveAll(b.tmp) +} diff --git a/pkg/querybackend/query.go b/pkg/querybackend/query.go index 6713c83dba..853535bdde 100644 --- a/pkg/querybackend/query.go +++ b/pkg/querybackend/query.go @@ -98,7 +98,7 @@ func newQueryContext( } } -func executeQuery(q *queryContext, query *querybackendv1.Query) (*querybackendv1.Report, error) { +func executeQuery(q *queryContext, query *querybackendv1.Query) (r *querybackendv1.Report, err error) { handle, err := getQueryHandler(query.QueryType) if err != nil { return nil, err @@ -107,21 +107,20 @@ func executeQuery(q *queryContext, query *querybackendv1.Query) (*querybackendv1 return nil, fmt.Errorf("failed to initialize query context: %w", err) } defer func() { - q.close(err) + _ = q.close(err) }() - r, err := handle(q, query) - if r != nil { + if r, err = handle(q, query); r != nil { r.ReportType = QueryReportType(query.QueryType) } return r, err } func (q *queryContext) open() error { - return q.svc.OpenShared(q.ctx, q.sections()...) + return q.svc.Open(q.ctx, q.sections()...) } -func (q *queryContext) close(err error) { - q.svc.CloseShared(err) +func (q *queryContext) close(err error) error { + return q.svc.CloseWithError(err) } func (q *queryContext) sections() []block.Section { diff --git a/pkg/util/bufferpool/pool.go b/pkg/util/bufferpool/pool.go new file mode 100644 index 0000000000..b4fa477460 --- /dev/null +++ b/pkg/util/bufferpool/pool.go @@ -0,0 +1,98 @@ +package bufferpool + +import ( + "bytes" + "io" + "sync" +) + +// Sized *bytes.Buffer pools: from 2^9 (512b) to 2^30 (1GB). +var pools [maxPool]sync.Pool + +type Buffer struct { + B []byte + p int64 +} + +const ( + minBits = 9 + maxPool = 22 +) + +// GetBuffer returns a buffer from the pool, or creates a new one. +// The returned buffer has at least the requested capacity. +func GetBuffer(size int) *Buffer { + i := poolIndex(size) + if i < 0 { + return &Buffer{B: make([]byte, 0, size)} + } + x := pools[i].Get() + if x != nil { + return x.(*Buffer) + } + c := 2 << (minBits + i - 1) + c += bytes.MinRead + return &Buffer{ + B: make([]byte, 0, c), + p: i, + } +} + +// Put places the buffer into the pool. +func Put(b *Buffer) { + if b == nil { + return + } + if p := returnPool(cap(b.B), b.p); p > 0 { + b.B = b.B[:0] + pools[p].Put(b) + } +} + +func returnPool(c int, p int64) int64 { + // Empty buffers are ignored. + if c == 0 { + return -1 + } + i := poolIndex(c) + if p == 0 { + // The buffer does not belong to any pool, or it's + // of the smallest size. We pick the pool based on + // its current capacity. + return i + } + d := i - p + if d < 0 { + // This buffer was likely obtained outside the pool. + // For example, an empty one, or with pre-allocated + // byte slice. + return i + } + if d > 1 { + // Relocate the buffer, if it's capacity has been + // grown by more than a power of two. + return i + } + // Otherwise, keep the buffer in the current pool. + return p +} + +func poolIndex(n int) (i int64) { + n-- + n >>= minBits + for n > 0 { + n >>= 1 + i++ + } + if i >= maxPool { + return -1 + } + return i +} + +func (b *Buffer) ReadFrom(r io.Reader) (int64, error) { + buf := bytes.NewBuffer(b.B) + n, err := buf.ReadFrom(r) + b.B = buf.Bytes() + return n, err +} diff --git a/pkg/util/bufferpool/pool_test.go b/pkg/util/bufferpool/pool_test.go new file mode 100644 index 0000000000..5097915208 --- /dev/null +++ b/pkg/util/bufferpool/pool_test.go @@ -0,0 +1,22 @@ +package bufferpool + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_returnPool(t *testing.T) { + assert.EqualValues(t, 0, returnPool(512, 0)) // Buffers can be added to the pool. + assert.EqualValues(t, 1, returnPool(513, 0)) + assert.EqualValues(t, 1, returnPool(1<<10, 0)) + assert.EqualValues(t, -1, returnPool(0, 0)) // Empty buffers are ignored. + assert.EqualValues(t, -1, returnPool(0, 10)) // + assert.EqualValues(t, 5, returnPool(1<<14, 0)) // New buffers are added to the appropriate pool. + assert.EqualValues(t, 5, returnPool(1<<14, 3)) // Buffer of a capacity exceeding the next power of two are relocated. + assert.EqualValues(t, 4, returnPool(1<<14, 4)) // Buffer of a capacity not exceeding the next power of two are retained. + assert.EqualValues(t, 5, returnPool(1<<14, 5)) // Buffer of the nominal capacity. + assert.EqualValues(t, 5, returnPool(1<<14, 6)) // Buffer of a smaller capacity must be relocated. + assert.EqualValues(t, 21, returnPool(1<<30, 13)) + assert.EqualValues(t, -1, returnPool(1<<30+1, 13)) // No pools for buffers larger than 4MB. +} diff --git a/pkg/util/refctr/refctr.go b/pkg/util/refctr/refctr.go index 7bfcba35ee..b3993354dd 100644 --- a/pkg/util/refctr/refctr.go +++ b/pkg/util/refctr/refctr.go @@ -3,8 +3,9 @@ package refctr import "sync" type Counter struct { - m sync.Mutex - c int + m sync.Mutex + c int + err error } // Inc increments the counter and calls the init function, @@ -30,15 +31,42 @@ func (r *Counter) Inc(init func() error) (err error) { return init() } +// IncErr is identical to Inc, with the only difference that if the +// function fails, the error is returned on any further IncErr call, +// preventing from calling the faulty initialization function again. +func (r *Counter) IncErr(init func() error) (err error) { + r.m.Lock() + if r.err != nil { + r.m.Unlock() + return r.err + } + defer func() { + // If initialization fails, we need to make sure + // the next call makes another attempt. + if err != nil { + r.err = err + r.c-- + } + r.m.Unlock() + }() + if r.c++; r.c > 1 { + return nil + } + // Mutex is acquired during the call in order to serialize + // access to the resources, so that the consequent callers + // only have access to them after initialization finishes. + return init() +} + // Dec decrements the counter and calls the release function, // if this is the last reference. func (r *Counter) Dec(release func()) { r.m.Lock() + defer r.m.Unlock() if r.c < 0 { panic("bug: negative reference counter") } if r.c--; r.c < 1 { release() } - r.m.Unlock() } From 60039fb40c98d4575d027578e7228187659444c9 Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Tue, 23 Jul 2024 11:47:44 +0800 Subject: [PATCH 2/3] fix stream compaction --- pkg/frontend/frontend_meta.go | 7 ++- pkg/ingester/segment.go | 3 +- pkg/querybackend/block/compaction.go | 14 +++-- pkg/querybackend/block/compaction_test.go | 8 +++ pkg/querybackend/block/constants.go | 6 +- pkg/querybackend/block/object.go | 2 +- pkg/querybackend/block/section_profiles.go | 4 +- .../01J2VJQPYDC160REPAD2VN88XN/block.bin | Bin .../01J2VJQRGBK8YFWVV8K1MPRRWM/block.bin | Bin .../01J2VJQRTMSCY4VDYBP5N4N5JK/block.bin | Bin .../01J2VJQTJ3PGF7KB39ARR1BX3Y/block.bin | Bin .../01J2VJQV544TF571FDSK2H692P/block.bin | Bin .../01J2VJQX8DYHSEBK7BAQSCJBMG/block.bin | Bin .../01J2VJQYQVZTPZMMJKE7F2XC47/block.bin | Bin .../01J2VJQZPARDJQ779S1JMV0XQA/block.bin | Bin .../01J2VJR0R3NQS23SDADNA6XHCM/block.bin | Bin .../01J2VJR31PT3X4NDJC4Q2BHWQ1/block.bin | Bin pkg/querybackend/block/writer.go | 55 +++++++++++------- 18 files changed, 67 insertions(+), 32 deletions(-) rename pkg/querybackend/block/testdata/segments/1/{anon => anonymous}/01J2VJQPYDC160REPAD2VN88XN/block.bin (100%) rename pkg/querybackend/block/testdata/segments/1/{anon => anonymous}/01J2VJQRGBK8YFWVV8K1MPRRWM/block.bin (100%) rename pkg/querybackend/block/testdata/segments/1/{anon => anonymous}/01J2VJQRTMSCY4VDYBP5N4N5JK/block.bin (100%) rename pkg/querybackend/block/testdata/segments/1/{anon => anonymous}/01J2VJQTJ3PGF7KB39ARR1BX3Y/block.bin (100%) rename pkg/querybackend/block/testdata/segments/1/{anon => anonymous}/01J2VJQV544TF571FDSK2H692P/block.bin (100%) rename pkg/querybackend/block/testdata/segments/1/{anon => anonymous}/01J2VJQX8DYHSEBK7BAQSCJBMG/block.bin (100%) rename pkg/querybackend/block/testdata/segments/1/{anon => anonymous}/01J2VJQYQVZTPZMMJKE7F2XC47/block.bin (100%) rename pkg/querybackend/block/testdata/segments/1/{anon => anonymous}/01J2VJQZPARDJQ779S1JMV0XQA/block.bin (100%) rename pkg/querybackend/block/testdata/segments/1/{anon => anonymous}/01J2VJR0R3NQS23SDADNA6XHCM/block.bin (100%) rename pkg/querybackend/block/testdata/segments/1/{anon => anonymous}/01J2VJR31PT3X4NDJC4Q2BHWQ1/block.bin (100%) diff --git a/pkg/frontend/frontend_meta.go b/pkg/frontend/frontend_meta.go index df821fbd80..5471b77a28 100644 --- a/pkg/frontend/frontend_meta.go +++ b/pkg/frontend/frontend_meta.go @@ -22,7 +22,12 @@ func (f *Frontend) listMetadata( startTime, endTime int64, query string, ) ([]*metastorev1.BlockMeta, error) { - _ = level.Info(f.log).Log("msg", "listing metadata", "tenants", tenants, "start", startTime, "end", endTime, "query", query) + _ = level.Info(f.log).Log("msg", "listing metadata", + "tenants", strings.Join(tenants, ","), + "start", startTime, + "end", endTime, + "query", query, + ) resp, err := f.metastoreclient.ListBlocksForQuery(ctx, &metastorev1.ListBlocksForQueryRequest{ TenantId: tenants, StartTime: startTime, diff --git a/pkg/ingester/segment.go b/pkg/ingester/segment.go index d12ee86ea7..36f1bc7452 100644 --- a/pkg/ingester/segment.go +++ b/pkg/ingester/segment.go @@ -15,6 +15,7 @@ import ( "time" index2 "github.com/grafana/pyroscope/pkg/phlaredb/tsdb/loki/index" + "github.com/grafana/pyroscope/pkg/tenant" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -34,7 +35,7 @@ import ( ) const pathSegments = "segments" -const pathAnon = "anon" +const pathAnon = tenant.DefaultTenantID const pathBlock = "block.bin" type shardKey uint32 diff --git a/pkg/querybackend/block/compaction.go b/pkg/querybackend/block/compaction.go index 7bc92dcfc5..247143225f 100644 --- a/pkg/querybackend/block/compaction.go +++ b/pkg/querybackend/block/compaction.go @@ -52,12 +52,13 @@ func Compact( err = objects.Close() }() - compacted := make([]*metastorev1.BlockMeta, len(plan)) - for i, p := range plan { - compacted[i], err = p.Compact(ctx, storage) + compacted := make([]*metastorev1.BlockMeta, 0, len(plan)) + for _, p := range plan { + md, err := p.Compact(ctx, storage) if err != nil { - return compacted, err + return nil, err } + compacted = append(compacted, md) } return compacted, nil @@ -147,7 +148,7 @@ func (b *CompactionPlan) Estimate() { func (b *CompactionPlan) Compact(ctx context.Context, storage objstore.Bucket) (m *metastorev1.BlockMeta, err error) { dir := filepath.Join(os.TempDir(), "pyroscope-compactor", b.meta.Id) - w := NewBlockWriter(ctx, storage, ObjectPath(b.meta), dir) + w := NewBlockWriter(storage, ObjectPath(b.meta), dir) defer func() { err = multierror.New(err, w.Close()).Err() }() @@ -160,6 +161,9 @@ func (b *CompactionPlan) Compact(ctx context.Context, storage objstore.Bucket) ( } b.meta.TenantServices = append(b.meta.TenantServices, s.meta) } + if err = w.Flush(ctx); err != nil { + return nil, fmt.Errorf("flushing block writer: %w", err) + } b.meta.Size = w.Offset() return b.meta, nil } diff --git a/pkg/querybackend/block/compaction_test.go b/pkg/querybackend/block/compaction_test.go index 2304c937ec..20d20cfb1d 100644 --- a/pkg/querybackend/block/compaction_test.go +++ b/pkg/querybackend/block/compaction_test.go @@ -9,6 +9,7 @@ import ( "google.golang.org/protobuf/encoding/protojson" compactorv1 "github.com/grafana/pyroscope/api/gen/proto/go/compactor/v1" + metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" "github.com/grafana/pyroscope/pkg/objstore/testutil" ) @@ -27,4 +28,11 @@ func Test_CompactBlocks(t *testing.T) { // TODO: Assertions. require.Len(t, compactedBlocks, 1) + metas := []*metastorev1.BlockMeta{ + compactedBlocks[0], + compactedBlocks[0].CloneVT(), + } + + compactedBlocks, err = Compact(ctx, metas, bucket) + require.NoError(t, err) } diff --git a/pkg/querybackend/block/constants.go b/pkg/querybackend/block/constants.go index 9d5b064f8a..bba95e2164 100644 --- a/pkg/querybackend/block/constants.go +++ b/pkg/querybackend/block/constants.go @@ -1,9 +1,13 @@ package block +import ( + "github.com/grafana/pyroscope/pkg/tenant" +) + const ( DirPathSegment = "segments/" DirPathBlock = "blocks/" - DirNameAnonTenant = "anon" + DirNameAnonTenant = tenant.DefaultTenantID FileNameProfilesParquet = "profiles.parquet" FileNameDataObject = "block.bin" diff --git a/pkg/querybackend/block/object.go b/pkg/querybackend/block/object.go index 474be569b0..8588c08f05 100644 --- a/pkg/querybackend/block/object.go +++ b/pkg/querybackend/block/object.go @@ -151,7 +151,7 @@ func (obj *Object) open(ctx context.Context) (err error) { }() obj.buf = bufferpool.GetBuffer(int(obj.meta.Size)) if err = objstore.ReadRange(ctx, obj.buf, obj.path, obj.storage, 0, int64(obj.meta.Size)); err != nil { - return fmt.Errorf("loading object into memory: %w", err) + return fmt.Errorf("loading object into memory %s: %w", obj.path, err) } return nil } diff --git a/pkg/querybackend/block/section_profiles.go b/pkg/querybackend/block/section_profiles.go index f563654348..1c8046fa9a 100644 --- a/pkg/querybackend/block/section_profiles.go +++ b/pkg/querybackend/block/section_profiles.go @@ -93,7 +93,7 @@ func openParquetFile( var ra io.ReaderAt ra = io.NewSectionReader(r, offset, size) - if footerSize > 0 { + /* if footerSize > 0 { buf := bufferpool.GetBuffer(int(footerSize)) defer func() { // Footer is not used after the file was opened. @@ -105,7 +105,7 @@ func openParquetFile( rf := newReaderWithFooter(ra, buf.B, size) defer rf.free() ra = rf - } + }*/ f, err := parquet.OpenFile(ra, size, options...) if err != nil { diff --git a/pkg/querybackend/block/testdata/segments/1/anon/01J2VJQPYDC160REPAD2VN88XN/block.bin b/pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJQPYDC160REPAD2VN88XN/block.bin similarity index 100% rename from pkg/querybackend/block/testdata/segments/1/anon/01J2VJQPYDC160REPAD2VN88XN/block.bin rename to pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJQPYDC160REPAD2VN88XN/block.bin diff --git a/pkg/querybackend/block/testdata/segments/1/anon/01J2VJQRGBK8YFWVV8K1MPRRWM/block.bin b/pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJQRGBK8YFWVV8K1MPRRWM/block.bin similarity index 100% rename from pkg/querybackend/block/testdata/segments/1/anon/01J2VJQRGBK8YFWVV8K1MPRRWM/block.bin rename to pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJQRGBK8YFWVV8K1MPRRWM/block.bin diff --git a/pkg/querybackend/block/testdata/segments/1/anon/01J2VJQRTMSCY4VDYBP5N4N5JK/block.bin b/pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJQRTMSCY4VDYBP5N4N5JK/block.bin similarity index 100% rename from pkg/querybackend/block/testdata/segments/1/anon/01J2VJQRTMSCY4VDYBP5N4N5JK/block.bin rename to pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJQRTMSCY4VDYBP5N4N5JK/block.bin diff --git a/pkg/querybackend/block/testdata/segments/1/anon/01J2VJQTJ3PGF7KB39ARR1BX3Y/block.bin b/pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJQTJ3PGF7KB39ARR1BX3Y/block.bin similarity index 100% rename from pkg/querybackend/block/testdata/segments/1/anon/01J2VJQTJ3PGF7KB39ARR1BX3Y/block.bin rename to pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJQTJ3PGF7KB39ARR1BX3Y/block.bin diff --git a/pkg/querybackend/block/testdata/segments/1/anon/01J2VJQV544TF571FDSK2H692P/block.bin b/pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJQV544TF571FDSK2H692P/block.bin similarity index 100% rename from pkg/querybackend/block/testdata/segments/1/anon/01J2VJQV544TF571FDSK2H692P/block.bin rename to pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJQV544TF571FDSK2H692P/block.bin diff --git a/pkg/querybackend/block/testdata/segments/1/anon/01J2VJQX8DYHSEBK7BAQSCJBMG/block.bin b/pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJQX8DYHSEBK7BAQSCJBMG/block.bin similarity index 100% rename from pkg/querybackend/block/testdata/segments/1/anon/01J2VJQX8DYHSEBK7BAQSCJBMG/block.bin rename to pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJQX8DYHSEBK7BAQSCJBMG/block.bin diff --git a/pkg/querybackend/block/testdata/segments/1/anon/01J2VJQYQVZTPZMMJKE7F2XC47/block.bin b/pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJQYQVZTPZMMJKE7F2XC47/block.bin similarity index 100% rename from pkg/querybackend/block/testdata/segments/1/anon/01J2VJQYQVZTPZMMJKE7F2XC47/block.bin rename to pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJQYQVZTPZMMJKE7F2XC47/block.bin diff --git a/pkg/querybackend/block/testdata/segments/1/anon/01J2VJQZPARDJQ779S1JMV0XQA/block.bin b/pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJQZPARDJQ779S1JMV0XQA/block.bin similarity index 100% rename from pkg/querybackend/block/testdata/segments/1/anon/01J2VJQZPARDJQ779S1JMV0XQA/block.bin rename to pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJQZPARDJQ779S1JMV0XQA/block.bin diff --git a/pkg/querybackend/block/testdata/segments/1/anon/01J2VJR0R3NQS23SDADNA6XHCM/block.bin b/pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJR0R3NQS23SDADNA6XHCM/block.bin similarity index 100% rename from pkg/querybackend/block/testdata/segments/1/anon/01J2VJR0R3NQS23SDADNA6XHCM/block.bin rename to pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJR0R3NQS23SDADNA6XHCM/block.bin diff --git a/pkg/querybackend/block/testdata/segments/1/anon/01J2VJR31PT3X4NDJC4Q2BHWQ1/block.bin b/pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJR31PT3X4NDJC4Q2BHWQ1/block.bin similarity index 100% rename from pkg/querybackend/block/testdata/segments/1/anon/01J2VJR31PT3X4NDJC4Q2BHWQ1/block.bin rename to pkg/querybackend/block/testdata/segments/1/anonymous/01J2VJR31PT3X4NDJC4Q2BHWQ1/block.bin diff --git a/pkg/querybackend/block/writer.go b/pkg/querybackend/block/writer.go index 2fa893aa79..dcaf5e309b 100644 --- a/pkg/querybackend/block/writer.go +++ b/pkg/querybackend/block/writer.go @@ -19,33 +19,26 @@ import ( type Writer struct { storage objstore.Bucket + path string + local string + off uint64 + w *os.File tmp string n int cur string - buf *bufferpool.Buffer - off uint64 - r *io.PipeReader - w *io.PipeWriter - ctx context.Context - cancel context.CancelFunc - done chan struct{} + buf *bufferpool.Buffer } -func NewBlockWriter(ctx context.Context, storage objstore.Bucket, path string, tmp string) *Writer { +func NewBlockWriter(storage objstore.Bucket, path string, tmp string) *Writer { b := &Writer{ storage: storage, + path: path, tmp: tmp, - done: make(chan struct{}), + local: filepath.Join(tmp, FileNameDataObject), buf: bufferpool.GetBuffer(compactionCopyBufferSize), } - b.r, b.w = io.Pipe() - b.ctx, b.cancel = context.WithCancel(ctx) - go func() { - defer close(b.done) - _ = b.w.CloseWithError(storage.Upload(b.ctx, path, b.r)) - }() return b } @@ -69,7 +62,12 @@ func (b *Writer) ReadFromFiles(files ...string) (toc []uint64, err error) { } // ReadFromFile located in the directory Dir. -func (b *Writer) ReadFromFile(file string) error { +func (b *Writer) ReadFromFile(file string) (err error) { + if b.w == nil { + if b.w, err = os.Create(b.local); err != nil { + return err + } + } f, err := os.Open(filepath.Join(b.cur, file)) if err != nil { return err @@ -85,10 +83,25 @@ func (b *Writer) ReadFromFile(file string) error { func (b *Writer) Offset() uint64 { return b.off } +func (b *Writer) Flush(ctx context.Context) error { + if err := b.w.Close(); err != nil { + return err + } + b.w = nil + f, err := os.Open(b.local) + if err != nil { + return err + } + defer func() { + _ = f.Close() + }() + return b.storage.Upload(ctx, b.path, f) +} + func (b *Writer) Close() error { - _ = b.r.Close() - b.cancel() - <-b.done - // b.w is closed before close(d.done). - return os.RemoveAll(b.tmp) + bufferpool.Put(b.buf) + if b.w != nil { + return b.w.Close() + } + return nil } From 3c4b4f2f3eb490c991680c93564102cefbe5c869 Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Tue, 23 Jul 2024 18:17:39 +0800 Subject: [PATCH 3/3] fix parquet footer optimization --- .../helm/pyroscope/rendered/micro-services.yaml | 16 ++++++++-------- .../helm/pyroscope/values-micro-services.yaml | 12 ++++++------ .../pyroscope/jsonnet/values-micro-services.json | 12 ++++++------ pkg/querybackend/block/section_profiles.go | 12 +++++++----- 4 files changed, 27 insertions(+), 25 deletions(-) diff --git a/operations/pyroscope/helm/pyroscope/rendered/micro-services.yaml b/operations/pyroscope/helm/pyroscope/rendered/micro-services.yaml index e584fd1e96..3ef187bec4 100644 --- a/operations/pyroscope/helm/pyroscope/rendered/micro-services.yaml +++ b/operations/pyroscope/helm/pyroscope/rendered/micro-services.yaml @@ -2303,7 +2303,7 @@ metadata: app.kubernetes.io/managed-by: Helm app.kubernetes.io/component: "compaction-worker" spec: - replicas: 32 + replicas: 4 selector: matchLabels: app.kubernetes.io/name: pyroscope @@ -2347,14 +2347,14 @@ spec: - "-metastore.raft.advertise-address=$(POD_NAME).pyroscope-dev-metastore-headless.default.svc.cluster.local.:9099" - "-metastore.raft.server-id=$(POD_NAME).pyroscope-dev-metastore-headless.default.svc.cluster.local.:9099" - "-metastore.raft.bootstrap-peers=dnssrvnoa+_raft._tcp.pyroscope-dev-metastore-headless.default.svc.cluster.local.:9099" - - "-metastore.raft.bootstrap-expect-peers=32" + - "-metastore.raft.bootstrap-expect-peers=4" - "-metastore.address=dns:///_grpc._tcp.pyroscope-dev-metastore-headless.default.svc.cluster.local.:9095" - "-query-backend.address=dns:///pyroscope-dev-query-worker-headless.default.svc.cluster.local.:9095" - "-config.file=/etc/pyroscope/config.yaml" - "-runtime-config.file=/etc/pyroscope/overrides/overrides.yaml" - "-ingester.num-tokens=4" - "-distributor.replication-factor=1" - - "-compaction-worker.job-capacity=40" + - "-compaction-worker.job-capacity=5" - "-log.level=debug" - "-store-gateway.sharding-ring.replication-factor=3" env: @@ -2388,7 +2388,7 @@ spec: subPath: compactor resources: limits: - memory: 16Gi + memory: 8Gi requests: cpu: 4 memory: 8Gi @@ -2848,7 +2848,7 @@ metadata: app.kubernetes.io/managed-by: Helm app.kubernetes.io/component: "query-worker" spec: - replicas: 3 + replicas: 8 selector: matchLabels: app.kubernetes.io/name: pyroscope @@ -2892,7 +2892,7 @@ spec: - "-metastore.raft.advertise-address=$(POD_NAME).pyroscope-dev-metastore-headless.default.svc.cluster.local.:9099" - "-metastore.raft.server-id=$(POD_NAME).pyroscope-dev-metastore-headless.default.svc.cluster.local.:9099" - "-metastore.raft.bootstrap-peers=dnssrvnoa+_raft._tcp.pyroscope-dev-metastore-headless.default.svc.cluster.local.:9099" - - "-metastore.raft.bootstrap-expect-peers=3" + - "-metastore.raft.bootstrap-expect-peers=8" - "-metastore.address=dns:///_grpc._tcp.pyroscope-dev-metastore-headless.default.svc.cluster.local.:9095" - "-query-backend.address=dns:///pyroscope-dev-query-worker-headless.default.svc.cluster.local.:9095" - "-config.file=/etc/pyroscope/config.yaml" @@ -2928,9 +2928,9 @@ spec: mountPath: /data resources: limits: - memory: 16Gi + memory: 8Gi requests: - cpu: 4 + cpu: 2 memory: 8Gi volumes: - name: config diff --git a/operations/pyroscope/helm/pyroscope/values-micro-services.yaml b/operations/pyroscope/helm/pyroscope/values-micro-services.yaml index bfe57020ea..b6aa83896f 100644 --- a/operations/pyroscope/helm/pyroscope/values-micro-services.yaml +++ b/operations/pyroscope/helm/pyroscope/values-micro-services.yaml @@ -49,21 +49,21 @@ pyroscope: cpu: 100m query-worker: kind: Deployment - replicaCount: 3 + replicaCount: 8 resources: limits: - memory: 16Gi + memory: 8Gi requests: memory: 8Gi - cpu: 4 + cpu: 2 compaction-worker: extraArgs: - compaction-worker.job-capacity: 40 + compaction-worker.job-capacity: 5 kind: Deployment - replicaCount: 32 + replicaCount: 4 resources: limits: - memory: 16Gi + memory: 8Gi requests: memory: 8Gi cpu: 4 diff --git a/operations/pyroscope/jsonnet/values-micro-services.json b/operations/pyroscope/jsonnet/values-micro-services.json index 740a4a06bd..f1c40523bc 100644 --- a/operations/pyroscope/jsonnet/values-micro-services.json +++ b/operations/pyroscope/jsonnet/values-micro-services.json @@ -6,13 +6,13 @@ "components": { "compaction-worker": { "extraArgs": { - "compaction-worker.job-capacity": 40 + "compaction-worker.job-capacity": 5 }, "kind": "Deployment", - "replicaCount": 32, + "replicaCount": 4, "resources": { "limits": { - "memory": "16Gi" + "memory": "8Gi" }, "requests": { "cpu": 4, @@ -123,13 +123,13 @@ }, "query-worker": { "kind": "Deployment", - "replicaCount": 3, + "replicaCount": 8, "resources": { "limits": { - "memory": "16Gi" + "memory": "8Gi" }, "requests": { - "cpu": 4, + "cpu": 2, "memory": "8Gi" } } diff --git a/pkg/querybackend/block/section_profiles.go b/pkg/querybackend/block/section_profiles.go index 1c8046fa9a..fcb109e7f7 100644 --- a/pkg/querybackend/block/section_profiles.go +++ b/pkg/querybackend/block/section_profiles.go @@ -93,7 +93,7 @@ func openParquetFile( var ra io.ReaderAt ra = io.NewSectionReader(r, offset, size) - /* if footerSize > 0 { + if footerSize > 0 { buf := bufferpool.GetBuffer(int(footerSize)) defer func() { // Footer is not used after the file was opened. @@ -105,7 +105,7 @@ func openParquetFile( rf := newReaderWithFooter(ra, buf.B, size) defer rf.free() ra = rf - }*/ + } f, err := parquet.OpenFile(ra, size, options...) if err != nil { @@ -122,18 +122,20 @@ func (f *ParquetFile) RowReader() *parquet.Reader { } func (f *ParquetFile) fetchFooter(ctx context.Context, buf *bufferpool.Buffer, estimatedSize int64) error { - // Fetch the footer of estimated size. - if err := objstore.ReadRange(ctx, buf, f.path, f.storage, f.off+f.size-estimatedSize, estimatedSize); err != nil { + // Fetch the footer of estimated size at the estimated offset. + estimatedOffset := f.off + f.size - estimatedSize + if err := objstore.ReadRange(ctx, buf, f.path, f.storage, estimatedOffset, estimatedSize); err != nil { return err } // Footer size is an uint32 located at size-8. - sb := buf.B[f.size-8 : f.size-4] + sb := buf.B[len(buf.B)-8 : len(buf.B)-4] s := int64(binary.LittleEndian.Uint32(sb)) s += 8 // Include the footer size itself and the magic signature. if estimatedSize >= s { // The footer has been fetched. return nil } + // Fetch footer to buf for sure. return objstore.ReadRange(ctx, buf, f.path, f.storage, f.off+f.size-s, s) }