diff --git a/pkg/webhooks/kafkatopic_validator.go b/pkg/webhooks/kafkatopic_validator.go index c0c509326..a46e4f742 100644 --- a/pkg/webhooks/kafkatopic_validator.go +++ b/pkg/webhooks/kafkatopic_validator.go @@ -253,7 +253,7 @@ func (s *KafkaTopicValidator) checkExistingKafkaTopicCRs(ctx context.Context, } } if foundKafkaTopic != nil { - logMsg := fmt.Sprintf("kafkaTopic CR '%s' in namesapce '%s' is already referencing to Kafka topic '%s'", + logMsg := fmt.Sprintf("kafkaTopic CR '%s' in namespace '%s' is already referencing to Kafka topic '%s'", foundKafkaTopic.Name, foundKafkaTopic.Namespace, foundKafkaTopic.Spec.Name) return field.Invalid(field.NewPath("spec").Child("name"), foundKafkaTopic.Spec.Name, logMsg), nil } diff --git a/tests/e2e/go.mod b/tests/e2e/go.mod index 29b3191f0..802aa0152 100644 --- a/tests/e2e/go.mod +++ b/tests/e2e/go.mod @@ -14,6 +14,7 @@ require ( github.com/twmb/franz-go v1.13.5 k8s.io/apiextensions-apiserver v0.26.4 k8s.io/apimachinery v0.26.4 + k8s.io/utils v0.0.0-20230726121419-3b25d923346b sigs.k8s.io/yaml v1.3.0 ) @@ -35,14 +36,42 @@ require ( github.com/eapache/go-resiliency v1.3.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/queue v1.1.0 // indirect - github.com/emicklei/go-restful/v3 v3.9.0 // indirect - github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/fatih/color v1.13.0 // indirect + github.com/go-logr/zapr v1.2.3 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/huandu/xstrings v1.3.3 // indirect + github.com/iancoleman/orderedmap v0.2.0 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.16.3 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.16 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/onsi/ginkgo v1.16.5 // indirect + github.com/pavlo-v-chernykh/keystore-go/v4 v4.4.1 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/spf13/cast v1.4.1 // indirect + github.com/tidwall/gjson v1.9.3 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect + github.com/twmb/franz-go/pkg/kmsg v1.4.0 // indirect + github.com/wayneashleyberry/terminal-dimensions v1.0.0 // indirect + go.uber.org/zap v1.24.0 // indirect + sigs.k8s.io/controller-runtime v0.14.6 // indirect +) + +require ( + github.com/emicklei/go-restful/v3 v3.9.0 // indirect + github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 // indirect github.com/go-errors/errors v1.0.2-0.20180813162953-d98b870cc4e0 // indirect github.com/go-logr/logr v1.2.4 // indirect - github.com/go-logr/zapr v1.2.3 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.20.0 // indirect github.com/go-openapi/swag v0.19.14 // indirect @@ -50,7 +79,6 @@ require ( github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/golang/snappy v0.0.4 // indirect github.com/google/gnostic v0.6.9 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -59,50 +87,27 @@ require ( github.com/gruntwork-io/go-commons v0.8.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect - github.com/hashicorp/go-uuid v1.0.3 // indirect - github.com/huandu/xstrings v1.3.3 // indirect - github.com/iancoleman/orderedmap v0.2.0 // indirect github.com/imdario/mergo v0.3.13 // indirect - github.com/jcmturner/aescts/v2 v2.0.0 // indirect - github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect - github.com/jcmturner/gofork v1.7.6 // indirect - github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect - github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.16.3 // indirect github.com/mailru/easyjson v0.7.6 // indirect - github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.16 // indirect github.com/mattn/go-zglob v0.0.2-0.20190814121620-e3c945676326 // indirect - github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/moby/spdystream v0.2.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/onsi/ginkgo v1.16.5 // indirect - github.com/pavlo-v-chernykh/keystore-go/v4 v4.4.1 // indirect - github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pquerna/otp v1.2.0 // indirect - github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect - github.com/spf13/cast v1.4.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/testify v1.8.1 // indirect - github.com/tidwall/gjson v1.9.3 // indirect - github.com/tidwall/match v1.1.1 // indirect - github.com/tidwall/pretty v1.2.0 // indirect - github.com/twmb/franz-go/pkg/kmsg v1.4.0 // indirect github.com/urfave/cli v1.22.2 // indirect - github.com/wayneashleyberry/terminal-dimensions v1.0.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.6.0 // indirect - go.uber.org/zap v1.24.0 // indirect golang.org/x/crypto v0.7.0 // indirect golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect golang.org/x/net v0.10.0 // indirect @@ -121,8 +126,6 @@ require ( k8s.io/client-go v0.26.4 // indirect k8s.io/klog/v2 v2.80.1 // indirect k8s.io/kube-openapi v0.0.0-20221207184640-f3cff1453715 // indirect - k8s.io/utils v0.0.0-20221128185143-99ec85e7a448 // indirect - sigs.k8s.io/controller-runtime v0.14.6 // indirect sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect ) diff --git a/tests/e2e/go.sum b/tests/e2e/go.sum index c69c724d9..98c2ace1f 100644 --- a/tests/e2e/go.sum +++ b/tests/e2e/go.sum @@ -845,8 +845,8 @@ k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4= k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20221207184640-f3cff1453715 h1:tBEbstoM+K0FiBV5KGAKQ0kuvf54v/hwpldiJt69w1s= k8s.io/kube-openapi v0.0.0-20221207184640-f3cff1453715/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4= -k8s.io/utils v0.0.0-20221128185143-99ec85e7a448 h1:KTgPnR10d5zhztWptI952TNtt/4u5h3IzDXkdIMuo2Y= -k8s.io/utils v0.0.0-20221128185143-99ec85e7a448/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= +k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/tests/e2e/install_cluster.go b/tests/e2e/install_cluster.go index 110c1ba5c..a79e857fe 100644 --- a/tests/e2e/install_cluster.go +++ b/tests/e2e/install_cluster.go @@ -37,7 +37,8 @@ func requireCreatingKafkaCluster(kubectlOptions k8s.KubectlOptions, manifestPath By(fmt.Sprintf("KafkaCluster %s already exists\n", kafkaClusterName)) } else { By("Deploying a KafkaCluster") - applyK8sResourceManifest(kubectlOptions, manifestPath) + err := applyK8sResourceManifest(kubectlOptions, manifestPath) + Expect(err).NotTo(HaveOccurred()) } By("Verifying the KafkaCluster state") diff --git a/tests/e2e/k8s.go b/tests/e2e/k8s.go index a50669473..ccd7e8209 100644 --- a/tests/e2e/k8s.go +++ b/tests/e2e/k8s.go @@ -38,17 +38,58 @@ import ( const ( // allowedCRDByteCount is the limitation of the number of bytes a CRD is // allowed to have when being applied by K8s API server/kubectl. - allowedCRDByteCount = 262144 //nolint:unused // Note: this const is currently only used in helper functions which are not yet called on so this linter transitively fails for this const + allowedCRDByteCount = 262144 //nolint:unused // Note: this const is currently only used in helper functions which are not yet called on so this linter transitively fails for this const. // crdNamePrefix is the prefix of the CRD names when listed through kubectl. - crdNamePrefix = "customresourcedefinition.apiextensions.k8s.io/" //nolint:unused // Note: this const is currently only used in helper functions which are not yet called on so this linter transitively fails for this const + crdNamePrefix = "customresourcedefinition.apiextensions.k8s.io/" //nolint:unused // Note: this const is currently only used in helper functions which are not yet called on so this linter transitively fails for this const. + + // Per the kubectl spec, "dry-run" strategy must be "none", "server", or "client". + // With current use cases in e2e tests, we only expect to use "server" so the other options are commented out. + dryRunStrategyArgServer string = "--dry-run=server" // submit server-side request without persisting the resource. + //dryRunStrategyArgClient string = "--dry-run=client" // the resource is only validated locally but not sent to the Api-server. + //dryRunStrategyArgNone string = "--dry-run=none" // default value; submit server-side request and persist the resource. + ) // applyK8sResourceManifests applies the specified manifest to the provided // kubectl context and namespace. -func applyK8sResourceManifest(kubectlOptions k8s.KubectlOptions, manifestPath string) { //nolint:unused // Note: this might come in handy for manual K8s resource operations. - By(fmt.Sprintf("Applying k8s manifest %s", manifestPath)) - k8s.KubectlApply(GinkgoT(), &kubectlOptions, manifestPath) +func applyK8sResourceManifest(kubectlOptions k8s.KubectlOptions, manifestPath string, extraArgs ...string) error { //nolint:unused // Note: this might come in handy for manual K8s resource operations. + args := []string{"apply", "-f", manifestPath} + logMsg := fmt.Sprintf("Applying k8s manifest from path %s", manifestPath) + logMsg, args = kubectlArgExtender(args, logMsg, "", "", kubectlOptions.Namespace, extraArgs) + By(logMsg) + + return k8s.RunKubectlE(GinkgoT(), &kubectlOptions, args...) +} + +// applyK8sResourceManifestFromString applies the specified manifest in string format to the provided +// kubectl context and namespace. +func applyK8sResourceManifestFromString(kubectlOptions k8s.KubectlOptions, manifest string, extraArgs ...string) error { + tmpfile, err := createTempFileFromBytes([]byte(manifest), "", "", 0) + if err != nil { + return fmt.Errorf("storing provided manifest data into temp file failed: %w", err) + } + defer os.Remove(tmpfile) + + return applyK8sResourceManifest(kubectlOptions, tmpfile, extraArgs...) +} + +// applyK8sResourceFromTemplate generates manifest from the specified go-template based on values +// and applies the specified manifest to the provided kubectl context and namespace. +func applyK8sResourceFromTemplate(kubectlOptions k8s.KubectlOptions, templateFile string, values any, extraArgs ...string) error { + By(fmt.Sprintf("Generating k8s manifest from template %s", templateFile)) + var manifest bytes.Buffer + rawTemplate, err := os.ReadFile(templateFile) + if err != nil { + return err + } + t := template.Must(template.New("template").Funcs(sprig.TxtFuncMap()).Parse(string(rawTemplate))) + err = t.Execute(&manifest, values) + if err != nil { + return err + } + + return applyK8sResourceManifestFromString(kubectlOptions, manifest.String(), extraArgs...) } // isExistingK8SResource queries a Resource by it's kind, namespace and name and @@ -191,7 +232,10 @@ func installK8sCRD(kubectlOptions k8s.KubectlOptions, crd []byte, shouldBeValida createOrReplaceK8sResourcesFromManifest(kubectlOptions, "crd", object.GetName(), tempPath, shouldBeValidated) default: // Note: regular CRD. - applyK8sResourceManifest(kubectlOptions, tempPath) + err = applyK8sResourceManifest(kubectlOptions, tempPath) + if err != nil { + return errors.WrapIfWithDetails(err, "applying CRD failed", "crd", string(crd)) + } } return nil @@ -453,30 +497,6 @@ func deleteK8sResourceNoErrNotFound(kubectlOptions k8s.KubectlOptions, timeout t return err } -// applyK8sResourceManifestFromString applies the specified manifest in string format to the provided -// kubectl context and namespace. -func applyK8sResourceManifestFromString(kubectlOptions k8s.KubectlOptions, manifest string) error { - By(fmt.Sprintf("Applying k8s manifest\n%s", manifest)) - return k8s.KubectlApplyFromStringE(GinkgoT(), &kubectlOptions, manifest) -} - -// applyK8sResourceFromTemplate generates manifest from the specified go-template based on values -// and applies the specified manifest to the provided kubectl context and namespace. -func applyK8sResourceFromTemplate(kubectlOptions k8s.KubectlOptions, templateFile string, values map[string]interface{}) error { - By(fmt.Sprintf("Generating K8s manifest from template %s", templateFile)) - var manifest bytes.Buffer - rawTemplate, err := os.ReadFile(templateFile) - if err != nil { - return err - } - t := template.Must(template.New("template").Funcs(sprig.TxtFuncMap()).Parse(string(rawTemplate))) - err = t.Execute(&manifest, values) - if err != nil { - return err - } - return applyK8sResourceManifestFromString(kubectlOptions, manifest.String()) -} - // listK8sResourceKinds lists all of the available resource kinds on the K8s cluster // with the apiGroupSelector parameter the result can be narrowed by the resource group. // extraArgs can be any kubectl api-resources parameter. diff --git a/tests/e2e/kafka.go b/tests/e2e/kafka.go index 54b7cfd9b..1561a45fa 100644 --- a/tests/e2e/kafka.go +++ b/tests/e2e/kafka.go @@ -21,6 +21,7 @@ import ( "github.com/gruntwork-io/terratest/modules/k8s" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "k8s.io/utils/ptr" ) // requireDeleteKafkaTopic deletes kafkaTopic resource. @@ -34,14 +35,21 @@ func requireDeleteKafkaTopic(kubectlOptions k8s.KubectlOptions, topicName string // requireDeployingKafkaTopic deploys a kafkaTopic resource from a template func requireDeployingKafkaTopic(kubectlOptions k8s.KubectlOptions, topicName string) { It("Deploying KafkaTopic CR", func() { - err := applyK8sResourceFromTemplate(kubectlOptions, - kafkaTopicTemplate, - map[string]interface{}{ - "Name": topicName, - "TopicName": topicName, - "Namespace": kubectlOptions.Namespace, + values := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaClusterName, + Namespace: kubectlOptions.Namespace, }, - ) + Name: topicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: topicName, + } + + err := applyK8sResourceFromTemplate(kubectlOptions, kafkaTopicTemplate, values) + Expect(err).ShouldNot(HaveOccurred()) err = waitK8sResourceCondition(kubectlOptions, kafkaTopicKind, @@ -82,3 +90,23 @@ func requireDeployingKafkaUser(kubectlOptions k8s.KubectlOptions, userName strin }, defaultUserCreationWaitTime, 3*time.Second).Should(Equal(true)) }) } + +// kafkaTopicTemplateData is a struct that holds the relevant information and structure +// to fill out the template used to generate KafkaTopics. +// TODO: long term we should use the structs in the api module instead of these local structs. +type kafkaTopicTemplateData struct { + Annotations []string + ClusterRef kafkaTopicClusterRef + Name string + Namespace string + Partitions *int32 + ReplicationFactor *int32 + TopicName string +} + +// kafkaTopicClusterRef holds the information relevant to identifying a KafkaCluster within a KafkaTopic CR. +// TODO: Long term, we should use the structs in the api module instead of these local structs. +type kafkaTopicClusterRef struct { + Name string + Namespace string +} diff --git a/tests/e2e/kafkatopic_webhook.go b/tests/e2e/kafkatopic_webhook.go new file mode 100644 index 000000000..859faf02a --- /dev/null +++ b/tests/e2e/kafkatopic_webhook.go @@ -0,0 +1,369 @@ +// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "strings" + + "github.com/gruntwork-io/terratest/modules/k8s" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" +) + +func testWebhookKafkaTopic() { + // temporary section; to be refactored after kubeconfig injection PR + var kubectlOptions k8s.KubectlOptions + var err error + kubectlOptions, err = kubectlOptionsForCurrentContext() + if err != nil { + GinkgoT().Fail() + } + + kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace + kafkaCluster := types.NamespacedName{Name: kafkaClusterName, Namespace: koperatorLocalHelmDescriptor.Namespace} + + testWebhookCreateKafkaTopic(kubectlOptions, kafkaCluster) + testWebhookUpdateKafkaTopic(kubectlOptions, kafkaCluster) +} + +func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster types.NamespacedName) bool { + return When("Testing KafkaTopic Create", func() { + BeforeAll(func() { + Expect(isExistingK8SResource(kubectlOptions, kafkaKind, kafkaCluster.Name)).To(BeTrue()) + }) + + const nonExistent string = "non-existent" + + It("Test non-existent KafkaCluster", func() { + caseData := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: nonExistent, // Note: This is a deliberately inserted error for this test case. + Namespace: kafkaCluster.Namespace, + }, + Name: testInternalTopicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: testInternalTopicName, + } + err := applyK8sResourceFromTemplate( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "non-existent": kafkaCluster 'non-existent' in the namespace 'kafka' does not exist + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.clusterRef.name: Invalid value: %[2]q: kafkaCluster '%[2]s' in the namespace '%[3]s' does not exist", + caseData.Name, caseData.ClusterRef.Name, caseData.ClusterRef.Namespace), + ) + }) + + It("Test 0 partitions and replicationFactor", func() { + caseData := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: testInternalTopicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(0)), // Note: This is a deliberately inserted error for this test case. + ReplicationFactor: ptr.To(int32(0)), // Note: This is a deliberately inserted error for this test case. + TopicName: testInternalTopicName, + } + err := applyK8sResourceFromTemplate( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: + // * spec.partitions: Invalid value: 0: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default) + // * spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0 (or set it to be -1 to use the broker's default) + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(3)) + Expect(err.Error()).To(And( + ContainSubstring("The KafkaTopic %q is invalid:", caseData.Name), + ContainSubstring("spec.partitions: Invalid value: %d: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)", ptr.Deref(caseData.Partitions, -100)), + ContainSubstring("spec.replicationFactor: Invalid value: %d: replication factor must be larger than 0 (or set it to be -1 to use the broker's default)", ptr.Deref(caseData.ReplicationFactor, -100)), + )) + }) + + // In the current validation webhook implementation, this case can only be encountered on a Create operation + It("Test ReplicationFactor larger than number of brokers", func() { + caseData := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: testInternalTopicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(10)), // Note: This is a deliberately inserted error for this test case. + TopicName: testInternalTopicName, + } + err := applyK8sResourceFromTemplate( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.replicationFactor: Invalid value: 10: replication factor is larger than the number of nodes in the kafka cluster (available brokers: 3) + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.replicationFactor: Invalid value: %[2]d: replication factor is larger than the number of nodes in the kafka cluster", + caseData.Name, ptr.Deref(caseData.ReplicationFactor, -100)), + ) + }) + // Test case involving existing CRs but not necessarily an Update operation + When("Testing conflicts similar CRs", Ordered, func() { + var overlappingTopicName string = testInternalTopicName + requireDeployingKafkaTopic(kubectlOptions, overlappingTopicName) + + It("Testing conflict on spec.name", func() { + var caseData kafkaTopicTemplateData + + By("With managedBy koperator annotation") + caseData = kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, // Note: This information is relevant to this particular test case. + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: overlappingTopicName + "-different-cr-name", // Note: This information is relevant to this particular test case. + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: overlappingTopicName, // Note: This is a deliberately inserted error for this test case. + } + err := applyK8sResourceFromTemplate( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namespace 'kafka' is already referencing to Kafka topic 'topic-test-internal' + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.name: Invalid value: %[2]q: kafkaTopic CR '%[2]s' in namespace '%[3]s' is already referencing to Kafka topic '%[2]s'", + caseData.Name, overlappingTopicName, caseData.Namespace), + ) + + By("Without managedBy koperator annotation") + caseData = kafkaTopicTemplateData{ + Annotations: nil, // Note: This is a deliberately inserted error for this test case. + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: overlappingTopicName + "-different-cr-name", // Note: This information is relevant to this particular test case. + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: overlappingTopicName, // Note: This is a deliberately inserted error for this test case. + } + err = applyK8sResourceFromTemplate( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: + // * spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namespace 'kafka' is already referencing to Kafka topic 'topic-test-internal' + // * spec.name: Invalid value: "topic-test-internal": topic "topic-test-internal" already exists on kafka cluster and it is not managed by Koperator, + // if you want it to be managed by Koperator so you can modify its configurations through a KafkaTopic CR, + // add this "managedBy: koperator" annotation to this KafkaTopic CR + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) + Expect(err.Error()).To(And( + ContainSubstring("The KafkaTopic %q is invalid:", caseData.Name), + ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namespace '%[2]s' is already referencing to Kafka topic '%[1]s'", + overlappingTopicName, caseData.Namespace), + ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", + overlappingTopicName), + )) + }) + requireDeleteKafkaTopic(kubectlOptions, overlappingTopicName) + }) + }) +} + +func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster types.NamespacedName) bool { + return When("Testing KafkaTopic Update", func() { + BeforeAll(func() { + Expect(isExistingK8SResource(kubectlOptions, kafkaKind, kafkaCluster.Name)).To(BeTrue()) + }) + + const nonExistent string = "non-existent" + + // Update operation implies having a CR with the same name in place + requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName) + + It("Test non-existent KafkaCluster", func() { + caseData := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: nonExistent, // Note: This is a deliberately inserted error for this test case. + Namespace: kafkaCluster.Namespace, + }, + Name: testInternalTopicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: testInternalTopicName, + } + err := applyK8sResourceFromTemplate( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "non-existent": kafkaCluster 'non-existent' in the namespace 'kafka' does not exist + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.clusterRef.name: Invalid value: %[2]q: kafkaCluster '%[2]s' in the namespace '%[3]s' does not exist", + caseData.Name, caseData.ClusterRef.Name, caseData.Namespace), + ) + }) + + // A successfully created KafkaTopic CR cannot have 0 for either Partition or ReplicationFactor. + // At the same time, during an Update, a KafkaTopic cannot have its: + // * spec.partitions decreased + // * spec.replicationFactor changed (not just decreased) + // Consequently, an Update test for 0 values will automatically also cover the decreasing/changing scenarios. + It("Test 0 values partitions and replicationFactor", func() { + caseData := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: testInternalTopicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(0)), // Note: This is a deliberately inserted error for this test case. + ReplicationFactor: ptr.To(int32(0)), // Note: This is a deliberately inserted error for this test case. + TopicName: testInternalTopicName, + } + err := applyK8sResourceFromTemplate( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: + // * spec.partitions: Invalid value: 0: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default) + // * spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0 (or set it to be -1 to use the broker's default) + // * spec.partitions: Invalid value: 0: kafka does not support decreasing partition count on an existing topic (from 2 to 0) + // * spec.replicationFactor: Invalid value: 0: kafka does not support changing the replication factor on an existing topic (from 2 to 0) + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) + Expect(err.Error()).To(And( + ContainSubstring("The KafkaTopic %q is invalid:", caseData.Name), + ContainSubstring("spec.partitions: Invalid value: 0: number of partitions must be larger than 0"), + ContainSubstring("spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0"), + ContainSubstring("spec.partitions: Invalid value: %d: kafka does not support decreasing partition count on an existing topic", ptr.Deref(caseData.Partitions, -100)), + ContainSubstring("spec.replicationFactor: Invalid value: %d: kafka does not support changing the replication factor on an existing topic", ptr.Deref(caseData.ReplicationFactor, -100)), + )) + }) + + It("Testing conflict on spec.name", func() { + var overlappingTopicName string = testInternalTopicName + var caseData kafkaTopicTemplateData + + By("With managedBy koperator annotation") + caseData = kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, // Note: This information is relevant to this particular test case. + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: overlappingTopicName + "different-cr-name", // Note: This information is relevant to this particular test case. + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: overlappingTopicName, // Note: This is a deliberately inserted error for this test case. + } + err := applyK8sResourceFromTemplate( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namespace 'kafka' is already referencing to Kafka topic 'topic-test-internal' + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.name: Invalid value: %[2]q: kafkaTopic CR '%[2]s' in namespace '%[3]s' is already referencing to Kafka topic '%[2]s'", + caseData.Name, overlappingTopicName, caseData.Namespace), + ) + + By("Without managedBy koperator annotation") + caseData = kafkaTopicTemplateData{ + Annotations: nil, // Note: This is a deliberately inserted error for this test case. + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: overlappingTopicName + "different-cr-name", // Note: This information is relevant to this particular test case. + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: overlappingTopicName, // Note: This is a deliberately inserted error for this test case. + } + err = applyK8sResourceFromTemplate( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: + // * spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namespace 'kafka' is already referencing to Kafka topic 'topic-test-internal' + // * spec.name: Invalid value: "topic-test-internal": topic "topic-test-internal" already exists on kafka cluster and it is not managed by Koperator, + // if you want it to be managed by Koperator so you can modify its configurations through a KafkaTopic CR, + // add this "managedBy: koperator" annotation to this KafkaTopic CR + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) + Expect(err.Error()).To(And( + ContainSubstring("The KafkaTopic %q is invalid:", caseData.Name), + ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namespace '%[2]s' is already referencing to Kafka topic '%[1]s'\n", + overlappingTopicName, caseData.Namespace), + ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", + overlappingTopicName), + )) + }) + + // Clean up the KafkaTopic set up to test Update operations against + requireDeleteKafkaTopic(kubectlOptions, testInternalTopicName) + }) +} diff --git a/tests/e2e/koperator_suite_test.go b/tests/e2e/koperator_suite_test.go index a7ee2f8c3..6d406dd54 100644 --- a/tests/e2e/koperator_suite_test.go +++ b/tests/e2e/koperator_suite_test.go @@ -59,6 +59,7 @@ var _ = When("Testing e2e test altogether", Ordered, func() { testInstall() testInstallZookeeperCluster() testInstallKafkaCluster("../../config/samples/simplekafkacluster.yaml") + testWebhookKafkaTopic() testProduceConsumeInternal() testUninstallKafkaCluster() testInstallKafkaCluster("../../config/samples/simplekafkacluster_ssl.yaml") diff --git a/tests/e2e/templates/topic.yaml.tmpl b/tests/e2e/templates/topic.yaml.tmpl index 991baffe8..50dd2035d 100644 --- a/tests/e2e/templates/topic.yaml.tmpl +++ b/tests/e2e/templates/topic.yaml.tmpl @@ -3,12 +3,18 @@ kind: KafkaTopic metadata: name: {{ .Name }} namespace: {{or .Namespace "kafka"}} + {{- if .Annotations }} + annotations: + {{- range .Annotations }} + {{ . }} + {{- end }} + {{- end }} spec: clusterRef: - name: kafka - namespace: kafka + name: {{or .ClusterRef.Name "kafka"}} + namespace: {{or .ClusterRef.Namespace "kafka"}} name: {{ .TopicName }} - partitions: {{or .Partition 2}} + partitions: {{or .Partitions 2}} replicationFactor: {{or .ReplicationFactor 2}} config: "retention.ms": "604800000"