From 7466e9091a600fa8029e3dea35fca5a8cc4e03c0 Mon Sep 17 00:00:00 2001 From: Aleksei Semiglazov Date: Thu, 28 Mar 2019 00:14:42 +0000 Subject: [PATCH] store+compactor: process index cache during compaction Add few steps during compaction: 1. Generate index cache for old blocks made by compactor until this version. 2. Generate index cache during group compaction. 3. Generate index cache during downsampling. 4. Add index cache version to cache file. Store downloads index cache files from object store or generate on the fly if they don't exist. Signed-off-by: Aleksei Semiglazov --- cmd/thanos/compact.go | 136 ++++++++++++++++++ pkg/block/block.go | 8 ++ pkg/block/index.go | 22 +-- pkg/block/metadata/meta.go | 2 +- pkg/compact/compact.go | 24 +++- .../downsample/streamed_block_writer.go | 14 ++ pkg/objstore/objstore.go | 23 ++- pkg/store/bucket.go | 45 ++++-- 8 files changed, 243 insertions(+), 31 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index a7f3fff550e..022fc0af697 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -2,17 +2,22 @@ package main import ( "context" + "encoding/json" "fmt" "os" "path" + "path/filepath" "strconv" "strings" "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/compact" "github.com/improbable-eng/thanos/pkg/compact/downsample" + "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/run" @@ -87,6 +92,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri wait := cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work."). 
Short('w').Bool() + generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "Process indices' cache, upload them to object store and update metas."). + Hidden().Default("false").Bool() + // TODO(bplotka): Remove this flag once https://github.com/improbable-eng/thanos/issues/297 is fixed. disableDownsampling := cmd.Flag("debug.disable-downsampling", "Disables downsampling. This is not recommended "+ "as querying long time ranges without non-downsampled data is not efficient and not useful (is not possible to render all for human eye)."). @@ -110,6 +118,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri *haltOnError, *acceptMalformedIndex, *wait, + *generateMissingIndexCacheFiles, map[compact.ResolutionLevel]time.Duration{ compact.ResolutionLevelRaw: time.Duration(*retentionRaw), compact.ResolutionLevel5m: time.Duration(*retention5m), @@ -135,6 +144,7 @@ func runCompact( haltOnError bool, acceptMalformedIndex bool, wait bool, + generateMissingIndexCacheFiles bool, retentionByResolution map[compact.ResolutionLevel]time.Duration, component string, disableDownsampling bool, @@ -197,6 +207,7 @@ func runCompact( var ( compactDir = path.Join(dataDir, "compact") downsamplingDir = path.Join(dataDir, "downsample") + indexCacheDir = path.Join(dataDir, "index_cache") ) if err := os.RemoveAll(downsamplingDir); err != nil { @@ -255,6 +266,13 @@ func runCompact( g.Add(func() error { defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") + // Generate index file + if generateMissingIndexCacheFiles { + if err := genMissingIndexCacheFiles(ctx, logger, bkt, indexCacheDir); err != nil { + return err + } + } + if !wait { return f() } @@ -300,3 +318,121 @@ func runCompact( level.Info(logger).Log("msg", "starting compact node") return nil } + +// genMissingIndexCacheFiles generates missing index cache files and uploads them to object storage. 
+func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir string) error { + if err := os.RemoveAll(dir); err != nil { + return errors.Wrap(err, "clean index cache directory") + } + if err := os.MkdirAll(dir, 0777); err != nil { + return errors.Wrap(err, "create dir") + } + + defer func() { + if err := os.RemoveAll(dir); err != nil { + level.Error(logger).Log("msg", "failed to remove index cache directory", "path", dir, "err", err) + } + }() + + level.Info(logger).Log("msg", "start index cache processing") + + var ( + metas []*metadata.Meta + ) + + err := bkt.Iter(ctx, "", func(name string) error { + id, ok := block.IsBlockDir(name) + if !ok { + return nil + } + + rc, err := bkt.Get(ctx, path.Join(id.String(), block.MetaFilename)) + if err != nil { + // Probably not finished block, skip it. + if bkt.IsObjNotFoundErr(err) { + level.Warn(logger).Log("msg", "meta file wasn't found", "block", id.String()) + return nil + } + return errors.Wrapf(err, "get meta for block %s", id) + } + defer runutil.CloseWithLogOnErr(logger, rc, "block reader") + + var meta metadata.Meta + if err := json.NewDecoder(rc).Decode(&meta); err != nil { + return errors.Wrap(err, "decode meta") + } + + // New version of compactor pushes index cache along with data block. + // Skip uncompacted blocks. 
+ if meta.Compaction.Level == 1 { + return nil + } + + metas = append(metas, &meta) + + return nil + }) + if err != nil { + return errors.Wrap(err, "retrieve bucket block metas") + } + + for _, meta := range metas { + if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil { + return err + } + } + + level.Info(logger).Log("msg", "generating index cache files is done, you can remove startup argument `index.generate-missing-cache-file`") + return nil +} + +func generateIndexCacheFile( + ctx context.Context, + bkt objstore.Bucket, + logger log.Logger, + indexCacheDir string, + meta *metadata.Meta, +) error { + id := meta.ULID + + bdir := filepath.Join(indexCacheDir, id.String()) + if err := os.MkdirAll(bdir, 0777); err != nil { + return errors.Wrap(err, "create block dir") + } + + defer func() { + if err := os.Remove(bdir); err != nil { + level.Error(logger).Log("msg", "failed to remove index cache directory", "path", bdir, "err", err) + } + }() + + cachePath := filepath.Join(bdir, block.IndexCacheFilename) + cache := path.Join(meta.ULID.String(), block.IndexCacheFilename) + + ok, err := objstore.Exists(ctx, bkt, cache) + if ok { + return nil + } + if err != nil { + return errors.Wrapf(err, "attempt to check if a cached index file exists") + } + + level.Debug(logger).Log("msg", "make index cache", "block", id) + + // Try to download index file from obj store. 
+ indexPath := filepath.Join(bdir, block.IndexFilename) + index := path.Join(id.String(), block.IndexFilename) + + if err := objstore.DownloadFile(ctx, logger, bkt, index, indexPath); err != nil { + return errors.Wrap(err, "download index file") + } + + if err := block.WriteIndexCache(logger, indexPath, cachePath); err != nil { + return errors.Wrap(err, "write index cache") + } + + if err := objstore.UploadFile(ctx, logger, bkt, cachePath, cache); err != nil { + return errors.Wrap(err, "upload index cache") + } + return nil +} diff --git a/pkg/block/block.go b/pkg/block/block.go index 008e1798d4b..5c1033a7d8d 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -25,6 +25,8 @@ const ( MetaFilename = "meta.json" // IndexFilename is the known index file for block index. IndexFilename = "index" + // IndexCacheFilename is the canonical name for index cache file that stores essential information needed. + IndexCacheFilename = "index.cache.json" // ChunksDirname is the known dir name for chunks with compressed samples. ChunksDirname = "chunks" @@ -93,6 +95,12 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st return cleanUp(bkt, id, errors.Wrap(err, "upload index")) } + if meta.Thanos.Source == metadata.CompactorSource { + if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, IndexCacheFilename), path.Join(id.String(), IndexCacheFilename)); err != nil { + return cleanUp(bkt, id, errors.Wrap(err, "upload index cache")) + } + } + // Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file // to be pending uploads. 
if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, MetaFilename), path.Join(id.String(), MetaFilename)); err != nil { diff --git a/pkg/block/index.go b/pkg/block/index.go index 777654f2e6b..41294e76f28 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -26,8 +26,10 @@ import ( "github.com/prometheus/tsdb/labels" ) -// IndexCacheFilename is the canonical name for index cache files. -const IndexCacheFilename = "index.cache.json" +const ( + // IndexCacheVersion is a enumeration of index cache versions supported by Thanos. + IndexCacheVersion1 = iota + 1 +) type postingsRange struct { Name, Value string @@ -35,10 +37,11 @@ type postingsRange struct { } type indexCache struct { - Version int - Symbols map[uint32]string - LabelValues map[string][]string - Postings []postingsRange + Version int + CacheVersion int + Symbols map[uint32]string + LabelValues map[string][]string + Postings []postingsRange } type realByteSlice []byte @@ -112,9 +115,10 @@ func WriteIndexCache(logger log.Logger, indexFn string, fn string) error { defer runutil.CloseWithLogOnErr(logger, f, "index cache writer") v := indexCache{ - Version: indexr.Version(), - Symbols: symbols, - LabelValues: map[string][]string{}, + Version: indexr.Version(), + CacheVersion: IndexCacheVersion1, + Symbols: symbols, + LabelValues: map[string][]string{}, } // Extract label value indices. diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index 0d8b22dd190..765c3533db4 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -36,7 +36,7 @@ const ( ) const ( - // MetaVersion is a enumeration of versions supported by Thanos. + // MetaVersion is a enumeration of meta versions supported by Thanos. 
MetaVersion1 = iota + 1 ) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 2de928ab6a0..dc0ab1682bf 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -3,6 +3,7 @@ package compact import ( "context" "fmt" + "io/ioutil" "os" "path/filepath" "sort" @@ -11,8 +12,6 @@ import ( "github.com/improbable-eng/thanos/pkg/block/metadata" - "io/ioutil" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" @@ -61,6 +60,9 @@ type syncerMetrics struct { garbageCollectionDuration prometheus.Histogram compactions *prometheus.CounterVec compactionFailures *prometheus.CounterVec + indexCacheBlocks prometheus.Counter + indexCacheTraverse prometheus.Counter + indexCacheFailures prometheus.Counter } func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { @@ -535,7 +537,6 @@ func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) ( cg.compactionFailures.Inc() } cg.compactions.Inc() - return shouldRerun, compID, err } @@ -813,6 +814,8 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( "blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin)) bdir := filepath.Join(dir, compID.String()) + index := filepath.Join(bdir, block.IndexFilename) + indexCache := filepath.Join(bdir, block.IndexCacheFilename) newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{ Labels: cg.labels.Map(), @@ -828,7 +831,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( } // Ensure the output block is valid. 
- if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil { + if err := block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime); err != nil { return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir)) } @@ -837,6 +840,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir)) } + if err := block.WriteIndexCache(cg.logger, index, indexCache); err != nil { + return false, ulid.ULID{}, errors.Wrap(err, "write index cache") + } + begin = time.Now() if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err != nil { @@ -888,7 +895,14 @@ type BucketCompactor struct { } // NewBucketCompactor creates a new bucket compactor. -func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket, concurrency int) (*BucketCompactor, error) { +func NewBucketCompactor( + logger log.Logger, + sy *Syncer, + comp tsdb.Compactor, + compactDir string, + bkt objstore.Bucket, + concurrency int, +) (*BucketCompactor, error) { if concurrency <= 0 { return nil, errors.New("invalid concurrency level (%d), concurrency level must be > 0") } diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go index 2e2921ac344..16213c0cd01 100644 --- a/pkg/compact/downsample/streamed_block_writer.go +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -195,6 +195,10 @@ func (w *streamedBlockWriter) finalize() error { return errors.Wrap(err, "write mem postings") } + if err := w.writeIndexCache(); err != nil { + return errors.Wrap(err, "write index cache") + } + if err := w.writeMetaFile(); err != nil { return errors.Wrap(err, "write meta meta") } @@ -253,6 +257,16 @@ func (w *streamedBlockWriter) writeMemPostings() error { return 
nil } +func (w *streamedBlockWriter) writeIndexCache() error { + indexFile := filepath.Join(w.blockDir, block.IndexFilename) + indexCacheFile := filepath.Join(w.blockDir, block.IndexCacheFilename) + if err := block.WriteIndexCache(w.logger, indexFile, indexCacheFile); err != nil { + return errors.Wrap(err, "write index cache") + } + + return nil +} + // writeMetaFile writes meta file. func (w *streamedBlockWriter) writeMetaFile() error { w.meta.Version = metadata.MetaVersion1 diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index 4b241e972a1..81d52d7d457 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -106,7 +106,7 @@ func DeleteDir(ctx context.Context, bkt Bucket, dir string) error { // DownloadFile downloads the src file from the bucket to dst. If dst is an existing // directory, a file with the same name as the source is created in dst. // If destination file is already existing, download file will overwrite it. -func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, dst string) error { +func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, dst string) (err error) { if fi, err := os.Stat(dst); err == nil { if fi.IsDir() { dst = filepath.Join(dst, filepath.Base(src)) @@ -125,8 +125,6 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, if err != nil { return errors.Wrap(err, "create file") } - defer runutil.CloseWithLogOnErr(logger, f, "download block's output file") - defer func() { if err != nil { if rerr := os.Remove(dst); rerr != nil { @@ -134,6 +132,8 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, } } }() + defer runutil.CloseWithLogOnErr(logger, f, "download block's output file") + if _, err = io.Copy(f, rc); err != nil { return errors.Wrap(err, "copy object to file") } @@ -170,6 +170,23 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, src, return nil } +// Exists 
returns true if the file exists; it returns false with a nil error when the object is not found (IsObjNotFoundErr), +// and false with the error otherwise. +func Exists(ctx context.Context, bkt Bucket, src string) (bool, error) { + rc, err := bkt.Get(ctx, src) + if rc != nil { + _ = rc.Close() + } + if err != nil { + if bkt.IsObjNotFoundErr(err) { + return false, nil + } + return false, errors.Wrap(err, "stat object") + } + + return true, nil +} + // BucketWithMetrics takes a bucket and registers metrics with the given registry for // operations run against the bucket. func BucketWithMetrics(name string, b Bucket, r prometheus.Registerer) Bucket { diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index d2070229f10..8e6c3533c58 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1033,10 +1033,11 @@ type bucketBlock struct { indexVersion int symbols map[uint32]string + symbolsV2 map[string]struct{} lvals map[string][]string postings map[labels.Label]index.Range - indexObj string + id ulid.ULID chunkObjs []string pendingReaders sync.WaitGroup @@ -1057,7 +1058,7 @@ func newBucketBlock( b = &bucketBlock{ logger: logger, bucket: bkt, - indexObj: path.Join(id.String(), block.IndexFilename), + id: id, indexCache: indexCache, chunkPool: chunkPool, dir: dir, @@ -1080,6 +1081,14 @@ func newBucketBlock( return b, nil } +func (b *bucketBlock) indexFilename() string { + return path.Join(b.id.String(), block.IndexFilename) +} + +func (b *bucketBlock) indexCacheFilename() string { + return path.Join(b.id.String(), block.IndexCacheFilename) +} + func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error { // If we haven't seen the block before download the meta.json file.
if _, err := os.Stat(b.dir); os.IsNotExist(err) { @@ -1104,20 +1113,29 @@ func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error { func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) { cachefn := filepath.Join(b.dir, block.IndexCacheFilename) - - b.indexVersion, b.symbols, b.lvals, b.postings, err = block.ReadIndexCache(b.logger, cachefn) - if err == nil { + if err = b.loadIndexCacheFromFile(ctx, cachefn); err == nil { return nil } if !os.IsNotExist(errors.Cause(err)) { return errors.Wrap(err, "read index cache") } - // No cache exists is on disk yet, build it from the downloaded index and retry. + + // Try to download index cache file from object store. + if err = objstore.DownloadFile(ctx, b.logger, b.bucket, b.indexCacheFilename(), cachefn); err == nil { + return b.loadIndexCacheFromFile(ctx, cachefn) + } + + if !b.bucket.IsObjNotFoundErr(errors.Cause(err)) { + return errors.Wrap(err, "download index cache file") + } + + // No cache exists on disk yet, build it from the downloaded index and retry. 
fn := filepath.Join(b.dir, block.IndexFilename) - if err := objstore.DownloadFile(ctx, b.logger, b.bucket, b.indexObj, fn); err != nil { + if err := objstore.DownloadFile(ctx, b.logger, b.bucket, b.indexFilename(), fn); err != nil { return errors.Wrap(err, "download index file") } + defer func() { if rerr := os.Remove(fn); rerr != nil { level.Error(b.logger).Log("msg", "failed to remove temp index file", "path", fn, "err", rerr) @@ -1128,15 +1146,16 @@ func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) { return errors.Wrap(err, "write index cache") } - b.indexVersion, b.symbols, b.lvals, b.postings, err = block.ReadIndexCache(b.logger, cachefn) - if err != nil { - return errors.Wrap(err, "read index cache") - } - return nil + return errors.Wrap(b.loadIndexCacheFromFile(ctx, cachefn), "read index cache") +} + +func (b *bucketBlock) loadIndexCacheFromFile(ctx context.Context, cache string) (err error) { + b.indexVersion, b.symbols, b.lvals, b.postings, err = block.ReadIndexCache(b.logger, cache) + return err } func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([]byte, error) { - r, err := b.bucket.GetRange(ctx, b.indexObj, off, length) + r, err := b.bucket.GetRange(ctx, b.indexFilename(), off, length) if err != nil { return nil, errors.Wrap(err, "get range reader") }