From 5c32ed52adb10aa2c158616a74b4936ccc308f70 Mon Sep 17 00:00:00 2001 From: Aleksei Semiglazov Date: Thu, 28 Mar 2019 00:14:42 +0000 Subject: [PATCH] store+compactor: process index cache during compaction Add a few steps during compaction: 1. Generate index cache files for old blocks made by compactor versions prior to this one. 2. Generate index cache during group compaction. 3. Generate index cache during downsampling. 4. Add index cache version to the cache file. Store downloads index cache files from object storage, or generates them on the fly if they don't exist. Signed-off-by: Aleksei Semiglazov --- CHANGELOG.md | 2 + cmd/thanos/compact.go | 133 ++++++++++++++++++ pkg/block/block.go | 8 ++ pkg/block/index.go | 22 +-- pkg/block/metadata/meta.go | 2 +- pkg/compact/compact.go | 24 +++- .../downsample/streamed_block_writer.go | 14 ++ pkg/objstore/objstore.go | 23 ++- pkg/store/bucket.go | 45 ++++-- 9 files changed, 242 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f390cc7cb..41d6a97a50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ New options: * `--store.grpc.series-sample-limit` limits the amount of samples that might be retrieved on a single Series() call. By default it is 0. Consider enabling it by setting it to more than 0 if you are running on limited resources. * `--store.grpc.series-max-concurrency` limits the number of concurrent Series() calls in Thanos Store. By default it is 20. Consider making it lower or higher depending on the scale of your deployment. +* `--index.generate-missing-cache-file` if enabled, on startup the compactor runs a one-off job that scans all blocks to find those with a missing index cache file, and generates and uploads the missing files. Disabled by default. Once the log line `generating index cache files is done` appears, you can disable this flag. New metrics: * `thanos_bucket_store_queries_dropped_total` shows how many queries were dropped due to the samples limit; @@ -35,6 +36,7 @@ New tracing span: - [#970](https://github.com/improbable-eng/thanos/pull/970) Added `PartialResponseStrategy` field for `RuleGroups` for `Ruler`. - [#1016](https://github.com/improbable-eng/thanos/pull/1016) Added option for another DNS resolver (miekg/dns client). This is to have SRV resolution working on [Golang 1.11+ with KubeDNS below v1.14](https://github.com/golang/go/issues/27546) +- [#986](https://github.com/improbable-eng/thanos/pull/986) Store index cache files in object storage. This reduces Store start-up time by skipping index cache generation for compacted blocks and only doing it for recently created, uncompacted blocks. ### Changed - [#970](https://github.com/improbable-eng/thanos/pull/970) Deprecated partial_response_disabled proto field. Added partial_response_strategy instead. Both in gRPC and Query API.
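For reference, the `index.cache.json` file this patch writes is plain JSON whose top-level fields match the `indexCache` struct extended in `pkg/block/index.go` below. A minimal sketch, assuming only the field names visible in this diff, for inspecting such a file offline (the standalone program and file path are illustrative, not part of the change; posting entries are decoded as raw JSON rather than assuming their exact layout):

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// indexCacheFile mirrors the exported field names of the indexCache struct
// in pkg/block/index.go as extended by this patch.
type indexCacheFile struct {
	Version      int                 // TSDB index format version.
	CacheVersion int                 // Cache file version (IndexCacheVersion1 = 1, added by this patch).
	Symbols      map[uint32]string   // Symbol table of the block index.
	LabelValues  map[string][]string // Values per label name.
	Postings     []json.RawMessage   // Kept raw; the postingsRange layout is not assumed here.
}

func main() {
	f, err := os.Open("index.cache.json") // Illustrative path.
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer f.Close()

	var c indexCacheFile
	if err := json.NewDecoder(f).Decode(&c); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("index version=%d cache version=%d symbols=%d label names=%d posting ranges=%d\n",
		c.Version, c.CacheVersion, len(c.Symbols), len(c.LabelValues), len(c.Postings))
}
```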
diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index a7f3fff550..89cf8cb07a 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -2,17 +2,22 @@ package main import ( "context" + "encoding/json" "fmt" "os" "path" + "path/filepath" "strconv" "strings" "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/compact" "github.com/improbable-eng/thanos/pkg/compact/downsample" + "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/run" @@ -87,6 +92,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri wait := cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work."). Short('w').Bool() + generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "If enabled, on startup the compactor runs a one-off job that scans all blocks to find those with a missing index cache file, and generates and uploads the missing files."). + Hidden().Default("false").Bool() + // TODO(bplotka): Remove this flag once https://github.com/improbable-eng/thanos/issues/297 is fixed. disableDownsampling := cmd.Flag("debug.disable-downsampling", "Disables downsampling. This is not recommended "+ "as querying long time ranges without non-downsampled data is not efficient and not useful (is not possible to render all for human eye)."). @@ -110,6 +118,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri *haltOnError, *acceptMalformedIndex, *wait, + *generateMissingIndexCacheFiles, map[compact.ResolutionLevel]time.Duration{ compact.ResolutionLevelRaw: time.Duration(*retentionRaw), compact.ResolutionLevel5m: time.Duration(*retention5m), @@ -135,6 +144,7 @@ func runCompact( haltOnError bool, acceptMalformedIndex bool, wait bool, + generateMissingIndexCacheFiles bool, retentionByResolution map[compact.ResolutionLevel]time.Duration, component string, disableDownsampling bool, @@ -197,6 +207,7 @@ func runCompact( var ( compactDir = path.Join(dataDir, "compact") downsamplingDir = path.Join(dataDir, "downsample") + indexCacheDir = path.Join(dataDir, "index_cache") ) if err := os.RemoveAll(downsamplingDir); err != nil { @@ -255,6 +266,13 @@ func runCompact( g.Add(func() error { defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") + // Generate index cache files for existing blocks that lack them. + if generateMissingIndexCacheFiles { + if err := genMissingIndexCacheFiles(ctx, logger, bkt, indexCacheDir); err != nil { + return err + } + } + if !wait { return f() } @@ -300,3 +318,118 @@ func runCompact( level.Info(logger).Log("msg", "starting compact node") return nil } + +// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
+func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir string) error { + if err := os.RemoveAll(dir); err != nil { + return errors.Wrap(err, "clean index cache directory") + } + if err := os.MkdirAll(dir, 0777); err != nil { + return errors.Wrap(err, "create dir") + } + + defer func() { + if err := os.RemoveAll(dir); err != nil { + level.Error(logger).Log("msg", "failed to remove index cache directory", "path", dir, "err", err) + } + }() + + level.Info(logger).Log("msg", "start index cache processing") + + var metas []*metadata.Meta + + if err := bkt.Iter(ctx, "", func(name string) error { + id, ok := block.IsBlockDir(name) + if !ok { + return nil + } + + rc, err := bkt.Get(ctx, path.Join(id.String(), block.MetaFilename)) + if err != nil { + // Probably an unfinished block; skip it. + if bkt.IsObjNotFoundErr(err) { + level.Warn(logger).Log("msg", "meta file wasn't found", "block", id.String()) + return nil + } + return errors.Wrapf(err, "get meta for block %s", id) + } + defer runutil.CloseWithLogOnErr(logger, rc, "block reader") + + var meta metadata.Meta + if err := json.NewDecoder(rc).Decode(&meta); err != nil { + return errors.Wrap(err, "decode meta") + } + + // Newer versions of the compactor push the index cache along with the data block. + // Skip uncompacted blocks. + if meta.Compaction.Level == 1 { + return nil + } + + metas = append(metas, &meta) + + return nil + }); err != nil { + return errors.Wrap(err, "retrieve bucket block metas") + } + + for _, meta := range metas { + if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil { + return err + } + } + + level.Info(logger).Log("msg", "generating index cache files is done, you can remove the startup argument `index.generate-missing-cache-file`") + return nil +} + +func generateIndexCacheFile( + ctx context.Context, + bkt objstore.Bucket, + logger log.Logger, + indexCacheDir string, + meta *metadata.Meta, +) error { + id := meta.ULID + + bdir := filepath.Join(indexCacheDir, id.String()) + if err := os.MkdirAll(bdir, 0777); err != nil { + return errors.Wrap(err, "create block dir") + } + + defer func() { + if err := os.RemoveAll(bdir); err != nil { + level.Error(logger).Log("msg", "failed to remove index cache directory", "path", bdir, "err", err) + } + }() + + cachePath := filepath.Join(bdir, block.IndexCacheFilename) + cache := path.Join(meta.ULID.String(), block.IndexCacheFilename) + + ok, err := objstore.Exists(ctx, bkt, cache) + if err != nil { + return errors.Wrap(err, "check if a cached index file exists") + } + if ok { + return nil + } + + level.Debug(logger).Log("msg", "make index cache", "block", id) + + // Try to download index file from obj store. + indexPath := filepath.Join(bdir, block.IndexFilename) + index := path.Join(id.String(), block.IndexFilename) + + if err := objstore.DownloadFile(ctx, logger, bkt, index, indexPath); err != nil { + return errors.Wrap(err, "download index file") + } + + if err := block.WriteIndexCache(logger, indexPath, cachePath); err != nil { + return errors.Wrap(err, "write index cache") + } + + if err := objstore.UploadFile(ctx, logger, bkt, cachePath, cache); err != nil { + return errors.Wrap(err, "upload index cache") + } + return nil +} diff --git a/pkg/block/block.go b/pkg/block/block.go index 008e1798d4..5c1033a7d8 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -25,6 +25,8 @@ const ( MetaFilename = "meta.json" // IndexFilename is the known index file for block index.
IndexFilename = "index" + // IndexCacheFilename is the canonical name of the index cache file that stores essential index information. + IndexCacheFilename = "index.cache.json" // ChunksDirname is the known dir name for chunks with compressed samples. ChunksDirname = "chunks" @@ -93,6 +95,12 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st return cleanUp(bkt, id, errors.Wrap(err, "upload index")) } + if meta.Thanos.Source == metadata.CompactorSource { + if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, IndexCacheFilename), path.Join(id.String(), IndexCacheFilename)); err != nil { + return cleanUp(bkt, id, errors.Wrap(err, "upload index cache")) + } + } + // Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file // to be pending uploads. if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, MetaFilename), path.Join(id.String(), MetaFilename)); err != nil { diff --git a/pkg/block/index.go b/pkg/block/index.go index 777654f2e6..41294e76f2 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -26,8 +26,10 @@ import ( "github.com/prometheus/tsdb/labels" ) -// IndexCacheFilename is the canonical name for index cache files. -const IndexCacheFilename = "index.cache.json" +const ( + // IndexCacheVersion is an enumeration of index cache versions supported by Thanos. + IndexCacheVersion1 = iota + 1 +) type postingsRange struct { Name, Value string } type indexCache struct { - Version int - Symbols map[uint32]string - LabelValues map[string][]string - Postings []postingsRange + Version int + CacheVersion int + Symbols map[uint32]string + LabelValues map[string][]string + Postings []postingsRange } type realByteSlice []byte @@ -112,9 +115,10 @@ func WriteIndexCache(logger log.Logger, indexFn string, fn string) error { defer runutil.CloseWithLogOnErr(logger, f, "index cache writer") v := indexCache{ - Version: indexr.Version(), - Symbols: symbols, - LabelValues: map[string][]string{}, + Version: indexr.Version(), + CacheVersion: IndexCacheVersion1, + Symbols: symbols, + LabelValues: map[string][]string{}, } // Extract label value indices. diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index 0d8b22dd19..765c3533db 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -36,7 +36,7 @@ const ( ) const ( - // MetaVersion is a enumeration of versions supported by Thanos. + // MetaVersion is an enumeration of meta versions supported by Thanos.
MetaVersion1 = iota + 1 ) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 2de928ab6a..9b820f3500 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -3,6 +3,7 @@ package compact import ( "context" "fmt" + "io/ioutil" "os" "path/filepath" "sort" @@ -11,8 +12,6 @@ import ( "github.com/improbable-eng/thanos/pkg/block/metadata" - "io/ioutil" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" @@ -61,6 +60,9 @@ type syncerMetrics struct { garbageCollectionDuration prometheus.Histogram compactions *prometheus.CounterVec compactionFailures *prometheus.CounterVec + indexCacheBlocks prometheus.Counter + indexCacheTraverse prometheus.Counter + indexCacheFailures prometheus.Counter } func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { @@ -535,7 +537,6 @@ func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) ( cg.compactionFailures.Inc() } cg.compactions.Inc() - return shouldRerun, compID, err } @@ -813,6 +814,8 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( "blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin)) bdir := filepath.Join(dir, compID.String()) + index := filepath.Join(bdir, block.IndexFilename) + indexCache := filepath.Join(bdir, block.IndexCacheFilename) newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{ Labels: cg.labels.Map(), @@ -828,7 +831,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( } // Ensure the output block is valid. - if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil { + if err := block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil { return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir)) } @@ -837,6 +840,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir)) } + if err := block.WriteIndexCache(cg.logger, index, indexCache); err != nil { + return false, ulid.ULID{}, errors.Wrap(err, "write index cache") + } + begin = time.Now() if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err != nil { @@ -888,7 +895,14 @@ type BucketCompactor struct { } // NewBucketCompactor creates a new bucket compactor. 
-func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket, concurrency int) (*BucketCompactor, error) { +func NewBucketCompactor( + logger log.Logger, + sy *Syncer, + comp tsdb.Compactor, + compactDir string, + bkt objstore.Bucket, + concurrency int, +) (*BucketCompactor, error) { if concurrency <= 0 { return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency) } diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go index 2e2921ac34..16213c0cd0 100644 --- a/pkg/compact/downsample/streamed_block_writer.go +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -195,6 +195,10 @@ func (w *streamedBlockWriter) finalize() error { return errors.Wrap(err, "write mem postings") } + if err := w.writeIndexCache(); err != nil { + return errors.Wrap(err, "write index cache") + } + if err := w.writeMetaFile(); err != nil { return errors.Wrap(err, "write meta file") } @@ -253,6 +257,16 @@ func (w *streamedBlockWriter) writeMemPostings() error { return nil } +func (w *streamedBlockWriter) writeIndexCache() error { + indexFile := filepath.Join(w.blockDir, block.IndexFilename) + indexCacheFile := filepath.Join(w.blockDir, block.IndexCacheFilename) + if err := block.WriteIndexCache(w.logger, indexFile, indexCacheFile); err != nil { + return errors.Wrap(err, "write index cache") + } + + return nil +} + // writeMetaFile writes meta file. func (w *streamedBlockWriter) writeMetaFile() error { w.meta.Version = metadata.MetaVersion1 diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index 4b241e972a..81d52d7d45 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -106,7 +106,7 @@ func DeleteDir(ctx context.Context, bkt Bucket, dir string) error { // DownloadFile downloads the src file from the bucket to dst. If dst is an existing // directory, a file with the same name as the source is created in dst. // If the destination file already exists, it will be overwritten. -func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, dst string) error { +func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, dst string) (err error) { if fi, err := os.Stat(dst); err == nil { if fi.IsDir() { dst = filepath.Join(dst, filepath.Base(src)) @@ -125,8 +125,6 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, if err != nil { return errors.Wrap(err, "create file") } - defer runutil.CloseWithLogOnErr(logger, f, "download block's output file") - defer func() { if err != nil { if rerr := os.Remove(dst); rerr != nil { @@ -134,6 +132,8 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, } } }() + defer runutil.CloseWithLogOnErr(logger, f, "download block's output file") + if _, err = io.Copy(f, rc); err != nil { return errors.Wrap(err, "copy object to file") } @@ -170,6 +170,23 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, src, return nil } +// Exists checks if a file exists in the bucket. It returns false with a nil error if the object is not found +// (IsObjNotFoundErr), and false with the underlying error for any other failure.
+func Exists(ctx context.Context, bkt Bucket, src string) (bool, error) { + rc, err := bkt.Get(ctx, src) + if rc != nil { + _ = rc.Close() + } + if err != nil { + if bkt.IsObjNotFoundErr(err) { + return false, nil + } + return false, errors.Wrap(err, "stat object") + } + + return true, nil +} + // BucketWithMetrics takes a bucket and registers metrics with the given registry for // operations run against the bucket. func BucketWithMetrics(name string, b Bucket, r prometheus.Registerer) Bucket { diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index d2070229f1..8e6c3533c5 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1033,10 +1033,11 @@ type bucketBlock struct { indexVersion int symbols map[uint32]string + symbolsV2 map[string]struct{} lvals map[string][]string postings map[labels.Label]index.Range - indexObj string + id ulid.ULID chunkObjs []string pendingReaders sync.WaitGroup @@ -1057,7 +1058,7 @@ func newBucketBlock( b = &bucketBlock{ logger: logger, bucket: bkt, - indexObj: path.Join(id.String(), block.IndexFilename), + id: id, indexCache: indexCache, chunkPool: chunkPool, dir: dir, @@ -1080,6 +1081,14 @@ func newBucketBlock( return b, nil } +func (b *bucketBlock) indexFilename() string { + return path.Join(b.id.String(), block.IndexFilename) +} + +func (b *bucketBlock) indexCacheFilename() string { + return path.Join(b.id.String(), block.IndexCacheFilename) +} + func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error { // If we haven't seen the block before download the meta.json file. if _, err := os.Stat(b.dir); os.IsNotExist(err) { @@ -1104,20 +1113,29 @@ func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error { func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) { cachefn := filepath.Join(b.dir, block.IndexCacheFilename) - - b.indexVersion, b.symbols, b.lvals, b.postings, err = block.ReadIndexCache(b.logger, cachefn) - if err == nil { + if err = b.loadIndexCacheFromFile(ctx, cachefn); err == nil { return nil } if !os.IsNotExist(errors.Cause(err)) { return errors.Wrap(err, "read index cache") } - // No cache exists is on disk yet, build it from the downloaded index and retry. + + // Try to download index cache file from object store. + if err = objstore.DownloadFile(ctx, b.logger, b.bucket, b.indexCacheFilename(), cachefn); err == nil { + return b.loadIndexCacheFromFile(ctx, cachefn) + } + + if !b.bucket.IsObjNotFoundErr(errors.Cause(err)) { + return errors.Wrap(err, "download index cache file") + } + + // No cache exists on disk yet, build it from the downloaded index and retry. 
fn := filepath.Join(b.dir, block.IndexFilename) - if err := objstore.DownloadFile(ctx, b.logger, b.bucket, b.indexObj, fn); err != nil { + if err := objstore.DownloadFile(ctx, b.logger, b.bucket, b.indexFilename(), fn); err != nil { return errors.Wrap(err, "download index file") } + defer func() { if rerr := os.Remove(fn); rerr != nil { level.Error(b.logger).Log("msg", "failed to remove temp index file", "path", fn, "err", rerr) @@ -1128,15 +1146,16 @@ func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) { return errors.Wrap(err, "write index cache") } - b.indexVersion, b.symbols, b.lvals, b.postings, err = block.ReadIndexCache(b.logger, cachefn) - if err != nil { - return errors.Wrap(err, "read index cache") - } - return nil + return errors.Wrap(b.loadIndexCacheFromFile(ctx, cachefn), "read index cache") +} + +func (b *bucketBlock) loadIndexCacheFromFile(ctx context.Context, cache string) (err error) { + b.indexVersion, b.symbols, b.lvals, b.postings, err = block.ReadIndexCache(b.logger, cache) + return err } func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([]byte, error) { - r, err := b.bucket.GetRange(ctx, b.indexObj, off, length) + r, err := b.bucket.GetRange(ctx, b.indexFilename(), off, length) if err != nil { return nil, errors.Wrap(err, "get range reader") }
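With these pieces in place, the store resolves a block's index cache in three steps: reuse the local cache file if present, otherwise download the compactor-uploaded `index.cache.json` from the bucket, otherwise download the full index and build the cache locally. The new `objstore.Exists` helper can also be used on its own; a hedged usage sketch (the `hasIndexCache` wrapper is illustrative and not part of this patch, while `objstore.Exists`, `block.IndexCacheFilename`, and the import paths come from it):

```go
package example

import (
	"context"
	"path"

	"github.com/improbable-eng/thanos/pkg/block"
	"github.com/improbable-eng/thanos/pkg/objstore"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
)

// hasIndexCache reports whether a block already has an index cache file in
// the bucket, using the Exists helper and IndexCacheFilename constant
// introduced by this patch.
func hasIndexCache(ctx context.Context, bkt objstore.Bucket, id ulid.ULID) (bool, error) {
	ok, err := objstore.Exists(ctx, bkt, path.Join(id.String(), block.IndexCacheFilename))
	if err != nil {
		return false, errors.Wrap(err, "check index cache existence")
	}
	return ok, nil
}
```

Because Exists is built on Bucket.Get rather than a separate stat-style call, it works with every existing Bucket implementation without widening the interface; the trade-off is that a Get is issued even though the reader is closed immediately.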