From 205bb678e56d2c05606d365fa5f7d18a7ef90ea7 Mon Sep 17 00:00:00 2001 From: Mihai Alexandrescu Date: Mon, 31 Jul 2023 10:31:24 +0300 Subject: [PATCH 1/9] webhook test logic with temporary structure until kubeconfig PR gets merged --- tests/e2e/go.mod | 61 +++--- tests/e2e/kafkatopic_webhook.go | 318 ++++++++++++++++++++++++++++ tests/e2e/koperator_suite_test.go | 1 + tests/e2e/templates/topic.yaml.tmpl | 10 +- 4 files changed, 359 insertions(+), 31 deletions(-) create mode 100644 tests/e2e/kafkatopic_webhook.go diff --git a/tests/e2e/go.mod b/tests/e2e/go.mod index 29b3191f0..772dbda35 100644 --- a/tests/e2e/go.mod +++ b/tests/e2e/go.mod @@ -35,14 +35,43 @@ require ( github.com/eapache/go-resiliency v1.3.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/queue v1.1.0 // indirect - github.com/emicklei/go-restful/v3 v3.9.0 // indirect - github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/fatih/color v1.13.0 // indirect + github.com/go-logr/zapr v1.2.3 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/huandu/xstrings v1.3.3 // indirect + github.com/iancoleman/orderedmap v0.2.0 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.16.3 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.16 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/onsi/ginkgo v1.16.5 // indirect + github.com/pavlo-v-chernykh/keystore-go/v4 v4.4.1 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/spf13/cast v1.4.1 // indirect + github.com/tidwall/gjson v1.9.3 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect + github.com/twmb/franz-go/pkg/kmsg v1.4.0 // indirect + github.com/wayneashleyberry/terminal-dimensions v1.0.0 // indirect + go.uber.org/zap v1.24.0 // indirect + golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect + sigs.k8s.io/controller-runtime v0.14.6 // indirect +) + +require ( + github.com/emicklei/go-restful/v3 v3.9.0 // indirect + github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 // indirect github.com/go-errors/errors v1.0.2-0.20180813162953-d98b870cc4e0 // indirect github.com/go-logr/logr v1.2.4 // indirect - github.com/go-logr/zapr v1.2.3 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.20.0 // indirect github.com/go-openapi/swag v0.19.14 // indirect @@ -50,7 +79,6 @@ require ( github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/golang/snappy v0.0.4 // indirect github.com/google/gnostic v0.6.9 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -59,52 +87,28 @@ require ( github.com/gruntwork-io/go-commons v0.8.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect - github.com/hashicorp/go-uuid v1.0.3 // indirect - github.com/huandu/xstrings v1.3.3 // indirect - github.com/iancoleman/orderedmap v0.2.0 // indirect github.com/imdario/mergo v0.3.13 // indirect - github.com/jcmturner/aescts/v2 v2.0.0 // indirect - github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect - github.com/jcmturner/gofork v1.7.6 // indirect - github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect - github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.16.3 // indirect github.com/mailru/easyjson v0.7.6 // indirect - github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.16 // indirect github.com/mattn/go-zglob v0.0.2-0.20190814121620-e3c945676326 // indirect - github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/moby/spdystream v0.2.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/onsi/ginkgo v1.16.5 // indirect - github.com/pavlo-v-chernykh/keystore-go/v4 v4.4.1 // indirect - github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pquerna/otp v1.2.0 // indirect - github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect - github.com/spf13/cast v1.4.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/testify v1.8.1 // indirect - github.com/tidwall/gjson v1.9.3 // indirect - github.com/tidwall/match v1.1.1 // indirect - github.com/tidwall/pretty v1.2.0 // indirect - github.com/twmb/franz-go/pkg/kmsg v1.4.0 // indirect github.com/urfave/cli v1.22.2 // indirect - github.com/wayneashleyberry/terminal-dimensions v1.0.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.6.0 // indirect - go.uber.org/zap v1.24.0 // indirect golang.org/x/crypto v0.7.0 // indirect - golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect golang.org/x/net v0.10.0 // indirect golang.org/x/oauth2 v0.4.0 // indirect golang.org/x/sys v0.8.0 // indirect @@ -122,7 +126,6 @@ require ( k8s.io/klog/v2 v2.80.1 // indirect k8s.io/kube-openapi v0.0.0-20221207184640-f3cff1453715 // indirect k8s.io/utils v0.0.0-20221128185143-99ec85e7a448 // indirect - sigs.k8s.io/controller-runtime v0.14.6 // indirect sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect ) diff --git a/tests/e2e/kafkatopic_webhook.go b/tests/e2e/kafkatopic_webhook.go new file mode 100644 index 000000000..825bde6eb --- /dev/null +++ b/tests/e2e/kafkatopic_webhook.go @@ -0,0 +1,318 @@ +// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "bytes" + "fmt" + "os" + "strings" + "text/template" + + "github.com/Masterminds/sprig" + "github.com/gruntwork-io/terratest/modules/k8s" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +// TODO(mihalexa): move to k8s.go +const ( + dryRunStrategyNone string = "none" + dryRunStrategyClient string = "client" + dryRunStrategyServer string = "server" +) + +// TODO(mihalexa): move to k8s.go +// applyK8sResourceFromTemplateWithDryRun is copy of applyK8sResourceFromTemplate which calls a "--dry-run=" kubectl command +func applyK8sResourceFromTemplateWithDryRun(kubectlOptions k8s.KubectlOptions, templateFile string, values map[string]interface{}, dryRunStrategy string) (string, error) { + By(fmt.Sprintf("Generating K8s manifest from template %s for dry-run apply", templateFile)) + var manifest bytes.Buffer + rawTemplate, err := os.ReadFile(templateFile) + if err != nil { + return "", err + } + t := template.Must(template.New("template").Funcs(sprig.TxtFuncMap()).Parse(string(rawTemplate))) + err = t.Execute(&manifest, values) + if err != nil { + return "", err + } + + By("Replicating terratest's k8s.KubectlApplyFromStringE") + tmpfile, err := k8s.StoreConfigToTempFileE(GinkgoT(), manifest.String()) + if err != nil { + return "", err + } + defer os.Remove(tmpfile) + + return k8s.RunKubectlAndGetOutputE(GinkgoT(), &kubectlOptions, "apply", "-f", tmpfile, "--dry-run="+dryRunStrategy, "--output=yaml") +} + +func testWebhooks() bool { + return When("Testing webhooks", func() { + // temporary section; to be refactored after kubeconfig injection PR + var kubectlOptions k8s.KubectlOptions + var err error + kubectlOptions, err = kubectlOptionsForCurrentContext() + if err != nil { + GinkgoT().Fail() + } + + testWebhookKafkaTopic(kubectlOptions) + }) +} + +func testWebhookKafkaTopic(kubectlOptions k8s.KubectlOptions) { + kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace + requireCreatingKafkaCluster(kubectlOptions, "../../config/samples/simplekafkacluster.yaml") + testWebhookCreateKafkaTopic(kubectlOptions) + testWebhookUpdateKafkaTopic(kubectlOptions) + requireDeleteKafkaCluster(kubectlOptions, kafkaClusterName) +} + +func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { + return When("Testing KafkaTopic Create", func() { + BeforeAll(func() { + Expect(isExistingK8SResource(kubectlOptions, kafkaKind, kafkaClusterName)).To(BeTrue()) + }) + + It("Test non-existent KafkaCluster", func() { + output, err := applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + kafkaTopicTemplate, + map[string]interface{}{ + "Name": testInternalTopicName, + "TopicName": testInternalTopicName, + "Namespace": kubectlOptions.Namespace, + "ClusterRef": map[string]string{ + "Name": kafkaClusterName + "NOT", + "Namespace": kubectlOptions.Namespace, + }, + }, + dryRunStrategyServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "kafkaNOT": kafkaCluster 'kafkaNOT' in the namespace 'kafka' does not exist + Expect(len(strings.Split(output, "\n"))).To(Equal(1)) + Expect(output).To( + ContainSubstring("spec.clusterRef.name: Invalid value: %[1]q: kafkaCluster '%[1]s' in the namespace '%[2]s' does not exist", + kafkaClusterName+"NOT", kubectlOptions.Namespace)) + }) + + It("Test 0 partitions and replicationFactor", func() { + output, err := applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + kafkaTopicTemplate, + map[string]interface{}{ + "Name": testInternalTopicName, + "TopicName": testInternalTopicName, + "Partition": "0", + "ReplicationFactor": "0", + }, + dryRunStrategyServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // The KafkaTopic "topic-test-internal" is invalid: + // * spec.partitions: Invalid value: 0: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default) + // * spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0 (or set it to be -1 to use the broker's default) + Expect(len(strings.Split(output, "\n"))).To(Equal(3)) + Expect(output).To(And( + ContainSubstring("spec.partitions: Invalid value: %s: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)", "0"), + ContainSubstring("spec.replicationFactor: Invalid value: %s: replication factor must be larger than 0 (or set it to be -1 to use the broker's default)", "0"))) + }) + + // In the current validation webhook implementation, this case can only be encountered on a Create operation + It("Test ReplicationFactor larger than number of brokers", func() { + output, err := applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + kafkaTopicTemplate, + map[string]interface{}{ + "Name": testInternalTopicName, + "TopicName": testInternalTopicName, + "ReplicationFactor": "10", + }, + dryRunStrategyServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // The KafkaTopic "topic-test-internal" is invalid: spec.replicationFactor: Invalid value: 10: replication factor is larger than the number of nodes in the kafka cluster (available brokers: 3) + Expect(len(strings.Split(output, "\n"))).To(Equal(1)) + Expect(output).To(And( + ContainSubstring("spec.replicationFactor: Invalid value"), ContainSubstring("replication factor is larger than the number of nodes in the kafka cluster"))) + }) + + // Test case involving existing CRs but not necessarily an Update operation + When("Testing conflicts similar CRs", func() { + requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName) + + It("Testing conflict on spec.name", func() { + By("With managedBy koperator annotation") + output, err := applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + kafkaTopicTemplate, + map[string]interface{}{ + "Name": testInternalTopicName + "-different-cr-name", + "TopicName": testInternalTopicName, + "Namespace": kubectlOptions.Namespace, + "Annotations": []string{"managedBy: koperator"}, + }, + dryRunStrategyServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + Expect(len(strings.Split(output, "\n"))).To(Equal(1)) + Expect(output).To( + ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'", + testInternalTopicName, kubectlOptions.Namespace)) + + By("Without managedBy koperator annotation") + output, err = applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + kafkaTopicTemplate, + map[string]interface{}{ + "Name": testInternalTopicName + "-different-cr-name", + "TopicName": testInternalTopicName, + "Namespace": kubectlOptions.Namespace, + }, + dryRunStrategyServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // The KafkaTopic "topic-test-internal-different-cr-name" is invalid: + // * spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + // * spec.name: Invalid value: "topic-test-internal": topic "topic-test-internal" already exists on kafka cluster and it is not managed by Koperator, + // if you want it to be managed by Koperator so you can modify its configurations through a KafkaTopic CR, + // add this "managedBy: koperator" annotation to this KafkaTopic CR + Expect(len(strings.Split(output, "\n"))).To(Equal(5)) + Expect(output).To(And( + ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'", + testInternalTopicName, kubectlOptions.Namespace), + ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", + testInternalTopicName))) + + }) + + requireDeleteKafkaTopic(kubectlOptions, testInternalTopicName) + }) + }) +} + +func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { + return When("Testing KafkaTopic Update", func() { + BeforeAll(func() { + Expect(isExistingK8SResource(kubectlOptions, kafkaKind, kafkaClusterName)).To(BeTrue()) + }) + + // Update operation implies having a CR with the same name in place + requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName) + + It("Test non-existent KafkaCluster", func() { + output, err := applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + kafkaTopicTemplate, + map[string]interface{}{ + "Name": testInternalTopicName, + "TopicName": testInternalTopicName, + "Namespace": kubectlOptions.Namespace, + "ClusterRef": map[string]string{ + "Name": kafkaClusterName + "NOT", + "Namespace": kubectlOptions.Namespace, + }, + }, + dryRunStrategyServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "kafkaNOT": kafkaCluster 'kafkaNOT' in the namespace 'kafka' does not exist + Expect(len(strings.Split(output, "\n"))).To(Equal(1)) + Expect(output).To( + ContainSubstring("spec.clusterRef.name: Invalid value: %[1]q: kafkaCluster '%[1]s' in the namespace '%[2]s' does not exist", + kafkaClusterName+"NOT", kubectlOptions.Namespace)) + }) + + // A successfully created KafkaTopic CR cannot have 0 for either Partition or ReplicationFactor. + // At the same time, during an Update, a KafkaTopic cannot have its: + // * spec.partitions decreased + // * spec.replicationFactor changed (not just decreased) + // Consequently, an Update test for 0 values will automatically also cover the decreasing/changing scenarios. + It("Test 0 values partitions and replicationFactor", func() { + output, err := applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + kafkaTopicTemplate, + map[string]interface{}{ + "Name": testInternalTopicName, + "TopicName": testInternalTopicName, + "Partition": "0", + "ReplicationFactor": "0", + }, + dryRunStrategyServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // The KafkaTopic "topic-test-internal" is invalid: + // * spec.partitions: Invalid value: 0: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default) + // * spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0 (or set it to be -1 to use the broker's default) + // * spec.partitions: Invalid value: 0: kafka does not support decreasing partition count on an existing topic (from 2 to 0) + // * spec.replicationFactor: Invalid value: 0: kafka does not support changing the replication factor on an existing topic (from 2 to 0) + Expect(len(strings.Split(output, "\n"))).To(Equal(5)) + Expect(output).To(And( + ContainSubstring("spec.partitions: Invalid value: 0: number of partitions must be larger than 0"), + ContainSubstring("spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0"), + ContainSubstring("spec.partitions: Invalid value: %s: kafka does not support decreasing partition count on an existing topic", "0"), + ContainSubstring("spec.replicationFactor: Invalid value: %s: kafka does not support changing the replication factor on an existing topic", "0"))) + }) + + It("Testing conflict on spec.name", func() { + By("With managedBy koperator annotation") + output, err := applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + kafkaTopicTemplate, + map[string]interface{}{ + "Name": testInternalTopicName + "-different-cr-name", + "TopicName": testInternalTopicName, // same spec.name as the KafkaTopic deployed in the beginning of the Update test scenario + "Namespace": kubectlOptions.Namespace, + "Annotations": []string{"managedBy: koperator"}, + }, + dryRunStrategyServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + Expect(len(strings.Split(output, "\n"))).To(Equal(1)) + Expect(output).To( + ContainSubstring("spec.name: Invalid value: %q: kafkaTopic CR '%s' in namesapce '%s' is already referencing to Kafka topic '%s'", + testInternalTopicName, testInternalTopicName, kubectlOptions.Namespace, testInternalTopicName)) + + By("Without managedBy koperator annotation") + output, err = applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + kafkaTopicTemplate, + map[string]interface{}{ + "Name": testInternalTopicName + "-different-cr-name", + "TopicName": testInternalTopicName, // same spec.name as the KafkaTopic deployed in the beginning of the Update test scenario + "Namespace": kubectlOptions.Namespace, + }, + dryRunStrategyServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // The KafkaTopic "topic-test-internal-different-cr-name" is invalid: + // * spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + // * spec.name: Invalid value: "topic-test-internal": topic "topic-test-internal" already exists on kafka cluster and it is not managed by Koperator, + // if you want it to be managed by Koperator so you can modify its configurations through a KafkaTopic CR, + // add this "managedBy: koperator" annotation to this KafkaTopic CR + Expect(len(strings.Split(output, "\n"))).To(Equal(5)) + Expect(output).To(And( + ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'", + testInternalTopicName, kubectlOptions.Namespace), + ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", + testInternalTopicName))) + }) + + // Clean up the KafkaTopic set up to test Update operations against + requireDeleteKafkaTopic(kubectlOptions, testInternalTopicName) + }) +} diff --git a/tests/e2e/koperator_suite_test.go b/tests/e2e/koperator_suite_test.go index a7ee2f8c3..303e2674e 100644 --- a/tests/e2e/koperator_suite_test.go +++ b/tests/e2e/koperator_suite_test.go @@ -64,6 +64,7 @@ var _ = When("Testing e2e test altogether", Ordered, func() { testInstallKafkaCluster("../../config/samples/simplekafkacluster_ssl.yaml") testProduceConsumeInternalSSL(defaultTLSSecretName) testUninstallKafkaCluster() + testWebhooks() testUninstallZookeeperCluster() testUninstall() snapshotClusterAndCompare(snapshottedInfo) diff --git a/tests/e2e/templates/topic.yaml.tmpl b/tests/e2e/templates/topic.yaml.tmpl index 991baffe8..3f2680bf8 100644 --- a/tests/e2e/templates/topic.yaml.tmpl +++ b/tests/e2e/templates/topic.yaml.tmpl @@ -3,10 +3,16 @@ kind: KafkaTopic metadata: name: {{ .Name }} namespace: {{or .Namespace "kafka"}} + {{- if .Annotations }} + annotations: + {{- range .Annotations }} + {{ . }} + {{- end }} + {{- end }} spec: clusterRef: - name: kafka - namespace: kafka + name: {{or .ClusterRef.Name "kafka"}} + namespace: {{or .ClusterRef.Namespace "kafka"}} name: {{ .TopicName }} partitions: {{or .Partition 2}} replicationFactor: {{or .ReplicationFactor 2}} From 02700a1b0029e75810f2a51e7656ee0b30aa3339 Mon Sep 17 00:00:00 2001 From: Mihai Alexandrescu Date: Wed, 9 Aug 2023 22:07:13 +0300 Subject: [PATCH 2/9] fix typo in root kafkatopic validator; same typo cannot yet be fixed in e2e tests --- pkg/webhooks/kafkatopic_validator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/webhooks/kafkatopic_validator.go b/pkg/webhooks/kafkatopic_validator.go index c0c509326..a46e4f742 100644 --- a/pkg/webhooks/kafkatopic_validator.go +++ b/pkg/webhooks/kafkatopic_validator.go @@ -253,7 +253,7 @@ func (s *KafkaTopicValidator) checkExistingKafkaTopicCRs(ctx context.Context, } } if foundKafkaTopic != nil { - logMsg := fmt.Sprintf("kafkaTopic CR '%s' in namesapce '%s' is already referencing to Kafka topic '%s'", + logMsg := fmt.Sprintf("kafkaTopic CR '%s' in namespace '%s' is already referencing to Kafka topic '%s'", foundKafkaTopic.Name, foundKafkaTopic.Namespace, foundKafkaTopic.Spec.Name) return field.Invalid(field.NewPath("spec").Child("name"), foundKafkaTopic.Spec.Name, logMsg), nil } From 7208e1b1a316dfe159711b42d7b6f6f8ae4b1f1a Mon Sep 17 00:00:00 2001 From: Mihai Alexandrescu Date: Fri, 18 Aug 2023 14:09:42 +0300 Subject: [PATCH 3/9] split some function calls into new lines better --- tests/e2e/kafkatopic_webhook.go | 55 ++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/tests/e2e/kafkatopic_webhook.go b/tests/e2e/kafkatopic_webhook.go index 825bde6eb..2729139dd 100644 --- a/tests/e2e/kafkatopic_webhook.go +++ b/tests/e2e/kafkatopic_webhook.go @@ -88,7 +88,8 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { }) It("Test non-existent KafkaCluster", func() { - output, err := applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + output, err := applyK8sResourceFromTemplateWithDryRun( + kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ "Name": testInternalTopicName, @@ -106,11 +107,13 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { Expect(len(strings.Split(output, "\n"))).To(Equal(1)) Expect(output).To( ContainSubstring("spec.clusterRef.name: Invalid value: %[1]q: kafkaCluster '%[1]s' in the namespace '%[2]s' does not exist", - kafkaClusterName+"NOT", kubectlOptions.Namespace)) + kafkaClusterName+"NOT", kubectlOptions.Namespace), + ) }) It("Test 0 partitions and replicationFactor", func() { - output, err := applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + output, err := applyK8sResourceFromTemplateWithDryRun( + kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ "Name": testInternalTopicName, @@ -128,12 +131,14 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { Expect(len(strings.Split(output, "\n"))).To(Equal(3)) Expect(output).To(And( ContainSubstring("spec.partitions: Invalid value: %s: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)", "0"), - ContainSubstring("spec.replicationFactor: Invalid value: %s: replication factor must be larger than 0 (or set it to be -1 to use the broker's default)", "0"))) + ContainSubstring("spec.replicationFactor: Invalid value: %s: replication factor must be larger than 0 (or set it to be -1 to use the broker's default)", "0"), + )) }) // In the current validation webhook implementation, this case can only be encountered on a Create operation It("Test ReplicationFactor larger than number of brokers", func() { - output, err := applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + output, err := applyK8sResourceFromTemplateWithDryRun( + kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ "Name": testInternalTopicName, @@ -147,7 +152,9 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { // The KafkaTopic "topic-test-internal" is invalid: spec.replicationFactor: Invalid value: 10: replication factor is larger than the number of nodes in the kafka cluster (available brokers: 3) Expect(len(strings.Split(output, "\n"))).To(Equal(1)) Expect(output).To(And( - ContainSubstring("spec.replicationFactor: Invalid value"), ContainSubstring("replication factor is larger than the number of nodes in the kafka cluster"))) + ContainSubstring("spec.replicationFactor: Invalid value"), + ContainSubstring("replication factor is larger than the number of nodes in the kafka cluster"), + )) }) // Test case involving existing CRs but not necessarily an Update operation @@ -156,7 +163,8 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { It("Testing conflict on spec.name", func() { By("With managedBy koperator annotation") - output, err := applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + output, err := applyK8sResourceFromTemplateWithDryRun( + kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ "Name": testInternalTopicName + "-different-cr-name", @@ -172,10 +180,12 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { Expect(len(strings.Split(output, "\n"))).To(Equal(1)) Expect(output).To( ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'", - testInternalTopicName, kubectlOptions.Namespace)) + testInternalTopicName, kubectlOptions.Namespace), + ) By("Without managedBy koperator annotation") - output, err = applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + output, err = applyK8sResourceFromTemplateWithDryRun( + kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ "Name": testInternalTopicName + "-different-cr-name", @@ -196,7 +206,8 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'", testInternalTopicName, kubectlOptions.Namespace), ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", - testInternalTopicName))) + testInternalTopicName), + )) }) @@ -215,7 +226,8 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName) It("Test non-existent KafkaCluster", func() { - output, err := applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + output, err := applyK8sResourceFromTemplateWithDryRun( + kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ "Name": testInternalTopicName, @@ -233,7 +245,8 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { Expect(len(strings.Split(output, "\n"))).To(Equal(1)) Expect(output).To( ContainSubstring("spec.clusterRef.name: Invalid value: %[1]q: kafkaCluster '%[1]s' in the namespace '%[2]s' does not exist", - kafkaClusterName+"NOT", kubectlOptions.Namespace)) + kafkaClusterName+"NOT", kubectlOptions.Namespace), + ) }) // A successfully created KafkaTopic CR cannot have 0 for either Partition or ReplicationFactor. @@ -242,7 +255,8 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { // * spec.replicationFactor changed (not just decreased) // Consequently, an Update test for 0 values will automatically also cover the decreasing/changing scenarios. It("Test 0 values partitions and replicationFactor", func() { - output, err := applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + output, err := applyK8sResourceFromTemplateWithDryRun( + kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ "Name": testInternalTopicName, @@ -264,12 +278,14 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { ContainSubstring("spec.partitions: Invalid value: 0: number of partitions must be larger than 0"), ContainSubstring("spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0"), ContainSubstring("spec.partitions: Invalid value: %s: kafka does not support decreasing partition count on an existing topic", "0"), - ContainSubstring("spec.replicationFactor: Invalid value: %s: kafka does not support changing the replication factor on an existing topic", "0"))) + ContainSubstring("spec.replicationFactor: Invalid value: %s: kafka does not support changing the replication factor on an existing topic", "0"), + )) }) It("Testing conflict on spec.name", func() { By("With managedBy koperator annotation") - output, err := applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + output, err := applyK8sResourceFromTemplateWithDryRun( + kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ "Name": testInternalTopicName + "-different-cr-name", @@ -285,10 +301,12 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { Expect(len(strings.Split(output, "\n"))).To(Equal(1)) Expect(output).To( ContainSubstring("spec.name: Invalid value: %q: kafkaTopic CR '%s' in namesapce '%s' is already referencing to Kafka topic '%s'", - testInternalTopicName, testInternalTopicName, kubectlOptions.Namespace, testInternalTopicName)) + testInternalTopicName, testInternalTopicName, kubectlOptions.Namespace, testInternalTopicName), + ) By("Without managedBy koperator annotation") - output, err = applyK8sResourceFromTemplateWithDryRun(kubectlOptions, + output, err = applyK8sResourceFromTemplateWithDryRun( + kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ "Name": testInternalTopicName + "-different-cr-name", @@ -309,7 +327,8 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'", testInternalTopicName, kubectlOptions.Namespace), ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", - testInternalTopicName))) + testInternalTopicName), + )) }) // Clean up the KafkaTopic set up to test Update operations against From 111d82c4e6362433d6ae00938dd7d035554f5e06 Mon Sep 17 00:00:00 2001 From: Mihai Alexandrescu Date: Fri, 18 Aug 2023 18:59:41 +0300 Subject: [PATCH 4/9] refactor Apply-type functions --- tests/e2e/install_cluster.go | 3 +- tests/e2e/k8s.go | 84 +++++++++++++-------- tests/e2e/kafkatopic_webhook.go | 129 ++++++++++++-------------------- 3 files changed, 103 insertions(+), 113 deletions(-) diff --git a/tests/e2e/install_cluster.go b/tests/e2e/install_cluster.go index 110c1ba5c..a79e857fe 100644 --- a/tests/e2e/install_cluster.go +++ b/tests/e2e/install_cluster.go @@ -37,7 +37,8 @@ func requireCreatingKafkaCluster(kubectlOptions k8s.KubectlOptions, manifestPath By(fmt.Sprintf("KafkaCluster %s already exists\n", kafkaClusterName)) } else { By("Deploying a KafkaCluster") - applyK8sResourceManifest(kubectlOptions, manifestPath) + err := applyK8sResourceManifest(kubectlOptions, manifestPath) + Expect(err).NotTo(HaveOccurred()) } By("Verifying the KafkaCluster state") diff --git a/tests/e2e/k8s.go b/tests/e2e/k8s.go index a50669473..f6b7a3761 100644 --- a/tests/e2e/k8s.go +++ b/tests/e2e/k8s.go @@ -38,17 +38,62 @@ import ( const ( // allowedCRDByteCount is the limitation of the number of bytes a CRD is // allowed to have when being applied by K8s API server/kubectl. - allowedCRDByteCount = 262144 //nolint:unused // Note: this const is currently only used in helper functions which are not yet called on so this linter transitively fails for this const + allowedCRDByteCount = 262144 //nolint:unused // Note: this const is currently only used in helper functions which are not yet called on so this linter transitively fails for this const. // crdNamePrefix is the prefix of the CRD names when listed through kubectl. - crdNamePrefix = "customresourcedefinition.apiextensions.k8s.io/" //nolint:unused // Note: this const is currently only used in helper functions which are not yet called on so this linter transitively fails for this const + crdNamePrefix = "customresourcedefinition.apiextensions.k8s.io/" //nolint:unused // Note: this const is currently only used in helper functions which are not yet called on so this linter transitively fails for this const. + + // Per the kubectl spec, "dry-run" strategy must be "none", "server", or "client". + // With current use cases in e2e tests, we only expect to use "server" so the other options are commented out. + dryRunStrategyArgServer string = "--dry-run=server" // submit server-side request without persisting the resource. + //dryRunStrategyArgClient string = "--dry-run=client" // the resource is only validated locally but not sent to the Api-server. + //dryRunStrategyArgNone string = "--dry-run=none" // default value; submit server-side request and persist the resource. + ) // applyK8sResourceManifests applies the specified manifest to the provided // kubectl context and namespace. -func applyK8sResourceManifest(kubectlOptions k8s.KubectlOptions, manifestPath string) { //nolint:unused // Note: this might come in handy for manual K8s resource operations. - By(fmt.Sprintf("Applying k8s manifest %s", manifestPath)) - k8s.KubectlApply(GinkgoT(), &kubectlOptions, manifestPath) +func applyK8sResourceManifest(kubectlOptions k8s.KubectlOptions, manifestPath string, extraArgs ...string) error { //nolint:unused // Note: this might come in handy for manual K8s resource operations. + args := []string{"apply", "-f", manifestPath} + logMsg := fmt.Sprintf("Applying k8s manifest from path %s", manifestPath) + logMsg, args = kubectlArgExtender(args, logMsg, "", "", kubectlOptions.Namespace, extraArgs) + By(logMsg) + + return k8s.RunKubectlE(GinkgoT(), &kubectlOptions, args...) +} + +// applyK8sResourceManifestFromString applies the specified manifest in string format to the provided +// kubectl context and namespace. +func applyK8sResourceManifestFromString(kubectlOptions k8s.KubectlOptions, manifest string, extraArgs ...string) error { + // Replicating terratest's k8s.KubectlApplyFromStringE but with the possibility of a variadic argument that allows options like --dry-run + // + // TODO: look for a different implementation for temp files because terratest's version uses the composite test name to generate + // the temp file name which, in our case, includes all the descriptive Ginkgo statements (which are by design quite verbose). + // That can lead to erroring out on temp file creation based on the file name being too long. + tmpfile, err := k8s.StoreConfigToTempFileE(GinkgoT(), manifest) + if err != nil { + return err + } + defer os.Remove(tmpfile) + + return applyK8sResourceManifest(kubectlOptions, tmpfile, extraArgs...) +} + +// applyK8sResourceFromTemplate generates manifest from the specified go-template based on values +// and applies the specified manifest to the provided kubectl context and namespace. +func applyK8sResourceFromTemplate(kubectlOptions k8s.KubectlOptions, templateFile string, values map[string]interface{}, extraArgs ...string) error { + By(fmt.Sprintf("Generating k8s manifest from template %s", templateFile)) + var manifest bytes.Buffer + rawTemplate, err := os.ReadFile(templateFile) + if err != nil { + return err + } + t := template.Must(template.New("template").Funcs(sprig.TxtFuncMap()).Parse(string(rawTemplate))) + err = t.Execute(&manifest, values) + if err != nil { + return err + } + return applyK8sResourceManifestFromString(kubectlOptions, manifest.String(), extraArgs...) } // isExistingK8SResource queries a Resource by it's kind, namespace and name and @@ -191,7 +236,10 @@ func installK8sCRD(kubectlOptions k8s.KubectlOptions, crd []byte, shouldBeValida createOrReplaceK8sResourcesFromManifest(kubectlOptions, "crd", object.GetName(), tempPath, shouldBeValidated) default: // Note: regular CRD. - applyK8sResourceManifest(kubectlOptions, tempPath) + err = applyK8sResourceManifest(kubectlOptions, tempPath) + if err != nil { + return errors.WrapIfWithDetails(err, "applying CRD failed", "crd", string(crd)) + } } return nil @@ -453,30 +501,6 @@ func deleteK8sResourceNoErrNotFound(kubectlOptions k8s.KubectlOptions, timeout t return err } -// applyK8sResourceManifestFromString applies the specified manifest in string format to the provided -// kubectl context and namespace. -func applyK8sResourceManifestFromString(kubectlOptions k8s.KubectlOptions, manifest string) error { - By(fmt.Sprintf("Applying k8s manifest\n%s", manifest)) - return k8s.KubectlApplyFromStringE(GinkgoT(), &kubectlOptions, manifest) -} - -// applyK8sResourceFromTemplate generates manifest from the specified go-template based on values -// and applies the specified manifest to the provided kubectl context and namespace. -func applyK8sResourceFromTemplate(kubectlOptions k8s.KubectlOptions, templateFile string, values map[string]interface{}) error { - By(fmt.Sprintf("Generating K8s manifest from template %s", templateFile)) - var manifest bytes.Buffer - rawTemplate, err := os.ReadFile(templateFile) - if err != nil { - return err - } - t := template.Must(template.New("template").Funcs(sprig.TxtFuncMap()).Parse(string(rawTemplate))) - err = t.Execute(&manifest, values) - if err != nil { - return err - } - return applyK8sResourceManifestFromString(kubectlOptions, manifest.String()) -} - // listK8sResourceKinds lists all of the available resource kinds on the K8s cluster // with the apiGroupSelector parameter the result can be narrowed by the resource group. // extraArgs can be any kubectl api-resources parameter. diff --git a/tests/e2e/kafkatopic_webhook.go b/tests/e2e/kafkatopic_webhook.go index 2729139dd..f3e801f5c 100644 --- a/tests/e2e/kafkatopic_webhook.go +++ b/tests/e2e/kafkatopic_webhook.go @@ -15,50 +15,13 @@ package e2e import ( - "bytes" - "fmt" - "os" "strings" - "text/template" - "github.com/Masterminds/sprig" "github.com/gruntwork-io/terratest/modules/k8s" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) -// TODO(mihalexa): move to k8s.go -const ( - dryRunStrategyNone string = "none" - dryRunStrategyClient string = "client" - dryRunStrategyServer string = "server" -) - -// TODO(mihalexa): move to k8s.go -// applyK8sResourceFromTemplateWithDryRun is copy of applyK8sResourceFromTemplate which calls a "--dry-run=" kubectl command -func applyK8sResourceFromTemplateWithDryRun(kubectlOptions k8s.KubectlOptions, templateFile string, values map[string]interface{}, dryRunStrategy string) (string, error) { - By(fmt.Sprintf("Generating K8s manifest from template %s for dry-run apply", templateFile)) - var manifest bytes.Buffer - rawTemplate, err := os.ReadFile(templateFile) - if err != nil { - return "", err - } - t := template.Must(template.New("template").Funcs(sprig.TxtFuncMap()).Parse(string(rawTemplate))) - err = t.Execute(&manifest, values) - if err != nil { - return "", err - } - - By("Replicating terratest's k8s.KubectlApplyFromStringE") - tmpfile, err := k8s.StoreConfigToTempFileE(GinkgoT(), manifest.String()) - if err != nil { - return "", err - } - defer os.Remove(tmpfile) - - return k8s.RunKubectlAndGetOutputE(GinkgoT(), &kubectlOptions, "apply", "-f", tmpfile, "--dry-run="+dryRunStrategy, "--output=yaml") -} - func testWebhooks() bool { return When("Testing webhooks", func() { // temporary section; to be refactored after kubeconfig injection PR @@ -88,7 +51,7 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { }) It("Test non-existent KafkaCluster", func() { - output, err := applyK8sResourceFromTemplateWithDryRun( + err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ @@ -100,19 +63,20 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { "Namespace": kubectlOptions.Namespace, }, }, - dryRunStrategyServer, + dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) - // Example error: The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "kafkaNOT": kafkaCluster 'kafkaNOT' in the namespace 'kafka' does not exist - Expect(len(strings.Split(output, "\n"))).To(Equal(1)) - Expect(output).To( + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "kafkaNOT": kafkaCluster 'kafkaNOT' in the namespace 'kafka' does not exist + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( ContainSubstring("spec.clusterRef.name: Invalid value: %[1]q: kafkaCluster '%[1]s' in the namespace '%[2]s' does not exist", kafkaClusterName+"NOT", kubectlOptions.Namespace), ) }) It("Test 0 partitions and replicationFactor", func() { - output, err := applyK8sResourceFromTemplateWithDryRun( + err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ @@ -121,15 +85,15 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { "Partition": "0", "ReplicationFactor": "0", }, - dryRunStrategyServer, + dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) // Example error: - // The KafkaTopic "topic-test-internal" is invalid: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: // * spec.partitions: Invalid value: 0: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default) // * spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0 (or set it to be -1 to use the broker's default) - Expect(len(strings.Split(output, "\n"))).To(Equal(3)) - Expect(output).To(And( + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(3)) + Expect(err.Error()).To(And( ContainSubstring("spec.partitions: Invalid value: %s: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)", "0"), ContainSubstring("spec.replicationFactor: Invalid value: %s: replication factor must be larger than 0 (or set it to be -1 to use the broker's default)", "0"), )) @@ -137,7 +101,7 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { // In the current validation webhook implementation, this case can only be encountered on a Create operation It("Test ReplicationFactor larger than number of brokers", func() { - output, err := applyK8sResourceFromTemplateWithDryRun( + err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ @@ -145,13 +109,13 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { "TopicName": testInternalTopicName, "ReplicationFactor": "10", }, - dryRunStrategyServer, + dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) // Example error: - // The KafkaTopic "topic-test-internal" is invalid: spec.replicationFactor: Invalid value: 10: replication factor is larger than the number of nodes in the kafka cluster (available brokers: 3) - Expect(len(strings.Split(output, "\n"))).To(Equal(1)) - Expect(output).To(And( + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.replicationFactor: Invalid value: 10: replication factor is larger than the number of nodes in the kafka cluster (available brokers: 3) + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To(And( ContainSubstring("spec.replicationFactor: Invalid value"), ContainSubstring("replication factor is larger than the number of nodes in the kafka cluster"), )) @@ -163,7 +127,7 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { It("Testing conflict on spec.name", func() { By("With managedBy koperator annotation") - output, err := applyK8sResourceFromTemplateWithDryRun( + err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ @@ -172,19 +136,19 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { "Namespace": kubectlOptions.Namespace, "Annotations": []string{"managedBy: koperator"}, }, - dryRunStrategyServer, + dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) // Example error: - // The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' - Expect(len(strings.Split(output, "\n"))).To(Equal(1)) - Expect(output).To( + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'", testInternalTopicName, kubectlOptions.Namespace), ) By("Without managedBy koperator annotation") - output, err = applyK8sResourceFromTemplateWithDryRun( + err = applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ @@ -192,17 +156,17 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { "TopicName": testInternalTopicName, "Namespace": kubectlOptions.Namespace, }, - dryRunStrategyServer, + dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) // Example error: - // The KafkaTopic "topic-test-internal-different-cr-name" is invalid: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: // * spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' // * spec.name: Invalid value: "topic-test-internal": topic "topic-test-internal" already exists on kafka cluster and it is not managed by Koperator, // if you want it to be managed by Koperator so you can modify its configurations through a KafkaTopic CR, // add this "managedBy: koperator" annotation to this KafkaTopic CR - Expect(len(strings.Split(output, "\n"))).To(Equal(5)) - Expect(output).To(And( + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) + Expect(err.Error()).To(And( ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'", testInternalTopicName, kubectlOptions.Namespace), ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", @@ -226,7 +190,7 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName) It("Test non-existent KafkaCluster", func() { - output, err := applyK8sResourceFromTemplateWithDryRun( + err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ @@ -238,12 +202,13 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { "Namespace": kubectlOptions.Namespace, }, }, - dryRunStrategyServer, + dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) - // Example error: The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "kafkaNOT": kafkaCluster 'kafkaNOT' in the namespace 'kafka' does not exist - Expect(len(strings.Split(output, "\n"))).To(Equal(1)) - Expect(output).To( + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "kafkaNOT": kafkaCluster 'kafkaNOT' in the namespace 'kafka' does not exist + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( ContainSubstring("spec.clusterRef.name: Invalid value: %[1]q: kafkaCluster '%[1]s' in the namespace '%[2]s' does not exist", kafkaClusterName+"NOT", kubectlOptions.Namespace), ) @@ -255,7 +220,7 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { // * spec.replicationFactor changed (not just decreased) // Consequently, an Update test for 0 values will automatically also cover the decreasing/changing scenarios. It("Test 0 values partitions and replicationFactor", func() { - output, err := applyK8sResourceFromTemplateWithDryRun( + err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ @@ -264,17 +229,17 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { "Partition": "0", "ReplicationFactor": "0", }, - dryRunStrategyServer, + dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) // Example error: - // The KafkaTopic "topic-test-internal" is invalid: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: // * spec.partitions: Invalid value: 0: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default) // * spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0 (or set it to be -1 to use the broker's default) // * spec.partitions: Invalid value: 0: kafka does not support decreasing partition count on an existing topic (from 2 to 0) // * spec.replicationFactor: Invalid value: 0: kafka does not support changing the replication factor on an existing topic (from 2 to 0) - Expect(len(strings.Split(output, "\n"))).To(Equal(5)) - Expect(output).To(And( + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) + Expect(err.Error()).To(And( ContainSubstring("spec.partitions: Invalid value: 0: number of partitions must be larger than 0"), ContainSubstring("spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0"), ContainSubstring("spec.partitions: Invalid value: %s: kafka does not support decreasing partition count on an existing topic", "0"), @@ -284,7 +249,7 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { It("Testing conflict on spec.name", func() { By("With managedBy koperator annotation") - output, err := applyK8sResourceFromTemplateWithDryRun( + err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ @@ -293,19 +258,19 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { "Namespace": kubectlOptions.Namespace, "Annotations": []string{"managedBy: koperator"}, }, - dryRunStrategyServer, + dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) // Example error: - // The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' - Expect(len(strings.Split(output, "\n"))).To(Equal(1)) - Expect(output).To( + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( ContainSubstring("spec.name: Invalid value: %q: kafkaTopic CR '%s' in namesapce '%s' is already referencing to Kafka topic '%s'", testInternalTopicName, testInternalTopicName, kubectlOptions.Namespace, testInternalTopicName), ) By("Without managedBy koperator annotation") - output, err = applyK8sResourceFromTemplateWithDryRun( + err = applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, map[string]interface{}{ @@ -313,17 +278,17 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { "TopicName": testInternalTopicName, // same spec.name as the KafkaTopic deployed in the beginning of the Update test scenario "Namespace": kubectlOptions.Namespace, }, - dryRunStrategyServer, + dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) // Example error: - // The KafkaTopic "topic-test-internal-different-cr-name" is invalid: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: // * spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' // * spec.name: Invalid value: "topic-test-internal": topic "topic-test-internal" already exists on kafka cluster and it is not managed by Koperator, // if you want it to be managed by Koperator so you can modify its configurations through a KafkaTopic CR, // add this "managedBy: koperator" annotation to this KafkaTopic CR - Expect(len(strings.Split(output, "\n"))).To(Equal(5)) - Expect(output).To(And( + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) + Expect(err.Error()).To(And( ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'", testInternalTopicName, kubectlOptions.Namespace), ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", From 8a0926ffaf41aab3e99636a33f9eddb120a2b2fc Mon Sep 17 00:00:00 2001 From: Mihai Alexandrescu Date: Tue, 22 Aug 2023 10:22:03 +0300 Subject: [PATCH 5/9] refactor kafkatopic webhook test case data --- tests/e2e/kafkatopic_webhook.go | 233 +++++++++++++++++------------- tests/e2e/koperator_suite_test.go | 3 +- 2 files changed, 134 insertions(+), 102 deletions(-) diff --git a/tests/e2e/kafkatopic_webhook.go b/tests/e2e/kafkatopic_webhook.go index f3e801f5c..69f68933f 100644 --- a/tests/e2e/kafkatopic_webhook.go +++ b/tests/e2e/kafkatopic_webhook.go @@ -15,76 +15,74 @@ package e2e import ( + "fmt" "strings" "github.com/gruntwork-io/terratest/modules/k8s" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/types" ) -func testWebhooks() bool { - return When("Testing webhooks", func() { - // temporary section; to be refactored after kubeconfig injection PR - var kubectlOptions k8s.KubectlOptions - var err error - kubectlOptions, err = kubectlOptionsForCurrentContext() - if err != nil { - GinkgoT().Fail() - } +func testWebhookKafkaTopic(kafkaCluster types.NamespacedName) { + // temporary section; to be refactored after kubeconfig injection PR + var kubectlOptions k8s.KubectlOptions + var err error + kubectlOptions, err = kubectlOptionsForCurrentContext() + if err != nil { + GinkgoT().Fail() + } - testWebhookKafkaTopic(kubectlOptions) - }) -} + kubectlOptions.Namespace = kafkaCluster.Namespace + + testWebhookCreateKafkaTopic(kubectlOptions, kafkaCluster) + testWebhookUpdateKafkaTopic(kubectlOptions, kafkaCluster) -func testWebhookKafkaTopic(kubectlOptions k8s.KubectlOptions) { - kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace - requireCreatingKafkaCluster(kubectlOptions, "../../config/samples/simplekafkacluster.yaml") - testWebhookCreateKafkaTopic(kubectlOptions) - testWebhookUpdateKafkaTopic(kubectlOptions) - requireDeleteKafkaCluster(kubectlOptions, kafkaClusterName) } -func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { +func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster types.NamespacedName) bool { return When("Testing KafkaTopic Create", func() { BeforeAll(func() { Expect(isExistingK8SResource(kubectlOptions, kafkaKind, kafkaClusterName)).To(BeTrue()) }) + const nonExistent string = "non-existent" + + baseKafkaTopicTemplateValues := baseKafkaTopicData( + types.NamespacedName{Name: testInternalTopicName, Namespace: kubectlOptions.Namespace}, + kafkaCluster, + ) + It("Test non-existent KafkaCluster", func() { + caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) + caseData["ClusterRef"] = map[string]string{ + "Name": nonExistent, + "Namespace": kafkaCluster.Namespace, + } err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, - map[string]interface{}{ - "Name": testInternalTopicName, - "TopicName": testInternalTopicName, - "Namespace": kubectlOptions.Namespace, - "ClusterRef": map[string]string{ - "Name": kafkaClusterName + "NOT", - "Namespace": kubectlOptions.Namespace, - }, - }, + caseData, dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) // Example error: - // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "kafkaNOT": kafkaCluster 'kafkaNOT' in the namespace 'kafka' does not exist + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "NON-EXISTENT": kafkaCluster 'NON-EXISTENT' in the namespace 'kafka' does not exist Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) - Expect(err.Error()).To( - ContainSubstring("spec.clusterRef.name: Invalid value: %[1]q: kafkaCluster '%[1]s' in the namespace '%[2]s' does not exist", - kafkaClusterName+"NOT", kubectlOptions.Namespace), - ) + Expect(err.Error()).To(And( + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.clusterRef.name: Invalid value: %[2]q: kafkaCluster '%[2]s' in the namespace '%[3]s' does not exist", + caseData["Name"], nonExistent, kubectlOptions.Namespace), + )) }) It("Test 0 partitions and replicationFactor", func() { + caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) + caseData["Partition"] = "0" + caseData["ReplicationFactor"] = "0" err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, - map[string]interface{}{ - "Name": testInternalTopicName, - "TopicName": testInternalTopicName, - "Partition": "0", - "ReplicationFactor": "0", - }, + caseData, dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) @@ -94,48 +92,54 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { // * spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0 (or set it to be -1 to use the broker's default) Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(3)) Expect(err.Error()).To(And( - ContainSubstring("spec.partitions: Invalid value: %s: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)", "0"), - ContainSubstring("spec.replicationFactor: Invalid value: %s: replication factor must be larger than 0 (or set it to be -1 to use the broker's default)", "0"), + ContainSubstring("The KafkaTopic %q is invalid:", caseData["Name"]), + ContainSubstring("spec.partitions: Invalid value: %s: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)", caseData["Partition"]), + ContainSubstring("spec.replicationFactor: Invalid value: %s: replication factor must be larger than 0 (or set it to be -1 to use the broker's default)", caseData["ReplicationFactor"]), )) }) // In the current validation webhook implementation, this case can only be encountered on a Create operation It("Test ReplicationFactor larger than number of brokers", func() { + caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) + caseData["ReplicationFactor"] = "10" err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, - map[string]interface{}{ - "Name": testInternalTopicName, - "TopicName": testInternalTopicName, - "ReplicationFactor": "10", - }, + caseData, dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) // Example error: // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.replicationFactor: Invalid value: 10: replication factor is larger than the number of nodes in the kafka cluster (available brokers: 3) Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) - Expect(err.Error()).To(And( - ContainSubstring("spec.replicationFactor: Invalid value"), - ContainSubstring("replication factor is larger than the number of nodes in the kafka cluster"), - )) + Expect(err.Error()).To( + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.replicationFactor: Invalid value: %[2]s: replication factor is larger than the number of nodes in the kafka cluster", + caseData["Name"], caseData["ReplicationFactor"]), + ) }) // Test case involving existing CRs but not necessarily an Update operation - When("Testing conflicts similar CRs", func() { + When("Testing conflicts similar CRs", Ordered, func() { requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName) It("Testing conflict on spec.name", func() { + caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) + + switch v := caseData["Name"].(type) { + case string: + caseData["Name"] = v + "-different-cr-name" + case fmt.Stringer: + caseData["Name"] = v.String() + "-different-cr-name" + default: + caseData["Name"] = nonExistent + } + By("With managedBy koperator annotation") + caseData["Annotations"] = []string{"managedBy: koperator"} err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, - map[string]interface{}{ - "Name": testInternalTopicName + "-different-cr-name", - "TopicName": testInternalTopicName, - "Namespace": kubectlOptions.Namespace, - "Annotations": []string{"managedBy: koperator"}, - }, + caseData, dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) @@ -143,19 +147,16 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) Expect(err.Error()).To( - ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'", - testInternalTopicName, kubectlOptions.Namespace), + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.name: Invalid value: %[2]q: kafkaTopic CR '%[2]s' in namesapce '%[3]s' is already referencing to Kafka topic '%[2]s'", + caseData["Name"], testInternalTopicName, kubectlOptions.Namespace), ) By("Without managedBy koperator annotation") + caseData["Annotations"] = []string{} err = applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, - map[string]interface{}{ - "Name": testInternalTopicName + "-different-cr-name", - "TopicName": testInternalTopicName, - "Namespace": kubectlOptions.Namespace, - }, + caseData, dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) @@ -167,50 +168,53 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { // add this "managedBy: koperator" annotation to this KafkaTopic CR Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) Expect(err.Error()).To(And( + ContainSubstring("The KafkaTopic %q is invalid:", caseData["Name"]), ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'", testInternalTopicName, kubectlOptions.Namespace), ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", testInternalTopicName), )) - }) - requireDeleteKafkaTopic(kubectlOptions, testInternalTopicName) }) }) } -func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { +func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster types.NamespacedName) bool { return When("Testing KafkaTopic Update", func() { BeforeAll(func() { Expect(isExistingK8SResource(kubectlOptions, kafkaKind, kafkaClusterName)).To(BeTrue()) }) + const nonExistent string = "non-existent" + + baseKafkaTopicTemplateValues := baseKafkaTopicData( + types.NamespacedName{Name: testInternalTopicName, Namespace: kubectlOptions.Namespace}, + kafkaCluster, + ) + // Update operation implies having a CR with the same name in place requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName) It("Test non-existent KafkaCluster", func() { + caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) + caseData["ClusterRef"] = map[string]string{ + "Name": nonExistent, + "Namespace": kafkaCluster.Namespace, + } err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, - map[string]interface{}{ - "Name": testInternalTopicName, - "TopicName": testInternalTopicName, - "Namespace": kubectlOptions.Namespace, - "ClusterRef": map[string]string{ - "Name": kafkaClusterName + "NOT", - "Namespace": kubectlOptions.Namespace, - }, - }, + caseData, dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) // Example error: - // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "kafkaNOT": kafkaCluster 'kafkaNOT' in the namespace 'kafka' does not exist + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "non-existent": kafkaCluster 'non-existent' in the namespace 'kafka' does not exist Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) Expect(err.Error()).To( - ContainSubstring("spec.clusterRef.name: Invalid value: %[1]q: kafkaCluster '%[1]s' in the namespace '%[2]s' does not exist", - kafkaClusterName+"NOT", kubectlOptions.Namespace), + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.clusterRef.name: Invalid value: %[2]q: kafkaCluster '%[2]s' in the namespace '%[3]s' does not exist", + caseData["Name"], nonExistent, kubectlOptions.Namespace), ) }) @@ -220,15 +224,13 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { // * spec.replicationFactor changed (not just decreased) // Consequently, an Update test for 0 values will automatically also cover the decreasing/changing scenarios. It("Test 0 values partitions and replicationFactor", func() { + caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) + caseData["Partition"] = "0" + caseData["ReplicationFactor"] = "0" err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, - map[string]interface{}{ - "Name": testInternalTopicName, - "TopicName": testInternalTopicName, - "Partition": "0", - "ReplicationFactor": "0", - }, + caseData, dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) @@ -240,24 +242,32 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { // * spec.replicationFactor: Invalid value: 0: kafka does not support changing the replication factor on an existing topic (from 2 to 0) Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) Expect(err.Error()).To(And( + ContainSubstring("The KafkaTopic %q is invalid:", caseData["Name"]), ContainSubstring("spec.partitions: Invalid value: 0: number of partitions must be larger than 0"), ContainSubstring("spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0"), - ContainSubstring("spec.partitions: Invalid value: %s: kafka does not support decreasing partition count on an existing topic", "0"), - ContainSubstring("spec.replicationFactor: Invalid value: %s: kafka does not support changing the replication factor on an existing topic", "0"), + ContainSubstring("spec.partitions: Invalid value: %s: kafka does not support decreasing partition count on an existing topic", caseData["Partition"]), + ContainSubstring("spec.replicationFactor: Invalid value: %s: kafka does not support changing the replication factor on an existing topic", caseData["ReplicationFactor"]), )) }) It("Testing conflict on spec.name", func() { + caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) + + switch v := caseData["Name"].(type) { + case string: + caseData["Name"] = v + "-different-cr-name" + case fmt.Stringer: + caseData["Name"] = v.String() + "-different-cr-name" + default: + caseData["Name"] = nonExistent + } + By("With managedBy koperator annotation") + caseData["Annotations"] = []string{"managedBy: koperator"} err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, - map[string]interface{}{ - "Name": testInternalTopicName + "-different-cr-name", - "TopicName": testInternalTopicName, // same spec.name as the KafkaTopic deployed in the beginning of the Update test scenario - "Namespace": kubectlOptions.Namespace, - "Annotations": []string{"managedBy: koperator"}, - }, + caseData, dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) @@ -265,19 +275,16 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) Expect(err.Error()).To( - ContainSubstring("spec.name: Invalid value: %q: kafkaTopic CR '%s' in namesapce '%s' is already referencing to Kafka topic '%s'", - testInternalTopicName, testInternalTopicName, kubectlOptions.Namespace, testInternalTopicName), + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.name: Invalid value: %[2]q: kafkaTopic CR '%[2]s' in namesapce '%[3]s' is already referencing to Kafka topic '%[2]s'", + caseData["Name"], testInternalTopicName, kubectlOptions.Namespace), ) By("Without managedBy koperator annotation") + caseData["Annotations"] = []string{} err = applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, - map[string]interface{}{ - "Name": testInternalTopicName + "-different-cr-name", - "TopicName": testInternalTopicName, // same spec.name as the KafkaTopic deployed in the beginning of the Update test scenario - "Namespace": kubectlOptions.Namespace, - }, + caseData, dryRunStrategyArgServer, ) Expect(err).To(HaveOccurred()) @@ -289,7 +296,8 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { // add this "managedBy: koperator" annotation to this KafkaTopic CR Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) Expect(err.Error()).To(And( - ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'", + ContainSubstring("The KafkaTopic %q is invalid:", caseData["Name"]), + ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'\n", testInternalTopicName, kubectlOptions.Namespace), ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", testInternalTopicName), @@ -300,3 +308,26 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions) bool { requireDeleteKafkaTopic(kubectlOptions, testInternalTopicName) }) } + +func baseKafkaTopicData(kafkaTopic types.NamespacedName, kafkaCluster types.NamespacedName) map[string]interface{} { + return map[string]interface{}{ + "Name": kafkaTopic.Name, + "TopicName": kafkaTopic.Name, + "Namespace": kafkaTopic.Namespace, + "Partition": "2", + "ReplicationFactor": "2", + "ClusterRef": map[string]string{ + "Name": kafkaCluster.Name, + "Namespace": kafkaCluster.Namespace, + }, + "Annotations": []string{"managedBy: koperator"}, + } +} + +func copyMapWithStringKeys(oldMap map[string]interface{}) map[string]interface{} { + var newMap = make(map[string]interface{}) + for k := range oldMap { + newMap[k] = oldMap[k] + } + return newMap +} diff --git a/tests/e2e/koperator_suite_test.go b/tests/e2e/koperator_suite_test.go index 303e2674e..72051b048 100644 --- a/tests/e2e/koperator_suite_test.go +++ b/tests/e2e/koperator_suite_test.go @@ -24,6 +24,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) func TestKoperator(t *testing.T) { @@ -59,12 +60,12 @@ var _ = When("Testing e2e test altogether", Ordered, func() { testInstall() testInstallZookeeperCluster() testInstallKafkaCluster("../../config/samples/simplekafkacluster.yaml") + testWebhookKafkaTopic(types.NamespacedName{Name: kafkaClusterName, Namespace: koperatorLocalHelmDescriptor.Namespace}) testProduceConsumeInternal() testUninstallKafkaCluster() testInstallKafkaCluster("../../config/samples/simplekafkacluster_ssl.yaml") testProduceConsumeInternalSSL(defaultTLSSecretName) testUninstallKafkaCluster() - testWebhooks() testUninstallZookeeperCluster() testUninstall() snapshotClusterAndCompare(snapshottedInfo) From cf9faefc4dd0bb465127b884ea9eb36fdaee3633 Mon Sep 17 00:00:00 2001 From: Mihai Alexandrescu Date: Tue, 22 Aug 2023 10:26:22 +0300 Subject: [PATCH 6/9] update to go.mod and go.sum in root and tests/e2e after tidy --- tests/e2e/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e/go.mod b/tests/e2e/go.mod index 772dbda35..2d1b5df6a 100644 --- a/tests/e2e/go.mod +++ b/tests/e2e/go.mod @@ -62,7 +62,6 @@ require ( github.com/twmb/franz-go/pkg/kmsg v1.4.0 // indirect github.com/wayneashleyberry/terminal-dimensions v1.0.0 // indirect go.uber.org/zap v1.24.0 // indirect - golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect sigs.k8s.io/controller-runtime v0.14.6 // indirect ) @@ -109,6 +108,7 @@ require ( go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/crypto v0.7.0 // indirect + golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect golang.org/x/net v0.10.0 // indirect golang.org/x/oauth2 v0.4.0 // indirect golang.org/x/sys v0.8.0 // indirect From 34483c40421d05f445fa91fba4d0b9b42cb23ddb Mon Sep 17 00:00:00 2001 From: Mihai Alexandrescu Date: Wed, 23 Aug 2023 16:57:53 +0300 Subject: [PATCH 7/9] update testcase logic; temporary commit - needs cleanup --- tests/e2e/go.mod | 2 +- tests/e2e/go.sum | 4 +- tests/e2e/k8s.go | 20 +- tests/e2e/kafka.go | 50 +++- tests/e2e/kafkatopic_webhook.go | 354 +++++++++++++++++++++++++++- tests/e2e/koperator_suite_test.go | 26 +- tests/e2e/templates/topic.yaml.tmpl | 2 +- 7 files changed, 422 insertions(+), 36 deletions(-) diff --git a/tests/e2e/go.mod b/tests/e2e/go.mod index 2d1b5df6a..802aa0152 100644 --- a/tests/e2e/go.mod +++ b/tests/e2e/go.mod @@ -14,6 +14,7 @@ require ( github.com/twmb/franz-go v1.13.5 k8s.io/apiextensions-apiserver v0.26.4 k8s.io/apimachinery v0.26.4 + k8s.io/utils v0.0.0-20230726121419-3b25d923346b sigs.k8s.io/yaml v1.3.0 ) @@ -125,7 +126,6 @@ require ( k8s.io/client-go v0.26.4 // indirect k8s.io/klog/v2 v2.80.1 // indirect k8s.io/kube-openapi v0.0.0-20221207184640-f3cff1453715 // indirect - k8s.io/utils v0.0.0-20221128185143-99ec85e7a448 // indirect sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect ) diff --git a/tests/e2e/go.sum b/tests/e2e/go.sum index c69c724d9..98c2ace1f 100644 --- a/tests/e2e/go.sum +++ b/tests/e2e/go.sum @@ -845,8 +845,8 @@ k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4= k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20221207184640-f3cff1453715 h1:tBEbstoM+K0FiBV5KGAKQ0kuvf54v/hwpldiJt69w1s= k8s.io/kube-openapi v0.0.0-20221207184640-f3cff1453715/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4= -k8s.io/utils v0.0.0-20221128185143-99ec85e7a448 h1:KTgPnR10d5zhztWptI952TNtt/4u5h3IzDXkdIMuo2Y= -k8s.io/utils v0.0.0-20221128185143-99ec85e7a448/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= +k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/tests/e2e/k8s.go b/tests/e2e/k8s.go index f6b7a3761..c1bd7b740 100644 --- a/tests/e2e/k8s.go +++ b/tests/e2e/k8s.go @@ -81,7 +81,7 @@ func applyK8sResourceManifestFromString(kubectlOptions k8s.KubectlOptions, manif // applyK8sResourceFromTemplate generates manifest from the specified go-template based on values // and applies the specified manifest to the provided kubectl context and namespace. -func applyK8sResourceFromTemplate(kubectlOptions k8s.KubectlOptions, templateFile string, values map[string]interface{}, extraArgs ...string) error { +func applyK8sResourceFromTemplate(kubectlOptions k8s.KubectlOptions, templateFile string, values map[string]any, extraArgs ...string) error { By(fmt.Sprintf("Generating k8s manifest from template %s", templateFile)) var manifest bytes.Buffer rawTemplate, err := os.ReadFile(templateFile) @@ -96,6 +96,24 @@ func applyK8sResourceFromTemplate(kubectlOptions k8s.KubectlOptions, templateFil return applyK8sResourceManifestFromString(kubectlOptions, manifest.String(), extraArgs...) } +// applyK8sResourceFromTemplate generates manifest from the specified go-template based on values +// and applies the specified manifest to the provided kubectl context and namespace. +func applyK8sResourceFromTemplate_2(kubectlOptions k8s.KubectlOptions, templateFile string, values any, extraArgs ...string) error { + By(fmt.Sprintf("Generating k8s manifest from template %s", templateFile)) + var manifest bytes.Buffer + rawTemplate, err := os.ReadFile(templateFile) + if err != nil { + return err + } + t := template.Must(template.New("template").Funcs(sprig.TxtFuncMap()).Parse(string(rawTemplate))) + err = t.Execute(&manifest, values) + if err != nil { + return err + } + fmt.Printf("###\nManifest is :\n%s\n###\n", manifest.String()) + return applyK8sResourceManifestFromString(kubectlOptions, manifest.String(), extraArgs...) +} + // isExistingK8SResource queries a Resource by it's kind, namespace and name and // returns true if it's found, false otherwise func isExistingK8SResource( diff --git a/tests/e2e/kafka.go b/tests/e2e/kafka.go index 54b7cfd9b..316c95fcc 100644 --- a/tests/e2e/kafka.go +++ b/tests/e2e/kafka.go @@ -21,6 +21,7 @@ import ( "github.com/gruntwork-io/terratest/modules/k8s" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "k8s.io/utils/ptr" ) // requireDeleteKafkaTopic deletes kafkaTopic resource. @@ -34,14 +35,29 @@ func requireDeleteKafkaTopic(kubectlOptions k8s.KubectlOptions, topicName string // requireDeployingKafkaTopic deploys a kafkaTopic resource from a template func requireDeployingKafkaTopic(kubectlOptions k8s.KubectlOptions, topicName string) { It("Deploying KafkaTopic CR", func() { - err := applyK8sResourceFromTemplate(kubectlOptions, - kafkaTopicTemplate, - map[string]interface{}{ - "Name": topicName, - "TopicName": topicName, - "Namespace": kubectlOptions.Namespace, + // err := applyK8sResourceFromTemplate(kubectlOptions, + // kafkaTopicTemplate, + // map[string]interface{}{ + // "Name": topicName, + // "TopicName": topicName, + // "Namespace": kubectlOptions.Namespace, + // }, + // ) + values := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaClusterName, + Namespace: kubectlOptions.Namespace, }, - ) + Name: topicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: topicName, + } + + err := applyK8sResourceFromTemplate_2(kubectlOptions, kafkaTopicTemplate, values) + Expect(err).ShouldNot(HaveOccurred()) err = waitK8sResourceCondition(kubectlOptions, kafkaTopicKind, @@ -82,3 +98,23 @@ func requireDeployingKafkaUser(kubectlOptions k8s.KubectlOptions, userName strin }, defaultUserCreationWaitTime, 3*time.Second).Should(Equal(true)) }) } + +// kafkaTopicTemplateData is a struct that holds the relevant information and structure +// to fill out the template used to generate KafkaTopics. +// TODO: long term we should use the structs in the api module instead of these local structs. +type kafkaTopicTemplateData struct { + Annotations []string + ClusterRef kafkaTopicClusterRef + Name string + Namespace string + Partitions *int32 + ReplicationFactor *int32 + TopicName string +} + +// kafkaTopicClusterRef holds the information relevant to identifying a KafkaCluster within a KafkaTopic CR. +// TODO: Long term, we should use the structs in the api module instead of these local structs. +type kafkaTopicClusterRef struct { + Name string + Namespace string +} diff --git a/tests/e2e/kafkatopic_webhook.go b/tests/e2e/kafkatopic_webhook.go index 69f68933f..be44c3e12 100644 --- a/tests/e2e/kafkatopic_webhook.go +++ b/tests/e2e/kafkatopic_webhook.go @@ -22,6 +22,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" ) func testWebhookKafkaTopic(kafkaCluster types.NamespacedName) { @@ -35,9 +36,339 @@ func testWebhookKafkaTopic(kafkaCluster types.NamespacedName) { kubectlOptions.Namespace = kafkaCluster.Namespace - testWebhookCreateKafkaTopic(kubectlOptions, kafkaCluster) - testWebhookUpdateKafkaTopic(kubectlOptions, kafkaCluster) + //testWebhookCreateKafkaTopic(kubectlOptions, kafkaCluster) + //testWebhookUpdateKafkaTopic(kubectlOptions, kafkaCluster) + testWebhookCreateKafkaTopic_2(kubectlOptions, kafkaCluster) + testWebhookUpdateKafkaTopic_2(kubectlOptions, kafkaCluster) +} + +func testWebhookCreateKafkaTopic_2(kubectlOptions k8s.KubectlOptions, kafkaCluster types.NamespacedName) bool { + return When("Testing KafkaTopic Create", func() { + BeforeAll(func() { + Expect(isExistingK8SResource(kubectlOptions, kafkaKind, kafkaCluster.Name)).To(BeTrue()) + }) + + const nonExistent string = "non-existent" + + It("Test non-existent KafkaCluster", func() { + caseData := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: nonExistent, // Note: This is a deliberately inserted error for this test case. + Namespace: kafkaCluster.Namespace, + }, + Name: testInternalTopicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: testInternalTopicName, + } + err := applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "non-existent": kafkaCluster 'non-existent' in the namespace 'kafka' does not exist + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.clusterRef.name: Invalid value: %[2]q: kafkaCluster '%[2]s' in the namespace '%[3]s' does not exist", + caseData.Name, caseData.ClusterRef.Name, caseData.ClusterRef.Namespace), + ) + }) + + It("Test 0 partitions and replicationFactor", func() { + caseData := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: testInternalTopicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(0)), // Note: This is a deliberately inserted error for this test case. + ReplicationFactor: ptr.To(int32(0)), // Note: This is a deliberately inserted error for this test case. + TopicName: testInternalTopicName, + } + err := applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: + // * spec.partitions: Invalid value: 0: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default) + // * spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0 (or set it to be -1 to use the broker's default) + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(3)) + Expect(err.Error()).To(And( + ContainSubstring("The KafkaTopic %q is invalid:", caseData.Name), + ContainSubstring("spec.partitions: Invalid value: %d: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)", ptr.Deref(caseData.Partitions, -100)), + ContainSubstring("spec.replicationFactor: Invalid value: %d: replication factor must be larger than 0 (or set it to be -1 to use the broker's default)", ptr.Deref(caseData.ReplicationFactor, -100)), + )) + }) + + // In the current validation webhook implementation, this case can only be encountered on a Create operation + It("Test ReplicationFactor larger than number of brokers", func() { + caseData := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: testInternalTopicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(10)), // Note: This is a deliberately inserted error for this test case. + TopicName: testInternalTopicName, + } + err := applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.replicationFactor: Invalid value: 10: replication factor is larger than the number of nodes in the kafka cluster (available brokers: 3) + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.replicationFactor: Invalid value: %[2]d: replication factor is larger than the number of nodes in the kafka cluster", + caseData.Name, ptr.Deref(caseData.ReplicationFactor, -100)), + ) + }) + // Test case involving existing CRs but not necessarily an Update operation + When("Testing conflicts similar CRs", Ordered, func() { + var overlappingTopicName string = testInternalTopicName + requireDeployingKafkaTopic(kubectlOptions, overlappingTopicName) + + It("Testing conflict on spec.name", func() { + var caseData kafkaTopicTemplateData + + By("With managedBy koperator annotation") + caseData = kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, // Note: This information is relevant to this particular test case. + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: overlappingTopicName + "different-cr-name", // Note: This information is relevant to this particular test case. + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: overlappingTopicName, // Note: This is a deliberately inserted error for this test case. + } + err := applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.name: Invalid value: %[2]q: kafkaTopic CR '%[2]s' in namesapce '%[3]s' is already referencing to Kafka topic '%[2]s'", + caseData.Name, overlappingTopicName, caseData.Namespace), + ) + + By("Without managedBy koperator annotation") + caseData = kafkaTopicTemplateData{ + Annotations: nil, // Note: This is a deliberately inserted error for this test case. + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: overlappingTopicName + "different-cr-name", // Note: This information is relevant to this particular test case. + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: overlappingTopicName, // Note: This is a deliberately inserted error for this test case. + } + err = applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: + // * spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + // * spec.name: Invalid value: "topic-test-internal": topic "topic-test-internal" already exists on kafka cluster and it is not managed by Koperator, + // if you want it to be managed by Koperator so you can modify its configurations through a KafkaTopic CR, + // add this "managedBy: koperator" annotation to this KafkaTopic CR + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) + Expect(err.Error()).To(And( + ContainSubstring("The KafkaTopic %q is invalid:", caseData.Name), + ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'", + overlappingTopicName, caseData.Namespace), + ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", + overlappingTopicName), + )) + }) + requireDeleteKafkaTopic(kubectlOptions, overlappingTopicName) + }) + }) +} + +func testWebhookUpdateKafkaTopic_2(kubectlOptions k8s.KubectlOptions, kafkaCluster types.NamespacedName) bool { + return When("Testing KafkaTopic Update", func() { + BeforeAll(func() { + Expect(isExistingK8SResource(kubectlOptions, kafkaKind, kafkaCluster.Name)).To(BeTrue()) + }) + + const nonExistent string = "non-existent" + + // Update operation implies having a CR with the same name in place + requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName) + + It("Test non-existent KafkaCluster", func() { + caseData := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: nonExistent, // Note: This is a deliberately inserted error for this test case. + Namespace: kafkaCluster.Namespace, + }, + Name: testInternalTopicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: testInternalTopicName, + } + err := applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "non-existent": kafkaCluster 'non-existent' in the namespace 'kafka' does not exist + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.clusterRef.name: Invalid value: %[2]q: kafkaCluster '%[2]s' in the namespace '%[3]s' does not exist", + caseData.Name, caseData.ClusterRef.Name, caseData.Namespace), + ) + }) + + // A successfully created KafkaTopic CR cannot have 0 for either Partition or ReplicationFactor. + // At the same time, during an Update, a KafkaTopic cannot have its: + // * spec.partitions decreased + // * spec.replicationFactor changed (not just decreased) + // Consequently, an Update test for 0 values will automatically also cover the decreasing/changing scenarios. + It("Test 0 values partitions and replicationFactor", func() { + caseData := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: testInternalTopicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(0)), // Note: This is a deliberately inserted error for this test case. + ReplicationFactor: ptr.To(int32(0)), // Note: This is a deliberately inserted error for this test case. + TopicName: testInternalTopicName, + } + err := applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: + // * spec.partitions: Invalid value: 0: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default) + // * spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0 (or set it to be -1 to use the broker's default) + // * spec.partitions: Invalid value: 0: kafka does not support decreasing partition count on an existing topic (from 2 to 0) + // * spec.replicationFactor: Invalid value: 0: kafka does not support changing the replication factor on an existing topic (from 2 to 0) + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) + Expect(err.Error()).To(And( + ContainSubstring("The KafkaTopic %q is invalid:", caseData.Name), + ContainSubstring("spec.partitions: Invalid value: 0: number of partitions must be larger than 0"), + ContainSubstring("spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0"), + ContainSubstring("spec.partitions: Invalid value: %d: kafka does not support decreasing partition count on an existing topic", ptr.Deref(caseData.Partitions, -100)), + ContainSubstring("spec.replicationFactor: Invalid value: %d: kafka does not support changing the replication factor on an existing topic", ptr.Deref(caseData.ReplicationFactor, -100)), + )) + }) + + It("Testing conflict on spec.name", func() { + var overlappingTopicName string = testInternalTopicName + var caseData kafkaTopicTemplateData + + By("With managedBy koperator annotation") + caseData = kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, // Note: This information is relevant to this particular test case. + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: overlappingTopicName + "different-cr-name", // Note: This information is relevant to this particular test case. + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: overlappingTopicName, // Note: This is a deliberately inserted error for this test case. + } + err := applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.name: Invalid value: %[2]q: kafkaTopic CR '%[2]s' in namesapce '%[3]s' is already referencing to Kafka topic '%[2]s'", + caseData.Name, overlappingTopicName, caseData.Namespace), + ) + + By("Without managedBy koperator annotation") + caseData = kafkaTopicTemplateData{ + Annotations: nil, // Note: This is a deliberately inserted error for this test case. + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: overlappingTopicName + "different-cr-name", // Note: This information is relevant to this particular test case. + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: overlappingTopicName, // Note: This is a deliberately inserted error for this test case. + } + err = applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: + // * spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + // * spec.name: Invalid value: "topic-test-internal": topic "topic-test-internal" already exists on kafka cluster and it is not managed by Koperator, + // if you want it to be managed by Koperator so you can modify its configurations through a KafkaTopic CR, + // add this "managedBy: koperator" annotation to this KafkaTopic CR + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) + Expect(err.Error()).To(And( + ContainSubstring("The KafkaTopic %q is invalid:", caseData.Name), + ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'\n", + overlappingTopicName, caseData.Namespace), + ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", + overlappingTopicName), + )) + }) + + // Clean up the KafkaTopic set up to test Update operations against + requireDeleteKafkaTopic(kubectlOptions, testInternalTopicName) + }) } func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster types.NamespacedName) bool { @@ -67,17 +398,17 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster ) Expect(err).To(HaveOccurred()) // Example error: - // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "NON-EXISTENT": kafkaCluster 'NON-EXISTENT' in the namespace 'kafka' does not exist + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "non-existent": kafkaCluster 'non-existent' in the namespace 'kafka' does not exist Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) - Expect(err.Error()).To(And( + Expect(err.Error()).To( ContainSubstring("The KafkaTopic %[1]q is invalid: spec.clusterRef.name: Invalid value: %[2]q: kafkaCluster '%[2]s' in the namespace '%[3]s' does not exist", caseData["Name"], nonExistent, kubectlOptions.Namespace), - )) + ) }) It("Test 0 partitions and replicationFactor", func() { caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) - caseData["Partition"] = "0" + caseData["Partitions"] = "0" caseData["ReplicationFactor"] = "0" err := applyK8sResourceFromTemplate( kubectlOptions, @@ -93,7 +424,7 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(3)) Expect(err.Error()).To(And( ContainSubstring("The KafkaTopic %q is invalid:", caseData["Name"]), - ContainSubstring("spec.partitions: Invalid value: %s: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)", caseData["Partition"]), + ContainSubstring("spec.partitions: Invalid value: %s: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)", caseData["Partitions"]), ContainSubstring("spec.replicationFactor: Invalid value: %s: replication factor must be larger than 0 (or set it to be -1 to use the broker's default)", caseData["ReplicationFactor"]), )) }) @@ -202,6 +533,7 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster "Name": nonExistent, "Namespace": kafkaCluster.Namespace, } + err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, @@ -225,7 +557,7 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster // Consequently, an Update test for 0 values will automatically also cover the decreasing/changing scenarios. It("Test 0 values partitions and replicationFactor", func() { caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) - caseData["Partition"] = "0" + caseData["Partitions"] = "0" caseData["ReplicationFactor"] = "0" err := applyK8sResourceFromTemplate( kubectlOptions, @@ -245,7 +577,7 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster ContainSubstring("The KafkaTopic %q is invalid:", caseData["Name"]), ContainSubstring("spec.partitions: Invalid value: 0: number of partitions must be larger than 0"), ContainSubstring("spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0"), - ContainSubstring("spec.partitions: Invalid value: %s: kafka does not support decreasing partition count on an existing topic", caseData["Partition"]), + ContainSubstring("spec.partitions: Invalid value: %s: kafka does not support decreasing partition count on an existing topic", caseData["Partitions"]), ContainSubstring("spec.replicationFactor: Invalid value: %s: kafka does not support changing the replication factor on an existing topic", caseData["ReplicationFactor"]), )) }) @@ -309,12 +641,12 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster }) } -func baseKafkaTopicData(kafkaTopic types.NamespacedName, kafkaCluster types.NamespacedName) map[string]interface{} { +func baseKafkaTopicData(kafkaTopic types.NamespacedName, kafkaCluster types.NamespacedName) map[string]interface{} { // string topic name first return map[string]interface{}{ "Name": kafkaTopic.Name, "TopicName": kafkaTopic.Name, "Namespace": kafkaTopic.Namespace, - "Partition": "2", + "Partitions": "2", "ReplicationFactor": "2", "ClusterRef": map[string]string{ "Name": kafkaCluster.Name, diff --git a/tests/e2e/koperator_suite_test.go b/tests/e2e/koperator_suite_test.go index 72051b048..57a9b15c7 100644 --- a/tests/e2e/koperator_suite_test.go +++ b/tests/e2e/koperator_suite_test.go @@ -55,18 +55,18 @@ var _ = BeforeSuite(func() { }) var _ = When("Testing e2e test altogether", Ordered, func() { - var snapshottedInfo = &clusterSnapshot{} - snapshotCluster(snapshottedInfo) - testInstall() - testInstallZookeeperCluster() - testInstallKafkaCluster("../../config/samples/simplekafkacluster.yaml") + // var snapshottedInfo = &clusterSnapshot{} + // snapshotCluster(snapshottedInfo) + // testInstall() + // testInstallZookeeperCluster() + // testInstallKafkaCluster("../../config/samples/simplekafkacluster.yaml") testWebhookKafkaTopic(types.NamespacedName{Name: kafkaClusterName, Namespace: koperatorLocalHelmDescriptor.Namespace}) - testProduceConsumeInternal() - testUninstallKafkaCluster() - testInstallKafkaCluster("../../config/samples/simplekafkacluster_ssl.yaml") - testProduceConsumeInternalSSL(defaultTLSSecretName) - testUninstallKafkaCluster() - testUninstallZookeeperCluster() - testUninstall() - snapshotClusterAndCompare(snapshottedInfo) + // testProduceConsumeInternal() + // testUninstallKafkaCluster() + // testInstallKafkaCluster("../../config/samples/simplekafkacluster_ssl.yaml") + // testProduceConsumeInternalSSL(defaultTLSSecretName) + // testUninstallKafkaCluster() + // testUninstallZookeeperCluster() + // testUninstall() + // snapshotClusterAndCompare(snapshottedInfo) }) diff --git a/tests/e2e/templates/topic.yaml.tmpl b/tests/e2e/templates/topic.yaml.tmpl index 3f2680bf8..50dd2035d 100644 --- a/tests/e2e/templates/topic.yaml.tmpl +++ b/tests/e2e/templates/topic.yaml.tmpl @@ -14,7 +14,7 @@ spec: name: {{or .ClusterRef.Name "kafka"}} namespace: {{or .ClusterRef.Namespace "kafka"}} name: {{ .TopicName }} - partitions: {{or .Partition 2}} + partitions: {{or .Partitions 2}} replicationFactor: {{or .ReplicationFactor 2}} config: "retention.ms": "604800000" From 651a2c8b51a32b161901bafaafdd30a14c32a3a0 Mon Sep 17 00:00:00 2001 From: Mihai Alexandrescu Date: Mon, 28 Aug 2023 16:46:23 +0300 Subject: [PATCH 8/9] update testcase logic; use local kafkatopic struct; use local tempfile function --- tests/e2e/k8s.go | 28 +-- tests/e2e/kafka.go | 10 +- tests/e2e/kafkatopic_webhook.go | 323 ++---------------------------- tests/e2e/koperator_suite_test.go | 26 +-- 4 files changed, 30 insertions(+), 357 deletions(-) diff --git a/tests/e2e/k8s.go b/tests/e2e/k8s.go index c1bd7b740..ccd7e8209 100644 --- a/tests/e2e/k8s.go +++ b/tests/e2e/k8s.go @@ -65,14 +65,9 @@ func applyK8sResourceManifest(kubectlOptions k8s.KubectlOptions, manifestPath st // applyK8sResourceManifestFromString applies the specified manifest in string format to the provided // kubectl context and namespace. func applyK8sResourceManifestFromString(kubectlOptions k8s.KubectlOptions, manifest string, extraArgs ...string) error { - // Replicating terratest's k8s.KubectlApplyFromStringE but with the possibility of a variadic argument that allows options like --dry-run - // - // TODO: look for a different implementation for temp files because terratest's version uses the composite test name to generate - // the temp file name which, in our case, includes all the descriptive Ginkgo statements (which are by design quite verbose). - // That can lead to erroring out on temp file creation based on the file name being too long. - tmpfile, err := k8s.StoreConfigToTempFileE(GinkgoT(), manifest) + tmpfile, err := createTempFileFromBytes([]byte(manifest), "", "", 0) if err != nil { - return err + return fmt.Errorf("storing provided manifest data into temp file failed: %w", err) } defer os.Remove(tmpfile) @@ -81,7 +76,7 @@ func applyK8sResourceManifestFromString(kubectlOptions k8s.KubectlOptions, manif // applyK8sResourceFromTemplate generates manifest from the specified go-template based on values // and applies the specified manifest to the provided kubectl context and namespace. -func applyK8sResourceFromTemplate(kubectlOptions k8s.KubectlOptions, templateFile string, values map[string]any, extraArgs ...string) error { +func applyK8sResourceFromTemplate(kubectlOptions k8s.KubectlOptions, templateFile string, values any, extraArgs ...string) error { By(fmt.Sprintf("Generating k8s manifest from template %s", templateFile)) var manifest bytes.Buffer rawTemplate, err := os.ReadFile(templateFile) @@ -93,24 +88,7 @@ func applyK8sResourceFromTemplate(kubectlOptions k8s.KubectlOptions, templateFil if err != nil { return err } - return applyK8sResourceManifestFromString(kubectlOptions, manifest.String(), extraArgs...) -} -// applyK8sResourceFromTemplate generates manifest from the specified go-template based on values -// and applies the specified manifest to the provided kubectl context and namespace. -func applyK8sResourceFromTemplate_2(kubectlOptions k8s.KubectlOptions, templateFile string, values any, extraArgs ...string) error { - By(fmt.Sprintf("Generating k8s manifest from template %s", templateFile)) - var manifest bytes.Buffer - rawTemplate, err := os.ReadFile(templateFile) - if err != nil { - return err - } - t := template.Must(template.New("template").Funcs(sprig.TxtFuncMap()).Parse(string(rawTemplate))) - err = t.Execute(&manifest, values) - if err != nil { - return err - } - fmt.Printf("###\nManifest is :\n%s\n###\n", manifest.String()) return applyK8sResourceManifestFromString(kubectlOptions, manifest.String(), extraArgs...) } diff --git a/tests/e2e/kafka.go b/tests/e2e/kafka.go index 316c95fcc..1561a45fa 100644 --- a/tests/e2e/kafka.go +++ b/tests/e2e/kafka.go @@ -35,14 +35,6 @@ func requireDeleteKafkaTopic(kubectlOptions k8s.KubectlOptions, topicName string // requireDeployingKafkaTopic deploys a kafkaTopic resource from a template func requireDeployingKafkaTopic(kubectlOptions k8s.KubectlOptions, topicName string) { It("Deploying KafkaTopic CR", func() { - // err := applyK8sResourceFromTemplate(kubectlOptions, - // kafkaTopicTemplate, - // map[string]interface{}{ - // "Name": topicName, - // "TopicName": topicName, - // "Namespace": kubectlOptions.Namespace, - // }, - // ) values := kafkaTopicTemplateData{ Annotations: []string{"managedBy: koperator"}, ClusterRef: kafkaTopicClusterRef{ @@ -56,7 +48,7 @@ func requireDeployingKafkaTopic(kubectlOptions k8s.KubectlOptions, topicName str TopicName: topicName, } - err := applyK8sResourceFromTemplate_2(kubectlOptions, kafkaTopicTemplate, values) + err := applyK8sResourceFromTemplate(kubectlOptions, kafkaTopicTemplate, values) Expect(err).ShouldNot(HaveOccurred()) diff --git a/tests/e2e/kafkatopic_webhook.go b/tests/e2e/kafkatopic_webhook.go index be44c3e12..d4e880ee3 100644 --- a/tests/e2e/kafkatopic_webhook.go +++ b/tests/e2e/kafkatopic_webhook.go @@ -15,7 +15,6 @@ package e2e import ( - "fmt" "strings" "github.com/gruntwork-io/terratest/modules/k8s" @@ -36,14 +35,11 @@ func testWebhookKafkaTopic(kafkaCluster types.NamespacedName) { kubectlOptions.Namespace = kafkaCluster.Namespace - //testWebhookCreateKafkaTopic(kubectlOptions, kafkaCluster) - //testWebhookUpdateKafkaTopic(kubectlOptions, kafkaCluster) - - testWebhookCreateKafkaTopic_2(kubectlOptions, kafkaCluster) - testWebhookUpdateKafkaTopic_2(kubectlOptions, kafkaCluster) + testWebhookCreateKafkaTopic(kubectlOptions, kafkaCluster) + testWebhookUpdateKafkaTopic(kubectlOptions, kafkaCluster) } -func testWebhookCreateKafkaTopic_2(kubectlOptions k8s.KubectlOptions, kafkaCluster types.NamespacedName) bool { +func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster types.NamespacedName) bool { return When("Testing KafkaTopic Create", func() { BeforeAll(func() { Expect(isExistingK8SResource(kubectlOptions, kafkaKind, kafkaCluster.Name)).To(BeTrue()) @@ -64,7 +60,7 @@ func testWebhookCreateKafkaTopic_2(kubectlOptions k8s.KubectlOptions, kafkaClust ReplicationFactor: ptr.To(int32(2)), TopicName: testInternalTopicName, } - err := applyK8sResourceFromTemplate_2( + err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, caseData, @@ -93,7 +89,7 @@ func testWebhookCreateKafkaTopic_2(kubectlOptions k8s.KubectlOptions, kafkaClust ReplicationFactor: ptr.To(int32(0)), // Note: This is a deliberately inserted error for this test case. TopicName: testInternalTopicName, } - err := applyK8sResourceFromTemplate_2( + err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, caseData, @@ -126,7 +122,7 @@ func testWebhookCreateKafkaTopic_2(kubectlOptions k8s.KubectlOptions, kafkaClust ReplicationFactor: ptr.To(int32(10)), // Note: This is a deliberately inserted error for this test case. TopicName: testInternalTopicName, } - err := applyK8sResourceFromTemplate_2( + err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, caseData, @@ -162,7 +158,7 @@ func testWebhookCreateKafkaTopic_2(kubectlOptions k8s.KubectlOptions, kafkaClust ReplicationFactor: ptr.To(int32(2)), TopicName: overlappingTopicName, // Note: This is a deliberately inserted error for this test case. } - err := applyK8sResourceFromTemplate_2( + err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, caseData, @@ -190,7 +186,7 @@ func testWebhookCreateKafkaTopic_2(kubectlOptions k8s.KubectlOptions, kafkaClust ReplicationFactor: ptr.To(int32(2)), TopicName: overlappingTopicName, // Note: This is a deliberately inserted error for this test case. } - err = applyK8sResourceFromTemplate_2( + err = applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, caseData, @@ -217,7 +213,7 @@ func testWebhookCreateKafkaTopic_2(kubectlOptions k8s.KubectlOptions, kafkaClust }) } -func testWebhookUpdateKafkaTopic_2(kubectlOptions k8s.KubectlOptions, kafkaCluster types.NamespacedName) bool { +func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster types.NamespacedName) bool { return When("Testing KafkaTopic Update", func() { BeforeAll(func() { Expect(isExistingK8SResource(kubectlOptions, kafkaKind, kafkaCluster.Name)).To(BeTrue()) @@ -241,7 +237,7 @@ func testWebhookUpdateKafkaTopic_2(kubectlOptions k8s.KubectlOptions, kafkaClust ReplicationFactor: ptr.To(int32(2)), TopicName: testInternalTopicName, } - err := applyK8sResourceFromTemplate_2( + err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, caseData, @@ -275,7 +271,7 @@ func testWebhookUpdateKafkaTopic_2(kubectlOptions k8s.KubectlOptions, kafkaClust ReplicationFactor: ptr.To(int32(0)), // Note: This is a deliberately inserted error for this test case. TopicName: testInternalTopicName, } - err := applyK8sResourceFromTemplate_2( + err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, caseData, @@ -315,7 +311,7 @@ func testWebhookUpdateKafkaTopic_2(kubectlOptions k8s.KubectlOptions, kafkaClust ReplicationFactor: ptr.To(int32(2)), TopicName: overlappingTopicName, // Note: This is a deliberately inserted error for this test case. } - err := applyK8sResourceFromTemplate_2( + err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, caseData, @@ -343,7 +339,7 @@ func testWebhookUpdateKafkaTopic_2(kubectlOptions k8s.KubectlOptions, kafkaClust ReplicationFactor: ptr.To(int32(2)), TopicName: overlappingTopicName, // Note: This is a deliberately inserted error for this test case. } - err = applyK8sResourceFromTemplate_2( + err = applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, caseData, @@ -370,296 +366,3 @@ func testWebhookUpdateKafkaTopic_2(kubectlOptions k8s.KubectlOptions, kafkaClust requireDeleteKafkaTopic(kubectlOptions, testInternalTopicName) }) } - -func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster types.NamespacedName) bool { - return When("Testing KafkaTopic Create", func() { - BeforeAll(func() { - Expect(isExistingK8SResource(kubectlOptions, kafkaKind, kafkaClusterName)).To(BeTrue()) - }) - - const nonExistent string = "non-existent" - - baseKafkaTopicTemplateValues := baseKafkaTopicData( - types.NamespacedName{Name: testInternalTopicName, Namespace: kubectlOptions.Namespace}, - kafkaCluster, - ) - - It("Test non-existent KafkaCluster", func() { - caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) - caseData["ClusterRef"] = map[string]string{ - "Name": nonExistent, - "Namespace": kafkaCluster.Namespace, - } - err := applyK8sResourceFromTemplate( - kubectlOptions, - kafkaTopicTemplate, - caseData, - dryRunStrategyArgServer, - ) - Expect(err).To(HaveOccurred()) - // Example error: - // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "non-existent": kafkaCluster 'non-existent' in the namespace 'kafka' does not exist - Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) - Expect(err.Error()).To( - ContainSubstring("The KafkaTopic %[1]q is invalid: spec.clusterRef.name: Invalid value: %[2]q: kafkaCluster '%[2]s' in the namespace '%[3]s' does not exist", - caseData["Name"], nonExistent, kubectlOptions.Namespace), - ) - }) - - It("Test 0 partitions and replicationFactor", func() { - caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) - caseData["Partitions"] = "0" - caseData["ReplicationFactor"] = "0" - err := applyK8sResourceFromTemplate( - kubectlOptions, - kafkaTopicTemplate, - caseData, - dryRunStrategyArgServer, - ) - Expect(err).To(HaveOccurred()) - // Example error: - // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: - // * spec.partitions: Invalid value: 0: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default) - // * spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0 (or set it to be -1 to use the broker's default) - Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(3)) - Expect(err.Error()).To(And( - ContainSubstring("The KafkaTopic %q is invalid:", caseData["Name"]), - ContainSubstring("spec.partitions: Invalid value: %s: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)", caseData["Partitions"]), - ContainSubstring("spec.replicationFactor: Invalid value: %s: replication factor must be larger than 0 (or set it to be -1 to use the broker's default)", caseData["ReplicationFactor"]), - )) - }) - - // In the current validation webhook implementation, this case can only be encountered on a Create operation - It("Test ReplicationFactor larger than number of brokers", func() { - caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) - caseData["ReplicationFactor"] = "10" - err := applyK8sResourceFromTemplate( - kubectlOptions, - kafkaTopicTemplate, - caseData, - dryRunStrategyArgServer, - ) - Expect(err).To(HaveOccurred()) - // Example error: - // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.replicationFactor: Invalid value: 10: replication factor is larger than the number of nodes in the kafka cluster (available brokers: 3) - Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) - Expect(err.Error()).To( - ContainSubstring("The KafkaTopic %[1]q is invalid: spec.replicationFactor: Invalid value: %[2]s: replication factor is larger than the number of nodes in the kafka cluster", - caseData["Name"], caseData["ReplicationFactor"]), - ) - }) - - // Test case involving existing CRs but not necessarily an Update operation - When("Testing conflicts similar CRs", Ordered, func() { - requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName) - - It("Testing conflict on spec.name", func() { - caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) - - switch v := caseData["Name"].(type) { - case string: - caseData["Name"] = v + "-different-cr-name" - case fmt.Stringer: - caseData["Name"] = v.String() + "-different-cr-name" - default: - caseData["Name"] = nonExistent - } - - By("With managedBy koperator annotation") - caseData["Annotations"] = []string{"managedBy: koperator"} - err := applyK8sResourceFromTemplate( - kubectlOptions, - kafkaTopicTemplate, - caseData, - dryRunStrategyArgServer, - ) - Expect(err).To(HaveOccurred()) - // Example error: - // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' - Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) - Expect(err.Error()).To( - ContainSubstring("The KafkaTopic %[1]q is invalid: spec.name: Invalid value: %[2]q: kafkaTopic CR '%[2]s' in namesapce '%[3]s' is already referencing to Kafka topic '%[2]s'", - caseData["Name"], testInternalTopicName, kubectlOptions.Namespace), - ) - - By("Without managedBy koperator annotation") - caseData["Annotations"] = []string{} - err = applyK8sResourceFromTemplate( - kubectlOptions, - kafkaTopicTemplate, - caseData, - dryRunStrategyArgServer, - ) - Expect(err).To(HaveOccurred()) - // Example error: - // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: - // * spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' - // * spec.name: Invalid value: "topic-test-internal": topic "topic-test-internal" already exists on kafka cluster and it is not managed by Koperator, - // if you want it to be managed by Koperator so you can modify its configurations through a KafkaTopic CR, - // add this "managedBy: koperator" annotation to this KafkaTopic CR - Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) - Expect(err.Error()).To(And( - ContainSubstring("The KafkaTopic %q is invalid:", caseData["Name"]), - ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'", - testInternalTopicName, kubectlOptions.Namespace), - ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", - testInternalTopicName), - )) - }) - requireDeleteKafkaTopic(kubectlOptions, testInternalTopicName) - }) - }) -} - -func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster types.NamespacedName) bool { - return When("Testing KafkaTopic Update", func() { - BeforeAll(func() { - Expect(isExistingK8SResource(kubectlOptions, kafkaKind, kafkaClusterName)).To(BeTrue()) - }) - - const nonExistent string = "non-existent" - - baseKafkaTopicTemplateValues := baseKafkaTopicData( - types.NamespacedName{Name: testInternalTopicName, Namespace: kubectlOptions.Namespace}, - kafkaCluster, - ) - - // Update operation implies having a CR with the same name in place - requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName) - - It("Test non-existent KafkaCluster", func() { - caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) - caseData["ClusterRef"] = map[string]string{ - "Name": nonExistent, - "Namespace": kafkaCluster.Namespace, - } - - err := applyK8sResourceFromTemplate( - kubectlOptions, - kafkaTopicTemplate, - caseData, - dryRunStrategyArgServer, - ) - Expect(err).To(HaveOccurred()) - // Example error: - // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "non-existent": kafkaCluster 'non-existent' in the namespace 'kafka' does not exist - Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) - Expect(err.Error()).To( - ContainSubstring("The KafkaTopic %[1]q is invalid: spec.clusterRef.name: Invalid value: %[2]q: kafkaCluster '%[2]s' in the namespace '%[3]s' does not exist", - caseData["Name"], nonExistent, kubectlOptions.Namespace), - ) - }) - - // A successfully created KafkaTopic CR cannot have 0 for either Partition or ReplicationFactor. - // At the same time, during an Update, a KafkaTopic cannot have its: - // * spec.partitions decreased - // * spec.replicationFactor changed (not just decreased) - // Consequently, an Update test for 0 values will automatically also cover the decreasing/changing scenarios. - It("Test 0 values partitions and replicationFactor", func() { - caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) - caseData["Partitions"] = "0" - caseData["ReplicationFactor"] = "0" - err := applyK8sResourceFromTemplate( - kubectlOptions, - kafkaTopicTemplate, - caseData, - dryRunStrategyArgServer, - ) - Expect(err).To(HaveOccurred()) - // Example error: - // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: - // * spec.partitions: Invalid value: 0: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default) - // * spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0 (or set it to be -1 to use the broker's default) - // * spec.partitions: Invalid value: 0: kafka does not support decreasing partition count on an existing topic (from 2 to 0) - // * spec.replicationFactor: Invalid value: 0: kafka does not support changing the replication factor on an existing topic (from 2 to 0) - Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) - Expect(err.Error()).To(And( - ContainSubstring("The KafkaTopic %q is invalid:", caseData["Name"]), - ContainSubstring("spec.partitions: Invalid value: 0: number of partitions must be larger than 0"), - ContainSubstring("spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0"), - ContainSubstring("spec.partitions: Invalid value: %s: kafka does not support decreasing partition count on an existing topic", caseData["Partitions"]), - ContainSubstring("spec.replicationFactor: Invalid value: %s: kafka does not support changing the replication factor on an existing topic", caseData["ReplicationFactor"]), - )) - }) - - It("Testing conflict on spec.name", func() { - caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) - - switch v := caseData["Name"].(type) { - case string: - caseData["Name"] = v + "-different-cr-name" - case fmt.Stringer: - caseData["Name"] = v.String() + "-different-cr-name" - default: - caseData["Name"] = nonExistent - } - - By("With managedBy koperator annotation") - caseData["Annotations"] = []string{"managedBy: koperator"} - err := applyK8sResourceFromTemplate( - kubectlOptions, - kafkaTopicTemplate, - caseData, - dryRunStrategyArgServer, - ) - Expect(err).To(HaveOccurred()) - // Example error: - // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' - Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) - Expect(err.Error()).To( - ContainSubstring("The KafkaTopic %[1]q is invalid: spec.name: Invalid value: %[2]q: kafkaTopic CR '%[2]s' in namesapce '%[3]s' is already referencing to Kafka topic '%[2]s'", - caseData["Name"], testInternalTopicName, kubectlOptions.Namespace), - ) - - By("Without managedBy koperator annotation") - caseData["Annotations"] = []string{} - err = applyK8sResourceFromTemplate( - kubectlOptions, - kafkaTopicTemplate, - caseData, - dryRunStrategyArgServer, - ) - Expect(err).To(HaveOccurred()) - // Example error: - // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: - // * spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' - // * spec.name: Invalid value: "topic-test-internal": topic "topic-test-internal" already exists on kafka cluster and it is not managed by Koperator, - // if you want it to be managed by Koperator so you can modify its configurations through a KafkaTopic CR, - // add this "managedBy: koperator" annotation to this KafkaTopic CR - Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) - Expect(err.Error()).To(And( - ContainSubstring("The KafkaTopic %q is invalid:", caseData["Name"]), - ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'\n", - testInternalTopicName, kubectlOptions.Namespace), - ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", - testInternalTopicName), - )) - }) - - // Clean up the KafkaTopic set up to test Update operations against - requireDeleteKafkaTopic(kubectlOptions, testInternalTopicName) - }) -} - -func baseKafkaTopicData(kafkaTopic types.NamespacedName, kafkaCluster types.NamespacedName) map[string]interface{} { // string topic name first - return map[string]interface{}{ - "Name": kafkaTopic.Name, - "TopicName": kafkaTopic.Name, - "Namespace": kafkaTopic.Namespace, - "Partitions": "2", - "ReplicationFactor": "2", - "ClusterRef": map[string]string{ - "Name": kafkaCluster.Name, - "Namespace": kafkaCluster.Namespace, - }, - "Annotations": []string{"managedBy: koperator"}, - } -} - -func copyMapWithStringKeys(oldMap map[string]interface{}) map[string]interface{} { - var newMap = make(map[string]interface{}) - for k := range oldMap { - newMap[k] = oldMap[k] - } - return newMap -} diff --git a/tests/e2e/koperator_suite_test.go b/tests/e2e/koperator_suite_test.go index 57a9b15c7..72051b048 100644 --- a/tests/e2e/koperator_suite_test.go +++ b/tests/e2e/koperator_suite_test.go @@ -55,18 +55,18 @@ var _ = BeforeSuite(func() { }) var _ = When("Testing e2e test altogether", Ordered, func() { - // var snapshottedInfo = &clusterSnapshot{} - // snapshotCluster(snapshottedInfo) - // testInstall() - // testInstallZookeeperCluster() - // testInstallKafkaCluster("../../config/samples/simplekafkacluster.yaml") + var snapshottedInfo = &clusterSnapshot{} + snapshotCluster(snapshottedInfo) + testInstall() + testInstallZookeeperCluster() + testInstallKafkaCluster("../../config/samples/simplekafkacluster.yaml") testWebhookKafkaTopic(types.NamespacedName{Name: kafkaClusterName, Namespace: koperatorLocalHelmDescriptor.Namespace}) - // testProduceConsumeInternal() - // testUninstallKafkaCluster() - // testInstallKafkaCluster("../../config/samples/simplekafkacluster_ssl.yaml") - // testProduceConsumeInternalSSL(defaultTLSSecretName) - // testUninstallKafkaCluster() - // testUninstallZookeeperCluster() - // testUninstall() - // snapshotClusterAndCompare(snapshottedInfo) + testProduceConsumeInternal() + testUninstallKafkaCluster() + testInstallKafkaCluster("../../config/samples/simplekafkacluster_ssl.yaml") + testProduceConsumeInternalSSL(defaultTLSSecretName) + testUninstallKafkaCluster() + testUninstallZookeeperCluster() + testUninstall() + snapshotClusterAndCompare(snapshottedInfo) }) From d21676d7830ac254be0829538ab7e09db37f125a Mon Sep 17 00:00:00 2001 From: Mihai Alexandrescu Date: Thu, 31 Aug 2023 11:53:46 +0300 Subject: [PATCH 9/9] remove kafkaCluster parameter from testWebhookKafkaTopic() --- tests/e2e/kafkatopic_webhook.go | 25 +++++++++++++------------ tests/e2e/koperator_suite_test.go | 3 +-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/e2e/kafkatopic_webhook.go b/tests/e2e/kafkatopic_webhook.go index d4e880ee3..859faf02a 100644 --- a/tests/e2e/kafkatopic_webhook.go +++ b/tests/e2e/kafkatopic_webhook.go @@ -24,7 +24,7 @@ import ( "k8s.io/utils/ptr" ) -func testWebhookKafkaTopic(kafkaCluster types.NamespacedName) { +func testWebhookKafkaTopic() { // temporary section; to be refactored after kubeconfig injection PR var kubectlOptions k8s.KubectlOptions var err error @@ -33,7 +33,8 @@ func testWebhookKafkaTopic(kafkaCluster types.NamespacedName) { GinkgoT().Fail() } - kubectlOptions.Namespace = kafkaCluster.Namespace + kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace + kafkaCluster := types.NamespacedName{Name: kafkaClusterName, Namespace: koperatorLocalHelmDescriptor.Namespace} testWebhookCreateKafkaTopic(kubectlOptions, kafkaCluster) testWebhookUpdateKafkaTopic(kubectlOptions, kafkaCluster) @@ -152,7 +153,7 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, - Name: overlappingTopicName + "different-cr-name", // Note: This information is relevant to this particular test case. + Name: overlappingTopicName + "-different-cr-name", // Note: This information is relevant to this particular test case. Namespace: kubectlOptions.Namespace, Partitions: ptr.To(int32(2)), ReplicationFactor: ptr.To(int32(2)), @@ -166,10 +167,10 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster ) Expect(err).To(HaveOccurred()) // Example error: - // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namespace 'kafka' is already referencing to Kafka topic 'topic-test-internal' Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) Expect(err.Error()).To( - ContainSubstring("The KafkaTopic %[1]q is invalid: spec.name: Invalid value: %[2]q: kafkaTopic CR '%[2]s' in namesapce '%[3]s' is already referencing to Kafka topic '%[2]s'", + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.name: Invalid value: %[2]q: kafkaTopic CR '%[2]s' in namespace '%[3]s' is already referencing to Kafka topic '%[2]s'", caseData.Name, overlappingTopicName, caseData.Namespace), ) @@ -180,7 +181,7 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, }, - Name: overlappingTopicName + "different-cr-name", // Note: This information is relevant to this particular test case. + Name: overlappingTopicName + "-different-cr-name", // Note: This information is relevant to this particular test case. Namespace: kubectlOptions.Namespace, Partitions: ptr.To(int32(2)), ReplicationFactor: ptr.To(int32(2)), @@ -195,14 +196,14 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster Expect(err).To(HaveOccurred()) // Example error: // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: - // * spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + // * spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namespace 'kafka' is already referencing to Kafka topic 'topic-test-internal' // * spec.name: Invalid value: "topic-test-internal": topic "topic-test-internal" already exists on kafka cluster and it is not managed by Koperator, // if you want it to be managed by Koperator so you can modify its configurations through a KafkaTopic CR, // add this "managedBy: koperator" annotation to this KafkaTopic CR Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) Expect(err.Error()).To(And( ContainSubstring("The KafkaTopic %q is invalid:", caseData.Name), - ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'", + ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namespace '%[2]s' is already referencing to Kafka topic '%[1]s'", overlappingTopicName, caseData.Namespace), ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", overlappingTopicName), @@ -319,10 +320,10 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster ) Expect(err).To(HaveOccurred()) // Example error: - // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namespace 'kafka' is already referencing to Kafka topic 'topic-test-internal' Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) Expect(err.Error()).To( - ContainSubstring("The KafkaTopic %[1]q is invalid: spec.name: Invalid value: %[2]q: kafkaTopic CR '%[2]s' in namesapce '%[3]s' is already referencing to Kafka topic '%[2]s'", + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.name: Invalid value: %[2]q: kafkaTopic CR '%[2]s' in namespace '%[3]s' is already referencing to Kafka topic '%[2]s'", caseData.Name, overlappingTopicName, caseData.Namespace), ) @@ -348,14 +349,14 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster Expect(err).To(HaveOccurred()) // Example error: // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: - // * spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + // * spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namespace 'kafka' is already referencing to Kafka topic 'topic-test-internal' // * spec.name: Invalid value: "topic-test-internal": topic "topic-test-internal" already exists on kafka cluster and it is not managed by Koperator, // if you want it to be managed by Koperator so you can modify its configurations through a KafkaTopic CR, // add this "managedBy: koperator" annotation to this KafkaTopic CR Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) Expect(err.Error()).To(And( ContainSubstring("The KafkaTopic %q is invalid:", caseData.Name), - ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'\n", + ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namespace '%[2]s' is already referencing to Kafka topic '%[1]s'\n", overlappingTopicName, caseData.Namespace), ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", overlappingTopicName), diff --git a/tests/e2e/koperator_suite_test.go b/tests/e2e/koperator_suite_test.go index 72051b048..6d406dd54 100644 --- a/tests/e2e/koperator_suite_test.go +++ b/tests/e2e/koperator_suite_test.go @@ -24,7 +24,6 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" ) func TestKoperator(t *testing.T) { @@ -60,7 +59,7 @@ var _ = When("Testing e2e test altogether", Ordered, func() { testInstall() testInstallZookeeperCluster() testInstallKafkaCluster("../../config/samples/simplekafkacluster.yaml") - testWebhookKafkaTopic(types.NamespacedName{Name: kafkaClusterName, Namespace: koperatorLocalHelmDescriptor.Namespace}) + testWebhookKafkaTopic() testProduceConsumeInternal() testUninstallKafkaCluster() testInstallKafkaCluster("../../config/samples/simplekafkacluster_ssl.yaml")