
store+compactor: process index cache during compaction #986

Merged: 1 commit, Apr 12, 2019
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ New options:

* `--store.grpc.series-sample-limit` limits the number of samples that can be retrieved in a single Series() call. By default it is 0 (no limit). Consider enabling it by setting it to a value greater than 0 if you are running on limited resources.
* `--store.grpc.series-max-concurrency` limits the number of concurrent Series() calls in Thanos Store. By default it is 20. Consider lowering or raising it depending on the scale of your deployment.
* `--index.generate-missing-cache-file` if enabled, on startup the compactor runs a one-off job that scans all blocks for missing index cache files, then generates and uploads them. Disabled by default. Once the log line `generating index cache files is done` appears, the flag can be disabled.

New metrics:
* `thanos_bucket_store_queries_dropped_total` shows how many queries were dropped due to the samples limit;
@@ -35,6 +36,7 @@ New tracing span:
- [#970](https://github.com/improbable-eng/thanos/pull/970) Added `PartialResponseStrategy` field for `RuleGroups` for `Ruler`.
- [#1016](https://github.com/improbable-eng/thanos/pull/1016) Added option for another DNS resolver (miekg/dns client).
This is to get SRV resolution working on [Golang 1.11+ with KubeDNS below v1.14](https://github.com/golang/go/issues/27546)
- [#986](https://github.com/improbable-eng/thanos/pull/986) Store index cache files in object storage. This reduces store start-up time by skipping index cache generation for all blocks and doing it only for recently created, uncompacted blocks (see the sketch after this file's diff).
Member: Could be reworded, but we can do it later on cut 0.4.0 👍

### Changed
- [#970](https://github.com/improbable-eng/thanos/pull/970) Deprecated partial_response_disabled proto field. Added partial_response_strategy instead. Both in gRPC and Query API.
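To orient before the cmd/thanos/compact.go diff below: per block, the new flag's job amounts to checking the bucket for `index.cache.json` and generating plus uploading it when it is absent. A condensed, hedged sketch of that flow, reusing only helpers that appear later in this PR (`objstore.Exists`, `objstore.DownloadFile`, `objstore.UploadFile`, `block.WriteIndexCache`); the merged code splits and names things differently:

```go
package sketch

import (
	"context"
	"os"
	"path"
	"path/filepath"

	"github.com/go-kit/kit/log"
	"github.com/improbable-eng/thanos/pkg/block"
	"github.com/improbable-eng/thanos/pkg/block/metadata"
	"github.com/improbable-eng/thanos/pkg/objstore"
	"github.com/pkg/errors"
)

// ensureIndexCache is a condensed sketch of what the PR does per block:
// skip blocks that already have index.cache.json, otherwise derive it from
// the block's index and upload it next to the block.
func ensureIndexCache(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir string, meta *metadata.Meta) error {
	id := meta.ULID
	cacheObj := path.Join(id.String(), block.IndexCacheFilename)

	ok, err := objstore.Exists(ctx, bkt, cacheObj)
	if err != nil {
		return errors.Wrap(err, "check index cache existence")
	}
	if ok {
		return nil // Already present, nothing to do.
	}

	bdir := filepath.Join(dir, id.String())
	if err := os.MkdirAll(bdir, 0777); err != nil {
		return errors.Wrap(err, "create block dir")
	}
	defer os.RemoveAll(bdir)

	indexPath := filepath.Join(bdir, block.IndexFilename)
	cachePath := filepath.Join(bdir, block.IndexCacheFilename)

	// Download the block's TSDB index, write the JSON cache locally, upload it back.
	if err := objstore.DownloadFile(ctx, logger, bkt, path.Join(id.String(), block.IndexFilename), indexPath); err != nil {
		return errors.Wrap(err, "download index")
	}
	if err := block.WriteIndexCache(logger, indexPath, cachePath); err != nil {
		return errors.Wrap(err, "write index cache")
	}
	return objstore.UploadFile(ctx, logger, bkt, cachePath, cacheObj)
}
```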
133 changes: 133 additions & 0 deletions cmd/thanos/compact.go
@@ -2,17 +2,22 @@ package main

import (
"context"
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/compact"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/run"
@@ -87,6 +92,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
wait := cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").
Short('w').Bool()

generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "If enabled, on startup the compactor runs a one-off job that scans all blocks for missing index cache files, then generates and uploads them.").
Hidden().Default("false").Bool()

// TODO(bplotka): Remove this flag once https://github.com/improbable-eng/thanos/issues/297 is fixed.
disableDownsampling := cmd.Flag("debug.disable-downsampling", "Disables downsampling. This is not recommended "+
"as querying long time ranges with raw, non-downsampled data is not efficient and not useful (it is not possible to render it all for the human eye).").
@@ -110,6 +118,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
*haltOnError,
*acceptMalformedIndex,
*wait,
*generateMissingIndexCacheFiles,
map[compact.ResolutionLevel]time.Duration{
compact.ResolutionLevelRaw: time.Duration(*retentionRaw),
compact.ResolutionLevel5m: time.Duration(*retention5m),
@@ -135,6 +144,7 @@ func runCompact(
haltOnError bool,
acceptMalformedIndex bool,
wait bool,
generateMissingIndexCacheFiles bool,
retentionByResolution map[compact.ResolutionLevel]time.Duration,
component string,
disableDownsampling bool,
@@ -197,6 +207,7 @@ func runCompact(
var (
compactDir = path.Join(dataDir, "compact")
downsamplingDir = path.Join(dataDir, "downsample")
indexCacheDir = path.Join(dataDir, "index_cache")
)

if err := os.RemoveAll(downsamplingDir); err != nil {
@@ -255,6 +266,13 @@ func runCompact(
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

// Generate missing index cache files, if requested.
if generateMissingIndexCacheFiles {
if err := genMissingIndexCacheFiles(ctx, logger, bkt, indexCacheDir); err != nil {
return err
}
}

if !wait {
return f()
}
@@ -300,3 +318,118 @@ func runCompact(
level.Info(logger).Log("msg", "starting compact node")
return nil
}

// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir string) error {
if err := os.RemoveAll(dir); err != nil {
return errors.Wrap(err, "clean index cache directory")
}
if err := os.MkdirAll(dir, 0777); err != nil {
return errors.Wrap(err, "create dir")
}

defer func() {
if err := os.RemoveAll(dir); err != nil {
level.Error(logger).Log("msg", "failed to remove index cache directory", "path", dir, "err", err)
}
}()

level.Info(logger).Log("msg", "start index cache processing")

var metas []*metadata.Meta

if err := bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
if !ok {
return nil
}

rc, err := bkt.Get(ctx, path.Join(id.String(), block.MetaFilename))
if err != nil {
// Probably an unfinished block; skip it.
if bkt.IsObjNotFoundErr(err) {
level.Warn(logger).Log("msg", "meta file wasn't found", "block", id.String())
return nil
}
return errors.Wrapf(err, "get meta for block %s", id)
}
defer runutil.CloseWithLogOnErr(logger, rc, "block reader")

var meta metadata.Meta
if err := json.NewDecoder(rc).Decode(&meta); err != nil {
return errors.Wrap(err, "decode meta")
}

// Newer versions of the compactor push the index cache along with the data block,
// so skip uncompacted (level 1) blocks here.
if meta.Compaction.Level == 1 {
return nil
}

metas = append(metas, &meta)

return nil
}); err != nil {
return errors.Wrap(err, "retrieve bucket block metas")
}

for _, meta := range metas {
if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil {
Member: To be more verbose I would check if index cache is missing first in this loop before going to generateIndexCacheFile?

Contributor (author): let's keep it where it is, otherwise you have to pass IndexCacheFilename as a parameter to generateIndexCacheFile. I can rename generateIndexCacheFile -> checkIfExistsOrGenerateIndexCacheFile

Member: passing another parameter is not that bad ;p

Member: but up to you

Member: not addressed I think

return err
}
}

level.Info(logger).Log("msg", "generating index cache files is done, you can remove startup argument `index.generate-missing-cache-file`")
return nil
}
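For reference, a minimal sketch of the alternative the reviewers debate above: probe for the cache file in the caller's loop and only then call the generator. It reuses the surrounding function's variables and the helpers already imported in this file, and is not what was merged:

```go
// Hypothetical variant of the loop above: filter out blocks whose cache file
// already exists, so generateIndexCacheFile would not need to probe the bucket.
for _, meta := range metas {
	cacheObj := path.Join(meta.ULID.String(), block.IndexCacheFilename)

	ok, err := objstore.Exists(ctx, bkt, cacheObj)
	if err != nil {
		return errors.Wrapf(err, "check index cache for block %s", meta.ULID)
	}
	if ok {
		continue // Cache already uploaded; skip the download/generate/upload cycle.
	}

	if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil {
		return err
	}
}
```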

func generateIndexCacheFile(
ctx context.Context,
bkt objstore.Bucket,
logger log.Logger,
indexCacheDir string,
meta *metadata.Meta,
) error {
id := meta.ULID

bdir := filepath.Join(indexCacheDir, id.String())
if err := os.MkdirAll(bdir, 0777); err != nil {
return errors.Wrap(err, "create block dir")
}

defer func() {
if err := os.RemoveAll(bdir); err != nil {
level.Error(logger).Log("msg", "failed to remove index cache directory", "path", bdir, "err", err)
}
}()

cachePath := filepath.Join(bdir, block.IndexCacheFilename)
cache := path.Join(meta.ULID.String(), block.IndexCacheFilename)

ok, err := objstore.Exists(ctx, bkt, cache)

Member: let's move it as commented above

if err != nil {
return errors.Wrapf(err, "attempt to check if a cached index file exists")
}
if ok {
return nil
}

level.Debug(logger).Log("msg", "make index cache", "block", id)

// Try to download index file from obj store.
indexPath := filepath.Join(bdir, block.IndexFilename)
index := path.Join(id.String(), block.IndexFilename)

if err := objstore.DownloadFile(ctx, logger, bkt, index, indexPath); err != nil {
return errors.Wrap(err, "download index file")
}

if err := block.WriteIndexCache(logger, indexPath, cachePath); err != nil {
return errors.Wrap(err, "write index cache")
}

if err := objstore.UploadFile(ctx, logger, bkt, cachePath, cache); err != nil {
return errors.Wrap(err, "upload index cache")
}
return nil
}
8 changes: 8 additions & 0 deletions pkg/block/block.go
@@ -25,6 +25,8 @@ const (
MetaFilename = "meta.json"
// IndexFilename is the known index file for block index.
IndexFilename = "index"
// IndexCacheFilename is the canonical name for the index cache file that stores the essential index information.
IndexCacheFilename = "index.cache.json"
// ChunksDirname is the known dir name for chunks with compressed samples.
ChunksDirname = "chunks"

@@ -93,6 +95,12 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return cleanUp(bkt, id, errors.Wrap(err, "upload index"))
}

if meta.Thanos.Source == metadata.CompactorSource {
if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, IndexCacheFilename), path.Join(id.String(), IndexCacheFilename)); err != nil {
return cleanUp(bkt, id, errors.Wrap(err, "upload index cache"))
}
}

// Meta.json always needs to be uploaded as the last item. This allows block directories without a meta file
// to be treated as pending uploads.
if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, MetaFilename), path.Join(id.String(), MetaFilename)); err != nil {
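The "meta.json last" comment above is the invariant this ordering protects: a block directory in the bucket with no meta file can be treated as a pending or aborted upload. A minimal sketch of a check built on that rule; isPendingUpload is a hypothetical helper (imports mirror those already used around this file, including github.com/oklog/ulid) and is not part of this PR:

```go
// isPendingUpload reports whether a block directory looks like an unfinished
// upload, i.e. its meta.json has not been written to the bucket yet.
func isPendingUpload(ctx context.Context, bkt objstore.Bucket, id ulid.ULID) (bool, error) {
	ok, err := objstore.Exists(ctx, bkt, path.Join(id.String(), block.MetaFilename))
	if err != nil {
		return false, errors.Wrap(err, "check meta.json")
	}
	// Since meta.json is always uploaded last, its absence means the rest of
	// the block (index, chunks, index cache) may still be incomplete.
	return !ok, nil
}
```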
22 changes: 13 additions & 9 deletions pkg/block/index.go
@@ -26,19 +26,22 @@ import (
"github.com/prometheus/tsdb/labels"
)

// IndexCacheFilename is the canonical name for index cache files.
const IndexCacheFilename = "index.cache.json"
const (
// IndexCacheVersion is an enumeration of index cache versions supported by Thanos.
IndexCacheVersion1 = iota + 1
)

type postingsRange struct {
Name, Value string
Start, End int64
}

type indexCache struct {
Version int
Symbols map[uint32]string
LabelValues map[string][]string
Postings []postingsRange
Version int
CacheVersion int
Symbols map[uint32]string
LabelValues map[string][]string
Postings []postingsRange
}

type realByteSlice []byte
Expand Down Expand Up @@ -112,9 +115,10 @@ func WriteIndexCache(logger log.Logger, indexFn string, fn string) error {
defer runutil.CloseWithLogOnErr(logger, f, "index cache writer")

v := indexCache{
Version: indexr.Version(),
Symbols: symbols,
LabelValues: map[string][]string{},
Version: indexr.Version(),
CacheVersion: IndexCacheVersion1,
Symbols: symbols,
LabelValues: map[string][]string{},
}

// Extract label value indices.
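The struct above is what gets serialized into index.cache.json: Version carries the TSDB index format version, while CacheVersion (new in this PR) versions the cache file itself, with IndexCacheVersion1 equal to 1 and older files decoding to 0. A standalone, illustrative decoder; the type definitions are copied from the unexported struct in this diff and nothing here is part of the Thanos public API:

```go
// Illustrative reader for index.cache.json; mirrors the unexported indexCache
// struct from pkg/block/index.go in this PR. Not a Thanos API.
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

type postingsRange struct {
	Name, Value string
	Start, End  int64
}

type indexCache struct {
	Version      int
	CacheVersion int
	Symbols      map[uint32]string
	LabelValues  map[string][]string
	Postings     []postingsRange
}

func main() {
	f, err := os.Open("index.cache.json")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	var c indexCache
	if err := json.NewDecoder(f).Decode(&c); err != nil {
		panic(err)
	}
	// CacheVersion 0 means the file predates this PR (older compactors did not
	// write the field); 1 corresponds to IndexCacheVersion1 introduced here.
	fmt.Printf("index version=%d cache version=%d symbols=%d label names=%d\n",
		c.Version, c.CacheVersion, len(c.Symbols), len(c.LabelValues))
}
```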
2 changes: 1 addition & 1 deletion pkg/block/metadata/meta.go
@@ -36,7 +36,7 @@ const (
)

const (
// MetaVersion is an enumeration of versions supported by Thanos.
// MetaVersion is an enumeration of meta versions supported by Thanos.
MetaVersion1 = iota + 1
)

24 changes: 19 additions & 5 deletions pkg/compact/compact.go
@@ -3,6 +3,7 @@ package compact
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
@@ -11,8 +12,6 @@ import (

"github.com/improbable-eng/thanos/pkg/block/metadata"

"io/ioutil"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
@@ -61,6 +60,9 @@ type syncerMetrics struct {
garbageCollectionDuration prometheus.Histogram
compactions *prometheus.CounterVec
compactionFailures *prometheus.CounterVec
indexCacheBlocks prometheus.Counter
indexCacheTraverse prometheus.Counter
indexCacheFailures prometheus.Counter
}

func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
@@ -535,7 +537,6 @@ func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (
cg.compactionFailures.Inc()
}
cg.compactions.Inc()

return shouldRerun, compID, err
}

@@ -813,6 +814,8 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))

bdir := filepath.Join(dir, compID.String())
index := filepath.Join(bdir, block.IndexFilename)
indexCache := filepath.Join(bdir, block.IndexCacheFilename)

newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{
Labels: cg.labels.Map(),
Expand All @@ -828,7 +831,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
}

// Ensure the output block is valid.
if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil {
if err := block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
}

@@ -837,6 +840,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
}

if err := block.WriteIndexCache(cg.logger, index, indexCache); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "write index cache")
}

begin = time.Now()

if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err != nil {
@@ -888,7 +895,14 @@ type BucketCompactor struct {
}

// NewBucketCompactor creates a new bucket compactor.
func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket, concurrency int) (*BucketCompactor, error) {
func NewBucketCompactor(
logger log.Logger,
sy *Syncer,
comp tsdb.Compactor,
compactDir string,
bkt objstore.Bucket,
concurrency int,
) (*BucketCompactor, error) {
if concurrency <= 0 {
return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency)
}
14 changes: 14 additions & 0 deletions pkg/compact/downsample/streamed_block_writer.go
@@ -195,6 +195,10 @@ func (w *streamedBlockWriter) finalize() error {
return errors.Wrap(err, "write mem postings")
}

if err := w.writeIndexCache(); err != nil {
return errors.Wrap(err, "write index cache")
}

if err := w.writeMetaFile(); err != nil {
return errors.Wrap(err, "write meta meta")
}
@@ -253,6 +257,16 @@ func (w *streamedBlockWriter) writeMemPostings() error {
return nil
}

func (w *streamedBlockWriter) writeIndexCache() error {
indexFile := filepath.Join(w.blockDir, block.IndexFilename)
indexCacheFile := filepath.Join(w.blockDir, block.IndexCacheFilename)
if err := block.WriteIndexCache(w.logger, indexFile, indexCacheFile); err != nil {
return errors.Wrap(err, "write index cache")
}

return nil
}

// writeMetaFile writes meta file.
func (w *streamedBlockWriter) writeMetaFile() error {
w.meta.Version = metadata.MetaVersion1