From ecf1456fed38bf339a37adceae437621aaae9e5d Mon Sep 17 00:00:00 2001 From: jojohappy Date: Wed, 9 Oct 2019 19:03:18 +0800 Subject: [PATCH] Add e2e test Signed-off-by: jojohappy --- pkg/store/bucket_e2e_test.go | 10 ++- pkg/store/bucket_test.go | 66 ++++++++++++++++++-- test/e2e/spinup_test.go | 27 ++++++++ test/e2e/store_gateway_test.go | 110 +++++++++++++++++++++++++++++++++ 4 files changed, 203 insertions(+), 10 deletions(-) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 1c4d006cf8e..433a5498679 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -126,7 +126,7 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o return } -func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64) *storeSuite { +func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64, relabelConfig []*relabel.Config) *storeSuite { series := []labels.Labels{ labels.FromStrings("a", "1", "b", "1"), labels.FromStrings("a", "1", "b", "2"), @@ -138,7 +138,6 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m labels.FromStrings("a", "2", "c", "2"), } extLset := labels.FromStrings("ext1", "value1") - relabelConfig := make([]*relabel.Config, 0) blocks, minTime, maxTime := prepareTestBlocks(t, time.Now(), 3, dir, bkt, series, extLset) @@ -393,7 +392,7 @@ func TestBucketStore_e2e(t *testing.T) { testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig) defer s.Close() t.Log("Test with no index cache") @@ -442,7 +441,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) { testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0) + s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0, emptyRelabelConfig) defer s.Close() indexCache, err := storecache.NewIndexCache(s.logger, nil, storecache.Opts{ @@ -476,7 +475,6 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) { labels.FromStrings("a", "1", "b", "2"), } extLset := labels.FromStrings("ext1", "value1") - relabelConfig := make([]*relabel.Config, 0) _, minTime, _ := prepareTestBlocks(t, time.Now(), 3, dir, bkt, series, extLset) @@ -487,7 +485,7 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) { &FilterConfig{ MinTime: minTimeDuration, MaxTime: filterMaxTime, - }, relabelConfig) + }, emptyRelabelConfig) testutil.Ok(t, err) err = store.SyncBlocks(ctx) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e22c4f754bf..2c984af976d 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -31,6 +31,8 @@ import ( "gopkg.in/yaml.v2" ) +var emptyRelabelConfig = make([]*relabel.Config, 0) + func TestBucketBlock_Property(t *testing.T) { parameters := gopter.DefaultTestParameters() parameters.Rng.Seed(2000) @@ -428,8 +430,7 @@ func TestBucketStore_Info(t *testing.T) { dir, err := ioutil.TempDir("", "bucketstore-test") testutil.Ok(t, err) - relabelConfig := make([]*relabel.Config, 0) - bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20, filterConf, relabelConfig) + bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20, filterConf, emptyRelabelConfig) testutil.Ok(t, err) resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{}) @@ -447,7 +448,6 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) { series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")} extLset := labels.FromStrings("ext1", "value1") - relabelConfig := make([]*relabel.Config, 0) // Create a block in range [-2w, -1w]. id1, err := testutil.CreateBlock(ctx, dir, series, 10, @@ -488,7 +488,7 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) { &FilterConfig{ MinTime: minTimeDuration, MaxTime: hourBefore, - }, relabelConfig) + }, emptyRelabelConfig) testutil.Ok(t, err) inRange, err := bucketStore.isBlockInMinMaxRange(context.TODO(), id1) @@ -579,3 +579,61 @@ func TestBucketStore_selectorBlocks(t *testing.T) { testutil.Equals(t, sc.exceptedIds, ids) } } + +func TestBucketStore_InfoWithLabels(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dir, err := ioutil.TempDir("", "bucketstore-test") + testutil.Ok(t, err) + + bkt := inmem.NewBucket() + series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")} + + logger := log.NewNopLogger() + id1, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "A"}}, 0) + testutil.Ok(t, err) + testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id1.String(), block.IndexFilename), path.Join(id1.String(), block.IndexFilename))) + + id2, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "B"}}, 0) + testutil.Ok(t, err) + testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id2.String(), block.IndexFilename), path.Join(id2.String(), block.IndexFilename))) + + id3, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "B"}}, 0) + testutil.Ok(t, err) + testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id3.String(), block.IndexFilename), path.Join(id3.String(), block.IndexFilename))) + + relabelContentYaml := ` + - action: drop + regex: "A" + source_labels: + - cluster + ` + var relabelConfig []*relabel.Config + err = yaml.Unmarshal([]byte(relabelContentYaml), &relabelConfig) + testutil.Ok(t, err) + bucketStore, err := NewBucketStore(nil, nil, bkt, dir, noopCache{}, 2e5, 0, 0, false, 20, filterConf, relabelConfig) + testutil.Ok(t, err) + + err = bucketStore.SyncBlocks(ctx) + testutil.Ok(t, err) + + resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{}) + testutil.Ok(t, err) + + testutil.Equals(t, storepb.StoreType_STORE, resp.StoreType) + testutil.Equals(t, int64(0), resp.MinTime) + testutil.Equals(t, int64(1000), resp.MaxTime) + testutil.Equals(t, []storepb.LabelSet{ + storepb.LabelSet{ + Labels: []storepb.Label{ + { + Name: "cluster", + Value: "B", + }, + }, + }, + }, resp.LabelSets) +} diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go index 3200da34570..c5c3ae3b78e 100644 --- a/test/e2e/spinup_test.go +++ b/test/e2e/spinup_test.go @@ -269,6 +269,33 @@ func storeGateway(http, grpc address, bucketConfig []byte) *serverScheduler { } } +func storeGatewayWithRelabel(http, grpc address, bucketConfig []byte, relabelConfig []byte) *serverScheduler { + return &serverScheduler{ + HTTP: http, + GRPC: grpc, + schedule: func(workDir string) (Exec, error) { + dbDir := path.Join(workDir, "data", "store-gateway", http.Port) + + if err := os.MkdirAll(dbDir, 0777); err != nil { + return nil, errors.Wrap(err, "creating store gateway dir failed") + } + + return newCmdExec(exec.Command("thanos", + "store", + "--debug.name", fmt.Sprintf("store-gw-%s", http.Port), + "--data-dir", dbDir, + "--grpc-address", grpc.HostPort(), + "--http-address", http.HostPort(), + "--log.level", "debug", + "--objstore.config", string(bucketConfig), + // Accelerated sync time for quicker test (3m by default). + "--sync-block-duration", "5s", + "--selector.relabel-config", string(relabelConfig), + )), nil + }, + } +} + func alertManager(http address) *serverScheduler { return &serverScheduler{ HTTP: http, diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index abf1b00b693..593a95c5122 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -168,3 +168,113 @@ func TestStoreGateway(t *testing.T) { "ext1": "value1", }, res[0].Metric) } + +func TestStoreGateway_SelectorBlock(t *testing.T) { + a := newLocalAddresser() + minioAddr := a.New() + + s3Config := s3.Config{ + Bucket: "test-storegateway-selectorblock", + AccessKey: "abc", + SecretKey: "mightysecret", + Endpoint: minioAddr.HostPort(), + Insecure: true, + } + + bucketConfig := client.BucketConfig{ + Type: client.S3, + Config: s3Config, + } + + config, err := yaml.Marshal(bucketConfig) + testutil.Ok(t, err) + + relabelContentYaml := ` + - action: drop + regex: "A" + source_labels: + - cluster + ` + s := storeGatewayWithRelabel(a.New(), a.New(), config, []byte(relabelContentYaml)) + q := querier(a.New(), a.New(), []address{s.GRPC}, nil) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + + exit, err := e2eSpinupWithS3ObjStorage(t, ctx, minioAddr, &s3Config, s, q) + if err != nil { + t.Errorf("spinup failed: %v", err) + cancel() + return + } + + defer func() { + cancel() + <-exit + }() + + dir, err := ioutil.TempDir("", "test_store_gateway_selectorblock_local") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + series := []labels.Labels{ + labels.FromStrings("a", "1", "b", "2"), + } + extLset := labels.FromStrings("cluster", "A", "replica", "1") + extLset2 := labels.FromStrings("cluster", "B", "replica", "2") + extLset3 := labels.FromStrings("cluster", "A", "replica", "3") + + now := time.Now() + id1, err := testutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset, 0) + testutil.Ok(t, err) + + id2, err := testutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset2, 0) + testutil.Ok(t, err) + + id3, err := testutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset3, 0) + testutil.Ok(t, err) + + l := log.NewLogfmtLogger(os.Stdout) + + bkt, err := s3.NewBucketWithConfig(l, s3Config, "test-feed-1") + testutil.Ok(t, err) + + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id1.String()), id1.String())) + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id2.String()), id2.String())) + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id3.String()), id3.String())) + + var res model.Vector + + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + select { + case <-exit: + cancel() + return nil + default: + } + + var ( + err error + warnings []string + ) + res, warnings, err = promclient.QueryInstant(ctx, nil, urlParse(t, q.HTTP.URL()), "{a=\"1\"}", time.Now(), promclient.QueryOptions{ + Deduplicate: true, + }) + if err != nil { + return err + } + + if len(warnings) > 0 { + return errors.Errorf("unexpected warnings %s", warnings) + } + + if len(res) != 2 { + return errors.Errorf("unexpected result size %d", len(res)) + } + return nil + })) + + testutil.Equals(t, model.Metric{ + "a": "1", + "b": "2", + "cluster": "B", + }, res[0].Metric) +}