From e08853334a6554fd9f70feff2e26cac6e2630ab0 Mon Sep 17 00:00:00 2001 From: Douglas Camata <159076+douglascamata@users.noreply.github.com> Date: Thu, 18 May 2023 13:20:22 +0200 Subject: [PATCH] e2e(query): Reproduce dedup issue from #6257 Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> --- test/e2e/query_test.go | 147 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 9c6a8d946ac..dd92ef49829 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -849,6 +849,153 @@ func TestSidecarStorePushdown(t *testing.T) { }) } +func TestQueryStoreDedup(t *testing.T) { + t.Parallel() + + e, err := e2e.New(e2e.WithName("storededup")) + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + t.Cleanup(cancel) + + bucket := "store-gw-dedup-test" + minio := e2edb.NewMinio(e, "thanos-minio", bucket, e2edb.WithMinioTLS()) + testutil.Ok(t, e2e.StartAndWaitReady(minio)) + + l := log.NewLogfmtLogger(os.Stdout) + bkt, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket, minio.Endpoint("http"), minio.Dir()), "test") + testutil.Ok(t, err) + + storeGW := e2ethanos.NewStoreGW( + e, + "s1", + client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, minio.InternalEndpoint("http"), minio.InternalDir()), + }, + "", + nil, + ) + testutil.Ok(t, e2e.StartAndWaitReady(storeGW)) + + tests := []struct { + replicas []string + extLabel string + intLabel string + desc string + blockFinder string + }{ + { + desc: "Deduplication works with external label", + replicas: []string{"a", "b"}, + extLabel: "replica", + blockFinder: "dedupext", + }, + { + desc: "Deduplication works with internal label", + replicas: []string{"a", "b"}, + intLabel: "replica", + blockFinder: "dedupint", + }, + { + desc: "Deduplication works with both internal and external label", + replicas: []string{"a", "b"}, + intLabel: "replica", + extLabel: "receive_replica", + blockFinder: "dedupintext", + }, + } + + var totalBlocks int + for _, tt := range tests { + createSimpleReplicatedBlocksInS3(ctx, t, e, l, bkt, tt.replicas, tt.extLabel, tt.intLabel, tt.blockFinder) + totalBlocks += len(tt.replicas) + } + testutil.Ok(t, storeGW.WaitSumMetrics(e2emon.Equals(float64(totalBlocks)), "thanos_blocks_meta_synced")) + + for _, tt := range tests { + t.Run(tt.desc, func(t *testing.T) { + querierBuilder := e2ethanos.NewQuerierBuilder(e, tt.blockFinder, storeGW.InternalEndpoint("grpc")).WithProxyStrategy("lazy") + var replicaLabels []string + if tt.intLabel != "" { + replicaLabels = append(replicaLabels, tt.intLabel) + } + if tt.extLabel != "" { + replicaLabels = append(replicaLabels, tt.extLabel) + } + if len(replicaLabels) > 0 { + sort.Strings(replicaLabels) + querierBuilder = querierBuilder.WithReplicaLabels(replicaLabels...) + } + querier := querierBuilder.Init() + testutil.Ok(t, e2e.StartAndWaitReady(querier)) + + instantQuery(t, ctx, querier.Endpoint("http"), func() string { + return fmt.Sprintf("max_over_time(simple_series{instance='foo_0', block_finder='%s'}[2h])", tt.blockFinder) + }, time.Now, promclient.QueryOptions{ + Deduplicate: true, + }, 1) + testutil.Ok(t, err) + testutil.Ok(t, querier.Stop()) + }) + } +} + +func createSimpleReplicatedBlocksInS3( + ctx context.Context, + t *testing.T, + dockerEnv *e2e.DockerEnvironment, + logger log.Logger, + bucket *s3.Bucket, + replicas []string, + extReplicaLabel string, + intReplicaLabel string, + blockFinder string, +) { + blockSizes := []struct { + samples int + series int + name string + }{ + {samples: 1, series: 1, name: "simple_series"}, + } + now := time.Now() + dir := filepath.Join(dockerEnv.SharedDir(), "tmp") + testutil.Ok(t, os.MkdirAll(dir, os.ModePerm)) + for _, replica := range replicas { + for _, blockSize := range blockSizes { + series := make([]labels.Labels, blockSize.series) + for i := 0; i < blockSize.series; i++ { + bigSeriesLabels := labels.FromStrings("__name__", blockSize.name, "instance", fmt.Sprintf("foo_%d", i), "block_finder", blockFinder) + if intReplicaLabel != "" { + bigSeriesLabels = append(bigSeriesLabels, labels.Label{Name: intReplicaLabel, Value: replica}) + } + sort.Sort(bigSeriesLabels) + series[i] = bigSeriesLabels + } + extLabels := labels.FromStrings("prometheus", "p1") + if extReplicaLabel != "" { + extLabels = append(extLabels, labels.Label{Name: extReplicaLabel, Value: replica}) + } + blockID, err := e2eutil.CreateBlockWithBlockDelay(ctx, + dir, + series, + blockSize.samples, + timestamp.FromTime(now), + timestamp.FromTime(now.Add(2*time.Hour)), + 30*time.Minute, + extLabels, + 0, + metadata.NoneFunc, + ) + testutil.Ok(t, err) + blockPath := path.Join(dir, blockID.String()) + testutil.Ok(t, objstore.UploadDir(ctx, logger, bucket, blockPath, blockID.String())) + } + } +} + func TestSidecarQueryEvaluation(t *testing.T) { t.Parallel()