diff --git a/CHANGELOG.md b/CHANGELOG.md index bb7f9e7e2a..7791839f8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#1525](https://github.com/thanos-io/thanos/pull/1525) Thanos now deletes block's file in correct order allowing to detect partial blocks without problems. - [#1505](https://github.com/thanos-io/thanos/pull/1505) Thanos store now removes invalid local cache blocks. +- [#1587](https://github.com/thanos-io/thanos/pull/1587) Cleanup all cache dirs after each compaction run. - [#1582](https://github.com/thanos-io/thanos/pull/1582) Thanos rule correctly parses Alertmanager URL if there is more `+` in it. ## v0.7.0 - 2019.09.02 diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index fdd6e50c9b..12f6573a0e 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -295,7 +295,7 @@ func runCompact( // Generate index file. if generateMissingIndexCacheFiles { - if err := genMissingIndexCacheFiles(ctx, logger, bkt, indexCacheDir); err != nil { + if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, indexCacheDir); err != nil { return err } } @@ -343,8 +343,19 @@ func runCompact( return nil } +const ( + MetricIndexGenerateName = "thanos_compact_generated_index_total" + MetricIndexGenerateHelp = "Total number of generated indexes." +) + // genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage. -func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir string) error { +func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, dir string) error { + genIndex := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricIndexGenerateName, + Help: MetricIndexGenerateHelp, + }) + reg.MustRegister(genIndex) + if err := os.RemoveAll(dir); err != nil { return errors.Wrap(err, "clean index cache directory") } @@ -395,6 +406,7 @@ func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, bkt objst if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil { return err } + genIndex.Inc() } level.Info(logger).Log("msg", "generating index cache files is done, you can remove startup argument `index.generate-missing-cache-file`") diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index a9dd739a52..b6b2397127 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -144,6 +144,13 @@ func downsampleBucket( if err := os.MkdirAll(dir, 0777); err != nil { return errors.Wrap(err, "create dir") } + + defer func() { + if err := os.RemoveAll(dir); err != nil { + level.Error(logger).Log("msg", "failed to remove downsample cache directory", "path", dir, "err", err) + } + }() + var metas []*metadata.Meta err := bkt.Iter(ctx, "", func(name string) error { diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go new file mode 100644 index 0000000000..138ce52579 --- /dev/null +++ b/cmd/thanos/main_test.go @@ -0,0 +1,128 @@ +package main + +import ( + "context" + "io/ioutil" + "os" + "path" + + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/labels" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/compact" + "github.com/thanos-io/thanos/pkg/compact/downsample" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/objstore/inmem" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestCleanupCompactCacheFolder(t *testing.T) { + ctx, logger, dir, _, bkt, actReg := bootstrap(t) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + sy, err := compact.NewSyncer(logger, actReg, bkt, 0*time.Second, 1, false, nil) + testutil.Ok(t, err) + + expReg := prometheus.NewRegistry() + syncExp := prometheus.NewCounter(prometheus.CounterOpts{ + Name: compact.MetricSyncMetaName, + Help: compact.MetricSyncMetaHelp, + }) + expReg.MustRegister(syncExp) + + testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName) + + comp, err := tsdb.NewLeveledCompactor(ctx, nil, logger, []int64{1}, nil) + testutil.Ok(t, err) + + bComp, err := compact.NewBucketCompactor(logger, sy, comp, dir, bkt, 1) + testutil.Ok(t, err) + + // Even with with a single uploaded block the bucker compactor needs to + // downloads the meta file to plan the compaction groups. + testutil.Ok(t, bComp.Compact(ctx)) + + syncExp.Inc() + + testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName) + + _, err = os.Stat(dir) + testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't not exist at the end of execution") + +} + +func TestCleanupIndexCacheFolder(t *testing.T) { + ctx, logger, dir, _, bkt, actReg := bootstrap(t) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + expReg := prometheus.NewRegistry() + genIndexExp := prometheus.NewCounter(prometheus.CounterOpts{ + Name: MetricIndexGenerateName, + Help: MetricIndexGenerateHelp, + }) + expReg.MustRegister(genIndexExp) + + testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName) + + testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, actReg, bkt, dir)) + + genIndexExp.Inc() + testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName) + + _, err := os.Stat(dir) + testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't not exist at the end of execution") +} + +func TestCleanupDownsampleCacheFolder(t *testing.T) { + ctx, logger, dir, blckID, bkt, reg := bootstrap(t) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + meta, err := block.DownloadMeta(ctx, logger, bkt, blckID) + testutil.Ok(t, err) + + metrics := newDownsampleMetrics(reg) + testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta)))) + testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, dir)) + testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta)))) + + _, err = os.Stat(dir) + testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't not exist at the end of execution") +} + +func bootstrap(t *testing.T) (context.Context, log.Logger, string, ulid.ULID, objstore.Bucket, *prometheus.Registry) { + logger := log.NewLogfmtLogger(os.Stderr) + dir, err := ioutil.TempDir("", "test-compact-cleanup") + testutil.Ok(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + bkt := inmem.NewBucket() + var blckID ulid.ULID + + // Create and upload a single block to the bucker. + // The compaction will download the meta block of + // this block to plan the compaction groups. + { + blckID, err = testutil.CreateBlock( + ctx, + dir, + []labels.Labels{ + {{Name: "a", Value: "1"}}, + }, + 1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check. + labels.Labels{{Name: "e1", Value: "1"}}, + downsample.ResLevel0) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, blckID.String()))) + } + + return ctx, logger, dir, blckID, bkt, prometheus.NewRegistry() +} diff --git a/go.mod b/go.mod index 8caf93352c..77568fe388 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/opentracing/opentracing-go v1.1.0 github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v1.1.0 + github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 github.com/prometheus/common v0.6.0 github.com/prometheus/prometheus v1.8.2-0.20190913102521-8ab628b35467 // v1.8.2 is misleading as Prometheus does not have v2 module. github.com/uber-go/atomic v1.4.0 // indirect diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 6f0f14604b..348a0aa26b 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -67,13 +67,19 @@ type syncerMetrics struct { compactionFailures *prometheus.CounterVec } +const ( + MetricSyncMetaName = "thanos_compact_sync_meta_total" + MetricSyncMetaHelp = "Total number of sync meta operations." +) + func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { var m syncerMetrics m.syncMetas = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "thanos_compact_sync_meta_total", - Help: "Total number of sync meta operations.", + Name: MetricSyncMetaName, + Help: MetricSyncMetaHelp, }) + m.syncMetaFailures = prometheus.NewCounter(prometheus.CounterOpts{ Name: "thanos_compact_sync_meta_failures_total", Help: "Total number of failed sync meta operations.", @@ -1001,6 +1007,11 @@ func NewBucketCompactor( // Compact runs compaction over bucket. func (c *BucketCompactor) Compact(ctx context.Context) error { + defer func() { + if err := os.RemoveAll(c.compactDir); err != nil { + level.Error(c.logger).Log("msg", "failed to remove compaction cache directory", "path", c.compactDir, "err", err) + } + }() // Loop over bucket and compact until there's no work left. for { var ( diff --git a/pkg/testutil/testutil.go b/pkg/testutil/testutil.go index 53ab6bbe59..f4a3933fb4 100644 --- a/pkg/testutil/testutil.go +++ b/pkg/testutil/testutil.go @@ -28,6 +28,9 @@ import ( "reflect" "runtime" "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" ) // Assert fails the test if the condition is false. @@ -71,3 +74,25 @@ func Equals(tb testing.TB, exp, act interface{}, v ...interface{}) { tb.FailNow() } } + +// GatherAndCompare compares the metrics of a Gatherers pair. +func GatherAndCompare(t *testing.T, g1 prometheus.Gatherer, g2 prometheus.Gatherer, filter string) { + g1m, err := g1.Gather() + Ok(t, err) + g2m, err := g2.Gather() + Ok(t, err) + + var m1 *dto.MetricFamily + for _, m := range g1m { + if *m.Name == filter { + m1 = m + } + } + var m2 *dto.MetricFamily + for _, m := range g2m { + if *m.Name == filter { + m2 = m + } + } + Equals(t, m1.String(), m2.String()) +}