Skip to content

Commit

Permalink
distributor: add logs to flaky TestDistributor_Push_ShouldSupportWrit…
Browse files Browse the repository at this point in the history
…eBothToIngestersAndPartitions (#9323)

* distributor: add logs to flaky TestDistributor_Push_ShouldSupportWriteBothToIngestersAndPartitions

I suspect the 1s timeout can expire in CI. Let's inspect kafka in CI to check if that's the case or we have a bug.

Signed-off-by: Dimitar Dimitrov <[email protected]>

* More verbose

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
  • Loading branch information
dimitarvdimitrov authored Sep 20, 2024
1 parent 61c1080 commit 3c4f00e
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion pkg/distributor/distributor_ingest_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
Expand Down Expand Up @@ -452,7 +453,15 @@ func TestDistributor_Push_ShouldSupportWriteBothToIngestersAndPartitions(t *test

// Ensure series has been correctly sharded to partitions.
actualSeriesByPartition := readAllMetricNamesByPartitionFromKafka(t, kafkaCluster.ListenAddrs(), testConfig.ingestStoragePartitions, time.Second)
assert.Equal(t, testData.expectedMetricsByPartition, actualSeriesByPartition)
if !assert.Equal(t, testData.expectedMetricsByPartition, actualSeriesByPartition, "please report this failure in https://github.com/grafana/mimir/issues/9299") {
// This test is sometimes flaky. Add a log line to help debug it.
// Inspect the offsets of partitions in Kafka. There may be records, but we couldn't fetch them in the 1s timeout above.
kafkaClient, err := kgo.NewClient(kgo.SeedBrokers(kafkaCluster.ListenAddrs()...))
assert.NoError(t, err)
offsets, err := kadm.NewClient(kafkaClient).ListEndOffsets(context.Background(), kafkaTopic)
assert.NoError(t, err)
t.Logf("Kafka topic %s end offsets: %#v", kafkaTopic, offsets)
}

// Ensure series have been correctly sharded to ingesters.
for _, ingester := range ingesters {
Expand Down

0 comments on commit 3c4f00e

Please sign in to comment.