diff --git a/functional_consumer_test.go b/functional_consumer_test.go index 135ee791b..8b31b45c5 100644 --- a/functional_consumer_test.go +++ b/functional_consumer_test.go @@ -106,6 +106,21 @@ func TestVersionMatrixLZ4(t *testing.T) { consumeMsgs(t, testVersions, producedMessages) } +// Support for zstd codec was introduced in v2.1.0.0 +func TestVersionMatrixZstd(t *testing.T) { + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + + // Produce lot's of message with all possible combinations of supported + // protocol versions starting with v2.1.0.0 (first where zstd was supported) + testVersions := versionRange(V2_1_0_0) + allCodecs := []CompressionCodec{CompressionZSTD} + producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false) + + // When/Then + consumeMsgs(t, testVersions, producedMessages) +} + func TestVersionMatrixIdempotent(t *testing.T) { setupFunctionalTest(t) defer teardownFunctionalTest(t) diff --git a/zstd_test.go b/zstd_test.go deleted file mode 100644 index 8ebdec58c..000000000 --- a/zstd_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package sarama - -import ( - "testing" -) - -func TestSaramaZSTD(t *testing.T) { - cfg := NewConfig() - cfg.ClientID = "sarama-zstd-test" - cfg.Producer.Return.Errors = true - cfg.Producer.Return.Successes = true - cfg.Producer.Retry.Max = 0 - cfg.Producer.Compression = CompressionZSTD - cfg.Version = V2_1_0_0 - kafkaHome := []string{"localhost:9092"} - topic := "my-zstd-topic" - - admin, err := NewClusterAdmin(kafkaHome, cfg) - if err != nil { - t.Fatal(err) - } - _ = admin.DeleteTopic(topic) - err = admin.CreateTopic(topic, &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false) - if err.Error() != "kafka server: Topic with this name already exists. - Topic 'my-zstd-topic' already exists." { - t.Fatal(err) - } - defer func() { - if err := admin.Close(); err != nil { - t.Error(err) - } - }() - - // producer - producer, err := NewSyncProducer(kafkaHome, cfg) - defer func() { - if err := producer.Close(); err != nil { - t.Error(err) - } - }() - if err != nil { - t.Fatalf("NewSyncProducer failed: %v", err) - } - _, _, err = producer.SendMessage( - &ProducerMessage{ - Topic: topic, - Value: StringEncoder("hello world!"), - }) - if err != nil { - t.Errorf("TEST: sending message failed: %v\n", err) - } -}