Skip to content

Commit

Permalink
store+compactor: process index cache during compaction
Browse files Browse the repository at this point in the history
Add few steps during compaction:

1. Generate index cache for old blocks made by compactor until this version.
2. Generate index cache during group compaction.
3. Generate index cache during downsampling.
4. Add index cache version to cache file.

Store downloads index cache files from object store or generate on the
fly if they don't exist.

Signed-off-by: Aleksei Semiglazov <[email protected]>
  • Loading branch information
xjewer committed Apr 11, 2019
1 parent 6ffd36b commit 7466e90
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 31 deletions.
136 changes: 136 additions & 0 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@ package main

import (
"context"
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/compact"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/run"
Expand Down Expand Up @@ -87,6 +92,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
wait := cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").
Short('w').Bool()

generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "Process indices' cache, upload them to object store and update metas.").
Hidden().Default("false").Bool()

// TODO(bplotka): Remove this flag once https://github.com/improbable-eng/thanos/issues/297 is fixed.
disableDownsampling := cmd.Flag("debug.disable-downsampling", "Disables downsampling. This is not recommended "+
"as querying long time ranges without non-downsampled data is not efficient and not useful (is not possible to render all for human eye).").
Expand All @@ -110,6 +118,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
*haltOnError,
*acceptMalformedIndex,
*wait,
*generateMissingIndexCacheFiles,
map[compact.ResolutionLevel]time.Duration{
compact.ResolutionLevelRaw: time.Duration(*retentionRaw),
compact.ResolutionLevel5m: time.Duration(*retention5m),
Expand All @@ -135,6 +144,7 @@ func runCompact(
haltOnError bool,
acceptMalformedIndex bool,
wait bool,
generateMissingIndexCacheFiles bool,
retentionByResolution map[compact.ResolutionLevel]time.Duration,
component string,
disableDownsampling bool,
Expand Down Expand Up @@ -197,6 +207,7 @@ func runCompact(
var (
compactDir = path.Join(dataDir, "compact")
downsamplingDir = path.Join(dataDir, "downsample")
indexCacheDir = path.Join(dataDir, "index_cache")
)

if err := os.RemoveAll(downsamplingDir); err != nil {
Expand Down Expand Up @@ -255,6 +266,13 @@ func runCompact(
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

// Generate index file
if generateMissingIndexCacheFiles {
if err := genMissingIndexCacheFiles(ctx, logger, bkt, indexCacheDir); err != nil {
return err
}
}

if !wait {
return f()
}
Expand Down Expand Up @@ -300,3 +318,121 @@ func runCompact(
level.Info(logger).Log("msg", "starting compact node")
return nil
}

// genMissingIndexCacheFiles generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir string) error {
if err := os.RemoveAll(dir); err != nil {
return errors.Wrap(err, "clean index cache directory")
}
if err := os.MkdirAll(dir, 0777); err != nil {
return errors.Wrap(err, "create dir")
}

defer func() {
if err := os.RemoveAll(dir); err != nil {
level.Error(logger).Log("msg", "failed to remove index cache directory", "path", dir, "err", err)
}
}()

level.Info(logger).Log("msg", "start index cache processing")

var (
metas []*metadata.Meta
)

err := bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
if !ok {
return nil
}

rc, err := bkt.Get(ctx, path.Join(id.String(), block.MetaFilename))
if err != nil {
// Probably not finished block, skip it.
if bkt.IsObjNotFoundErr(err) {
level.Warn(logger).Log("msg", "meta file wasn't found", "block", id.String())
return nil
}
return errors.Wrapf(err, "get meta for block %s", id)
}
defer runutil.CloseWithLogOnErr(logger, rc, "block reader")

var meta metadata.Meta
if err := json.NewDecoder(rc).Decode(&meta); err != nil {
return errors.Wrap(err, "decode meta")
}

// New version of compactor pushes index cache along with data block.
// Skip uncompacted blocks.
if meta.Compaction.Level == 1 {
return nil
}

metas = append(metas, &meta)

return nil
})
if err != nil {
return errors.Wrap(err, "retrieve bucket block metas")
}

for _, meta := range metas {
if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil {
return err
}
}

level.Info(logger).Log("msg", "generating index cache files is done, you can remove startup argument `index.generate-missing-cache-file`")
return nil
}

func generateIndexCacheFile(
ctx context.Context,
bkt objstore.Bucket,
logger log.Logger,
indexCacheDir string,
meta *metadata.Meta,
) error {
id := meta.ULID

bdir := filepath.Join(indexCacheDir, id.String())
if err := os.MkdirAll(bdir, 0777); err != nil {
return errors.Wrap(err, "create block dir")
}

defer func() {
if err := os.Remove(bdir); err != nil {
level.Error(logger).Log("msg", "failed to remove index cache directory", "path", bdir, "err", err)
}
}()

cachePath := filepath.Join(bdir, block.IndexCacheFilename)
cache := path.Join(meta.ULID.String(), block.IndexCacheFilename)

ok, err := objstore.Exists(ctx, bkt, cache)
if ok {
return nil
}
if err != nil {
return errors.Wrapf(err, "attempt to check if a cached index file exists")
}

level.Debug(logger).Log("msg", "make index cache", "block", id)

// Try to download index file from obj store.
indexPath := filepath.Join(bdir, block.IndexFilename)
index := path.Join(id.String(), block.IndexFilename)

if err := objstore.DownloadFile(ctx, logger, bkt, index, indexPath); err != nil {
return errors.Wrap(err, "download index file")
}

if err := block.WriteIndexCache(logger, indexPath, cachePath); err != nil {
return errors.Wrap(err, "write index cache")
}

if err := objstore.UploadFile(ctx, logger, bkt, cachePath, cache); err != nil {
return errors.Wrap(err, "upload index cache")
}
return nil
}
8 changes: 8 additions & 0 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
MetaFilename = "meta.json"
// IndexFilename is the known index file for block index.
IndexFilename = "index"
// IndexCacheFilename is the canonical name for index cache file that stores essential information needed.
IndexCacheFilename = "index.cache.json"
// ChunksDirname is the known dir name for chunks with compressed samples.
ChunksDirname = "chunks"

Expand Down Expand Up @@ -93,6 +95,12 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return cleanUp(bkt, id, errors.Wrap(err, "upload index"))
}

if meta.Thanos.Source == metadata.CompactorSource {
if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, IndexCacheFilename), path.Join(id.String(), IndexCacheFilename)); err != nil {
return cleanUp(bkt, id, errors.Wrap(err, "upload index cache"))
}
}

// Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file
// to be pending uploads.
if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, MetaFilename), path.Join(id.String(), MetaFilename)); err != nil {
Expand Down
22 changes: 13 additions & 9 deletions pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,22 @@ import (
"github.com/prometheus/tsdb/labels"
)

// IndexCacheFilename is the canonical name for index cache files.
const IndexCacheFilename = "index.cache.json"
const (
// IndexCacheVersion is a enumeration of index cache versions supported by Thanos.
IndexCacheVersion1 = iota + 1
)

type postingsRange struct {
Name, Value string
Start, End int64
}

type indexCache struct {
Version int
Symbols map[uint32]string
LabelValues map[string][]string
Postings []postingsRange
Version int
CacheVersion int
Symbols map[uint32]string
LabelValues map[string][]string
Postings []postingsRange
}

type realByteSlice []byte
Expand Down Expand Up @@ -112,9 +115,10 @@ func WriteIndexCache(logger log.Logger, indexFn string, fn string) error {
defer runutil.CloseWithLogOnErr(logger, f, "index cache writer")

v := indexCache{
Version: indexr.Version(),
Symbols: symbols,
LabelValues: map[string][]string{},
Version: indexr.Version(),
CacheVersion: IndexCacheVersion1,
Symbols: symbols,
LabelValues: map[string][]string{},
}

// Extract label value indices.
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
)

const (
// MetaVersion is a enumeration of versions supported by Thanos.
// MetaVersion is a enumeration of meta versions supported by Thanos.
MetaVersion1 = iota + 1
)

Expand Down
24 changes: 19 additions & 5 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package compact
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
Expand All @@ -11,8 +12,6 @@ import (

"github.com/improbable-eng/thanos/pkg/block/metadata"

"io/ioutil"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
Expand Down Expand Up @@ -61,6 +60,9 @@ type syncerMetrics struct {
garbageCollectionDuration prometheus.Histogram
compactions *prometheus.CounterVec
compactionFailures *prometheus.CounterVec
indexCacheBlocks prometheus.Counter
indexCacheTraverse prometheus.Counter
indexCacheFailures prometheus.Counter
}

func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
Expand Down Expand Up @@ -535,7 +537,6 @@ func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (
cg.compactionFailures.Inc()
}
cg.compactions.Inc()

return shouldRerun, compID, err
}

Expand Down Expand Up @@ -813,6 +814,8 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))

bdir := filepath.Join(dir, compID.String())
index := filepath.Join(bdir, block.IndexFilename)
indexCache := filepath.Join(bdir, block.IndexCacheFilename)

newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{
Labels: cg.labels.Map(),
Expand All @@ -828,7 +831,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
}

// Ensure the output block is valid.
if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil {
if err := block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime); err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
}

Expand All @@ -837,6 +840,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
}

if err := block.WriteIndexCache(cg.logger, index, indexCache); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "write index cache")
}

begin = time.Now()

if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err != nil {
Expand Down Expand Up @@ -888,7 +895,14 @@ type BucketCompactor struct {
}

// NewBucketCompactor creates a new bucket compactor.
func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket, concurrency int) (*BucketCompactor, error) {
func NewBucketCompactor(
logger log.Logger,
sy *Syncer,
comp tsdb.Compactor,
compactDir string,
bkt objstore.Bucket,
concurrency int,
) (*BucketCompactor, error) {
if concurrency <= 0 {
return nil, errors.New("invalid concurrency level (%d), concurrency level must be > 0")
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/compact/downsample/streamed_block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ func (w *streamedBlockWriter) finalize() error {
return errors.Wrap(err, "write mem postings")
}

if err := w.writeIndexCache(); err != nil {
return errors.Wrap(err, "write index cache")
}

if err := w.writeMetaFile(); err != nil {
return errors.Wrap(err, "write meta meta")
}
Expand Down Expand Up @@ -253,6 +257,16 @@ func (w *streamedBlockWriter) writeMemPostings() error {
return nil
}

func (w *streamedBlockWriter) writeIndexCache() error {
indexFile := filepath.Join(w.blockDir, block.IndexFilename)
indexCacheFile := filepath.Join(w.blockDir, block.IndexCacheFilename)
if err := block.WriteIndexCache(w.logger, indexFile, indexCacheFile); err != nil {
return errors.Wrap(err, "write index cache")
}

return nil
}

// writeMetaFile writes meta file.
func (w *streamedBlockWriter) writeMetaFile() error {
w.meta.Version = metadata.MetaVersion1
Expand Down
Loading

0 comments on commit 7466e90

Please sign in to comment.