diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index c9311133a4..b8af42b5e7 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -56,6 +56,8 @@ type blockDesc struct { extLset labels.Labels mint int64 maxt int64 + + markedForNoCompact bool } func (b *blockDesc) Create(ctx context.Context, dir string, delay time.Duration) (ulid.ULID, error) { @@ -131,6 +133,39 @@ func TestCompactWithStoreGateway(t *testing.T) { mint: timestamp.FromTime(now.Add(8 * time.Hour)), maxt: timestamp.FromTime(now.Add(10 * time.Hour)), }, + // Non overlapping blocks, ready for compaction, with one blocked marked for no-compact (no-compact-mark.json) + blockDesc{ + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("case", "compaction-ready-one-block-marked-for-no-compact", "replica", "1"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + }, + blockDesc{ + series: []labels.Labels{labels.FromStrings("a", "1", "b", "3")}, + extLset: labels.FromStrings("case", "compaction-ready-one-block-marked-for-no-compact", "replica", "1"), + mint: timestamp.FromTime(now.Add(2 * time.Hour)), + maxt: timestamp.FromTime(now.Add(4 * time.Hour)), + }, + blockDesc{ + series: []labels.Labels{labels.FromStrings("a", "1", "b", "4")}, + extLset: labels.FromStrings("case", "compaction-ready-one-block-marked-for-no-compact", "replica", "1"), + mint: timestamp.FromTime(now.Add(4 * time.Hour)), + maxt: timestamp.FromTime(now.Add(6 * time.Hour)), + + markedForNoCompact: true, + }, + blockDesc{ + series: []labels.Labels{labels.FromStrings("a", "1", "b", "5")}, + extLset: labels.FromStrings("case", "compaction-ready-one-block-marked-for-no-compact", "replica", "1"), + mint: timestamp.FromTime(now.Add(6 * time.Hour)), + maxt: timestamp.FromTime(now.Add(8 * time.Hour)), + }, + blockDesc{ + series: []labels.Labels{labels.FromStrings("a", "1", "b", "6")}, + extLset: labels.FromStrings("case", "compaction-ready-one-block-marked-for-no-compact", "replica", "1"), + mint: timestamp.FromTime(now.Add(8 * time.Hour)), + maxt: timestamp.FromTime(now.Add(10 * time.Hour)), + }, // Non overlapping blocks, ready for compaction, only after deduplication. blockDesc{ @@ -314,6 +349,9 @@ func TestCompactWithStoreGateway(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) rawBlockIDs[id] = struct{}{} + if b.markedForNoCompact { + testutil.Ok(t, block.MarkForNoCompact(ctx, logger, bkt, id, metadata.ManualNoCompactReason, "why not", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) + } } { // On top of that, add couple of other tricky cases with different meta. @@ -415,6 +453,11 @@ func TestCompactWithStoreGateway(t *testing.T) { {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready", "replica": "1"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready", "replica": "1"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-after-dedup", "replica": "1"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-after-dedup", "replica": "1"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready-after-dedup", "replica": "1"}}, @@ -455,6 +498,11 @@ func TestCompactWithStoreGateway(t *testing.T) { {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-one-block-marked-for-no-compact"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-one-block-marked-for-no-compact"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-after-dedup"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-after-dedup"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready-after-dedup"}}, @@ -532,20 +580,20 @@ func TestCompactWithStoreGateway(t *testing.T) { testutil.Ok(t, c.WaitSumMetrics(e2e.Greater(0), "thanos_compactor_iterations_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2*4+2+2*3), "thanos_compactor_blocks_marked_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2*4+2+2*3+2), "thanos_compactor_blocks_marked_total")) // 18. testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(5), "thanos_compact_group_compactions_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(6), "thanos_compact_group_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_vertical_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_failures_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(12), "thanos_compact_group_compaction_runs_started_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(12), "thanos_compact_group_compaction_runs_completed_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(14), "thanos_compact_group_compaction_runs_started_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(14), "thanos_compact_group_compaction_runs_completed_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_downsample_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_downsample_failures_total")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64( len(rawBlockIDs)+7+ - 5+ // 5 compactions, 5 newly added blocks. + 6+ // 6 compactions, 6 newly added blocks. -2, // Partial block removed. )), "thanos_blocks_meta_synced")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) @@ -566,7 +614,7 @@ func TestCompactWithStoreGateway(t *testing.T) { expectedEndVector, ) // Store view: - testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7+5-2)), "thanos_blocks_meta_synced")) + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7+6-2)), "thanos_blocks_meta_synced")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified")) }) @@ -579,20 +627,20 @@ func TestCompactWithStoreGateway(t *testing.T) { // NOTE: We cannot assert on intermediate `thanos_blocks_meta_` metrics as those are gauge and change dynamically due to many // compaction groups. Wait for at least first compaction iteration (next is in 5m). testutil.Ok(t, c.WaitSumMetrics(e2e.Greater(0), "thanos_compactor_iterations_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(16), "thanos_compactor_blocks_cleaned_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(18), "thanos_compactor_blocks_cleaned_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_for_deletion_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_vertical_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_failures_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(6), "thanos_compact_group_compaction_runs_started_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(6), "thanos_compact_group_compaction_runs_completed_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(7), "thanos_compact_group_compaction_runs_started_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(7), "thanos_compact_group_compaction_runs_completed_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_downsample_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_downsample_failures_total")) - testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7+5-16-2)), "thanos_blocks_meta_synced")) + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7+6-18-2)), "thanos_blocks_meta_synced")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_halted")) @@ -612,7 +660,7 @@ func TestCompactWithStoreGateway(t *testing.T) { ) // Store view: - testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7-16+5-2)), "thanos_blocks_meta_synced")) + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7-18+6-2)), "thanos_blocks_meta_synced")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified")) }) diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index ef7f5bf211..90db116f03 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -212,7 +212,7 @@ func TestQueryExternalPrefix(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) - querierURL := urlParse(t, "http://"+q.HTTPEndpoint()+"/"+externalPrefix) + querierURL := mustURLParse(t, "http://"+q.HTTPEndpoint()+"/"+externalPrefix) querierProxy := httptest.NewServer(e2ethanos.NewSingleHostReverseProxy(querierURL, externalPrefix)) t.Cleanup(querierProxy.Close) @@ -241,7 +241,7 @@ func TestQueryExternalPrefixAndRoutePrefix(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) - querierURL := urlParse(t, "http://"+q.HTTPEndpoint()+"/"+routePrefix) + querierURL := mustURLParse(t, "http://"+q.HTTPEndpoint()+"/"+routePrefix) querierProxy := httptest.NewServer(e2ethanos.NewSingleHostReverseProxy(querierURL, externalPrefix)) t.Cleanup(querierProxy.Close) @@ -368,7 +368,7 @@ func checkNetworkRequests(t *testing.T, addr string) { testutil.Ok(t, err) } -func urlParse(t *testing.T, addr string) *url.URL { +func mustURLParse(t *testing.T, addr string) *url.URL { u, err := url.Parse(addr) testutil.Ok(t, err) @@ -384,7 +384,7 @@ func instantQuery(t *testing.T, ctx context.Context, addr string, q string, opts logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { - res, warnings, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+addr), q, time.Now(), opts) + res, warnings, err := promclient.NewDefaultClient().QueryInstant(ctx, mustURLParse(t, "http://"+addr), q, time.Now(), opts) if err != nil { return err } @@ -429,7 +429,7 @@ func labelNames(t *testing.T, ctx context.Context, addr string, start, end int64 logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { - res, err := promclient.NewDefaultClient().LabelNamesInGRPC(ctx, urlParse(t, "http://"+addr), start, end) + res, err := promclient.NewDefaultClient().LabelNamesInGRPC(ctx, mustURLParse(t, "http://"+addr), start, end) if err != nil { return err } @@ -448,7 +448,7 @@ func labelValues(t *testing.T, ctx context.Context, addr, label string, start, e logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { - res, err := promclient.NewDefaultClient().LabelValuesInGRPC(ctx, urlParse(t, "http://"+addr), label, start, end) + res, err := promclient.NewDefaultClient().LabelValuesInGRPC(ctx, mustURLParse(t, "http://"+addr), label, start, end) if err != nil { return err } @@ -466,7 +466,7 @@ func series(t *testing.T, ctx context.Context, addr string, matchers []storepb.L logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { - res, err := promclient.NewDefaultClient().SeriesInGRPC(ctx, urlParse(t, "http://"+addr), matchers, start, end) + res, err := promclient.NewDefaultClient().SeriesInGRPC(ctx, mustURLParse(t, "http://"+addr), matchers, start, end) if err != nil { return err } @@ -485,7 +485,7 @@ func rangeQuery(t *testing.T, ctx context.Context, addr string, q string, start, logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { - res, warnings, err := promclient.NewDefaultClient().QueryRange(ctx, urlParse(t, "http://"+addr), q, start, end, step, opts) + res, warnings, err := promclient.NewDefaultClient().QueryRange(ctx, mustURLParse(t, "http://"+addr), q, start, end, step, opts) if err != nil { return err } diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index ba9b244fbe..fd119edc4c 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -12,7 +12,6 @@ import ( "io/ioutil" "net/http" "net/http/httptest" - "net/url" "os" "path/filepath" "sync" @@ -322,6 +321,7 @@ func TestRule_AlertmanagerHTTPClient(t *testing.T) { } func TestRule(t *testing.T) { + t.Skip("Flaky test. Fix it. See: https://github.com/thanos-io/thanos/issues/3425.") t.Parallel() s, err := e2e.NewScenario("e2e_test_rule") @@ -547,7 +547,7 @@ func TestRule(t *testing.T) { }, } - alrts, err := promclient.NewDefaultClient().AlertmanagerAlerts(ctx, mustUrlParse(t, "http://"+am2.HTTPEndpoint())) + alrts, err := promclient.NewDefaultClient().AlertmanagerAlerts(ctx, mustURLParse(t, "http://"+am2.HTTPEndpoint())) testutil.Ok(t, err) testutil.Equals(t, len(expAlertLabels), len(alrts)) @@ -556,12 +556,6 @@ func TestRule(t *testing.T) { } } -func mustUrlParse(t *testing.T, addr string) *url.URL { - u, err := url.Parse(addr) - testutil.Ok(t, err) - return u -} - // Test Ruler behavior on different storepb.PartialResponseStrategy when having partial response from single `failingStoreAPI`. func TestRulePartialResponse(t *testing.T) { t.Skip("TODO: Allow HTTP ports from binaries running on host to be accessible.") diff --git a/test/e2e/rules_api_test.go b/test/e2e/rules_api_test.go index 164e5bfbfd..d67c775e3a 100644 --- a/test/e2e/rules_api_test.go +++ b/test/e2e/rules_api_test.go @@ -135,7 +135,7 @@ func ruleAndAssert(t *testing.T, ctx context.Context, addr string, typ string, w fmt.Println("ruleAndAssert: Waiting for results for rules type", typ) var result []*rulespb.RuleGroup testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { - res, err := promclient.NewDefaultClient().RulesInGRPC(ctx, urlParse(t, "http://"+addr), typ) + res, err := promclient.NewDefaultClient().RulesInGRPC(ctx, mustURLParse(t, "http://"+addr), typ) if err != nil { return err }