Skip to content

Commit

Permalink
added tests
Browse files Browse the repository at this point in the history
Signed-off-by: Krasi Georgiev <[email protected]>
  • Loading branch information
krasi-georgiev committed Oct 2, 2019
1 parent bb1610f commit 3646e13
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 57 deletions.
16 changes: 14 additions & 2 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func runCompact(

// Generate index file.
if generateMissingIndexCacheFiles {
if err := genMissingIndexCacheFiles(ctx, logger, bkt, indexCacheDir); err != nil {
if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, indexCacheDir); err != nil {
return err
}
}
Expand Down Expand Up @@ -323,8 +323,19 @@ func runCompact(
return nil
}

const (
MetricIndexGenerateName = "thanos_compact_generated_index_total"
MetricIndexGenerateHelp = "Total number of generated indexes."
)

// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir string) error {
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, dir string) error {
genIndex := prometheus.NewCounter(prometheus.CounterOpts{
Name: MetricIndexGenerateName,
Help: MetricIndexGenerateHelp,
})
reg.MustRegister(genIndex)

if err := os.RemoveAll(dir); err != nil {
return errors.Wrap(err, "clean index cache directory")
}
Expand Down Expand Up @@ -375,6 +386,7 @@ func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, bkt objst
if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil {
return err
}
genIndex.Inc()
}

level.Info(logger).Log("msg", "generating index cache files is done, you can remove startup argument `index.generate-missing-cache-file`")
Expand Down
150 changes: 150 additions & 0 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package main

import (
"context"
"io/ioutil"
"os"
"path"

"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestCleanupCompactCacheFolder(t *testing.T) {
ctx, logger, dir, _, bkt, actReg := bootstrap(t)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

sy, err := compact.NewSyncer(logger, actReg, bkt, 0*time.Second, 1, false)
testutil.Ok(t, err)

expReg := prometheus.NewRegistry()
syncExp := prometheus.NewCounter(prometheus.CounterOpts{
Name: compact.MetricSyncMetaName,
Help: compact.MetricSyncMetaHelp,
})
expReg.MustRegister(syncExp)

GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)

comp, err := tsdb.NewLeveledCompactor(ctx, nil, logger, []int64{1}, nil)
testutil.Ok(t, err)

bComp, err := compact.NewBucketCompactor(logger, sy, comp, dir, bkt, 1)
testutil.Ok(t, err)

// Even with with a single uploaded block the bucker compactor needs to
// downloads the meta file to plan the compaction groups.
testutil.Ok(t, bComp.Compact(ctx))

syncExp.Inc()

GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't not exist at the end of execution")

}

func TestCleanupIndexCacheFolder(t *testing.T) {
ctx, logger, dir, _, bkt, actReg := bootstrap(t)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

expReg := prometheus.NewRegistry()
genIndexExp := prometheus.NewCounter(prometheus.CounterOpts{
Name: MetricIndexGenerateName,
Help: MetricIndexGenerateHelp,
})
expReg.MustRegister(genIndexExp)

GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)

genMissingIndexCacheFiles(ctx, logger, actReg, bkt, dir)

genIndexExp.Inc()
GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)

_, err := os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't not exist at the end of execution")
}

func TestCleanupDownsampleCacheFolder(t *testing.T) {
ctx, logger, dir, blckID, bkt, reg := bootstrap(t)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

meta, err := block.DownloadMeta(ctx, logger, bkt, blckID)
testutil.Ok(t, err)

metrics := newDownsampleMetrics(reg)
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta))))
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, dir))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta))))

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't not exist at the end of execution")
}

func bootstrap(t *testing.T) (context.Context, log.Logger, string, ulid.ULID, objstore.Bucket, *prometheus.Registry) {
logger := log.NewLogfmtLogger(os.Stderr)
dir, err := ioutil.TempDir("", "test-compact-cleanup")
testutil.Ok(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

bkt := inmem.NewBucket()
var blckID ulid.ULID

// Create and upload a single block to the bucker.
// The compaction will download the meta block of this block
// to plan the compaction groups.
{
blckID, err = testutil.CreateBlock(
ctx,
dir,
[]labels.Labels{
{{Name: "a", Value: "1"}},
},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, blckID.String())))
}

return ctx, logger, dir, blckID, bkt, prometheus.NewRegistry()
}

func GatherAndCompare(t *testing.T, g1 prometheus.Gatherer, g2 prometheus.Gatherer, filter string) {
g1m, err := g1.Gather()
testutil.Ok(t, err)
g2m, err := g2.Gather()
testutil.Ok(t, err)

var m1 *dto.MetricFamily
for _, m := range g1m {
if *m.Name == filter {
m1 = m
}
}
var m2 *dto.MetricFamily
for _, m := range g2m {
if *m.Name == filter {
m2 = m
}
}
testutil.Equals(t, m1.String(), m2.String())
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ require (
github.com/opentracing/opentracing-go v1.1.0
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/common v0.6.0
github.com/prometheus/prometheus v1.8.2-0.20190913102521-8ab628b35467 // v1.8.2 is misleading as Prometheus does not have v2 module.
github.com/uber-go/atomic v1.4.0 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,6 @@ github.com/uber-go/atomic v1.4.0 h1:yOuPqEq4ovnhEjpHmfFwsqBXDYbQeT6Nb0bwD6XnD5o=
github.com/uber-go/atomic v1.4.0/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
github.com/uber/jaeger-client-go v2.16.0+incompatible h1:Q2Pp6v3QYiocMxomCaJuwQGFt7E53bPYqEgug/AoBtY=
github.com/uber/jaeger-client-go v2.16.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.0.0+incompatible h1:iMSCV0rmXEogjNWPh2D0xk9YVKvrtGoHJNe9ebLu/pw=
github.com/uber/jaeger-lib v2.0.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
Expand Down
10 changes: 8 additions & 2 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,19 @@ type syncerMetrics struct {
compactionFailures *prometheus.CounterVec
}

const (
MetricSyncMetaName = "thanos_compact_sync_meta_total"
MetricSyncMetaHelp = "Total number of sync meta operations."
)

func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
var m syncerMetrics

m.syncMetas = prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_sync_meta_total",
Help: "Total number of sync meta operations.",
Name: MetricSyncMetaName,
Help: MetricSyncMetaHelp,
})

m.syncMetaFailures = prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_sync_meta_failures_total",
Help: "Total number of failed sync meta operations.",
Expand Down
51 changes: 0 additions & 51 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ import (
"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/objstore/objtesting"
"github.com/thanos-io/thanos/pkg/testutil"
)
Expand Down Expand Up @@ -304,55 +302,6 @@ func TestGroup_Compact_e2e(t *testing.T) {
})
}

func TestCleanupCacheFolder(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)

dir, err := ioutil.TempDir("", "test-compact-cleanup")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

bkt := inmem.NewBucket()

// Create and upload a single block to the bucker.
// The compaction will download the meta block of this block
// to plan the compaction groups.
{
b, err := testutil.CreateBlock(
ctx,
dir,
[]labels.Labels{
{{Name: "a", Value: "1"}},
},
1, 0, 1,
labels.Labels{{Name: "e1", Value: "1"}},
1)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, b.String())))
}

sy, err := NewSyncer(logger, nil, bkt, 0*time.Second, 1, false)
testutil.Ok(t, err)
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.syncMetas))

comp, err := tsdb.NewLeveledCompactor(ctx, nil, logger, []int64{1}, nil)
testutil.Ok(t, err)

bComp, err := NewBucketCompactor(logger, sy, comp, dir, bkt, 1)
testutil.Ok(t, err)

// Even with with a single uploaded block the bucker compactor needs to
// downloads the meta file to plan the compaction groups.
testutil.Ok(t, bComp.Compact(ctx))
testutil.Equals(t, 1.0, promtest.ToFloat64(sy.metrics.syncMetas))

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "compaction cache dir shouldn't not exist after a compaction run")

}

// createEmptyBlock produces empty block like it was the case before fix: https://github.com/prometheus/tsdb/pull/374.
// (Prometheus pre v2.7.0)
func createEmptyBlock(dir string, mint int64, maxt int64, extLset labels.Labels, resolution int64) (ulid.ULID, error) {
Expand Down

0 comments on commit 3646e13

Please sign in to comment.