From 6de14064b78ea8fb2f20115240a4134981cee22f Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Thu, 7 Dec 2023 14:59:42 +0100 Subject: [PATCH 1/2] Split tests per controller --- .../flowcollector_controller_console_test.go | 12 +- .../flowcollector_controller_ebpf_test.go | 4 +- .../flowcollector_controller_iso_test.go | 3 +- controllers/flowcollector_controller_test.go | 795 +---------------- controllers/flp/flp_controller_test.go | 827 ++++++++++++++++++ controllers/flp/flp_test.go | 1 - controllers/flp/suite_test.go | 45 + .../monitoring/monitoring_controller_test.go | 169 ++++ controllers/monitoring/suite_test.go | 45 + controllers/suite_test.go | 131 +-- pkg/test/envtest.go | 161 ++++ 11 files changed, 1275 insertions(+), 918 deletions(-) create mode 100644 controllers/flp/flp_controller_test.go create mode 100644 controllers/flp/suite_test.go create mode 100644 controllers/monitoring/monitoring_controller_test.go create mode 100644 controllers/monitoring/suite_test.go create mode 100644 pkg/test/envtest.go diff --git a/controllers/flowcollector_controller_console_test.go b/controllers/flowcollector_controller_console_test.go index 059600135..70a08b396 100644 --- a/controllers/flowcollector_controller_console_test.go +++ b/controllers/flowcollector_controller_console_test.go @@ -188,7 +188,7 @@ func flowCollectorConsolePluginSpecs() { }, timeout, interval).Should(Succeed()) // Do a dummy change that will trigger reconcile, and make sure SM is created again - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { fc.Spec.Processor.LogLevel = "trace" }) By("Expecting ServiceMonitor to exist") @@ -207,14 +207,14 @@ func flowCollectorConsolePluginSpecs() { timeout, interval).Should(ContainSubstring("url: http://loki:3100/")) }) It("Should update the Loki URL in the Console Plugin if it changes in the Spec", func() { - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { fc.Spec.Loki.Monolithic.URL = "http://loki.namespace:8888" }) Eventually(getConfigMapData(configKey), timeout, interval).Should(ContainSubstring("url: http://loki.namespace:8888")) }) It("Should use the Loki Querier URL instead of the Loki URL, when switching to manual mode", func() { - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { fc.Spec.Loki.Mode = flowslatest.LokiModeManual fc.Spec.Loki.Manual.QuerierURL = "http://loki-querier:6789" }) @@ -236,7 +236,7 @@ func flowCollectorConsolePluginSpecs() { It("Should be unregistered", func() { By("Update CR to unregister") - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { fc.Spec.ConsolePlugin.Register = ptr.To(false) }) @@ -263,7 +263,7 @@ func flowCollectorConsolePluginSpecs() { }) It("Should cleanup console plugin if disabled", func() { - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { fc.Spec.ConsolePlugin.Enable = ptr.To(false) }) Eventually(func() error { @@ -284,7 +284,7 @@ func flowCollectorConsolePluginSpecs() { }) It("Should recreate console plugin if enabled back", func() { - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { fc.Spec.ConsolePlugin.Enable = ptr.To(true) }) Eventually(func() error { diff --git a/controllers/flowcollector_controller_ebpf_test.go b/controllers/flowcollector_controller_ebpf_test.go 
index 1626aa23a..8cbe6a34e 100644 --- a/controllers/flowcollector_controller_ebpf_test.go +++ b/controllers/flowcollector_controller_ebpf_test.go @@ -147,7 +147,7 @@ func flowCollectorEBPFSpecs() { }) It("Should update fields that have changed", func() { - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { Expect(*fc.Spec.Agent.EBPF.Sampling).To(Equal(int32(123))) *fc.Spec.Agent.EBPF.Sampling = 4 fc.Spec.Agent.EBPF.Privileged = true @@ -176,7 +176,7 @@ func flowCollectorEBPFSpecs() { }) It("Should redeploy all when changing namespace", func() { - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { fc.Spec.Namespace = operatorNamespace2 }) diff --git a/controllers/flowcollector_controller_iso_test.go b/controllers/flowcollector_controller_iso_test.go index 491448238..3ae2e3437 100644 --- a/controllers/flowcollector_controller_iso_test.go +++ b/controllers/flowcollector_controller_iso_test.go @@ -12,6 +12,7 @@ import ( "k8s.io/utils/ptr" flowslatest "github.com/netobserv/network-observability-operator/api/v1beta2" + "github.com/netobserv/network-observability-operator/pkg/test" ) // nolint:cyclop @@ -203,7 +204,7 @@ func flowCollectorIsoSpecs() { }) It("Should not have modified input CR values", func() { - cr := GetCR(crKey) + cr := test.GetCR(ctx, k8sClient, crKey) // For easier debugging, we check CR parts one by one Expect(cr.Spec.Processor).Should(Equal(specInput.Processor)) diff --git a/controllers/flowcollector_controller_test.go b/controllers/flowcollector_controller_test.go index a16da8db3..6209b85f8 100644 --- a/controllers/flowcollector_controller_test.go +++ b/controllers/flowcollector_controller_test.go @@ -1,15 +1,12 @@ package controllers import ( - "fmt" "strings" "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" appsv1 "k8s.io/api/apps/v1" - ascv2 "k8s.io/api/autoscaling/v2" v1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" kerr "k8s.io/apimachinery/pkg/api/errors" @@ -21,13 +18,12 @@ import ( flowslatest "github.com/netobserv/network-observability-operator/api/v1beta2" "github.com/netobserv/network-observability-operator/controllers/constants" . 
"github.com/netobserv/network-observability-operator/controllers/controllerstest" - "github.com/netobserv/network-observability-operator/controllers/flp" "github.com/netobserv/network-observability-operator/pkg/test" ) const ( - timeout = time.Second * 10 - interval = 50 * time.Millisecond + timeout = test.Timeout + interval = test.Interval conntrackEndTimeout = 10 * time.Second conntrackTerminatingTimeout = 5 * time.Second conntrackHeartbeatInterval = 30 * time.Second @@ -35,8 +31,13 @@ const ( var outputRecordTypes = flowslatest.LogTypeAll +var updateCR = func(key types.NamespacedName, updater func(*flowslatest.FlowCollector)) { + test.UpdateCR(ctx, k8sClient, key, updater) +} + // nolint:cyclop func flowCollectorControllerSpecs() { + const operatorNamespace = "main-namespace" const otherNamespace = "other-namespace" crKey := types.NamespacedName{ @@ -46,22 +47,6 @@ func flowCollectorControllerSpecs() { Name: "ovs-flows-config", Namespace: "openshift-network-operator", } - flpKey1 := types.NamespacedName{ - Name: constants.FLPName, - Namespace: operatorNamespace, - } - flpKey2 := types.NamespacedName{ - Name: constants.FLPName, - Namespace: otherNamespace, - } - flpKeyKafkaIngester := types.NamespacedName{ - Name: constants.FLPName + flp.FlpConfSuffix[flp.ConfKafkaIngester], - Namespace: operatorNamespace, - } - flpKeyKafkaTransformer := types.NamespacedName{ - Name: constants.FLPName + flp.FlpConfSuffix[flp.ConfKafkaTransformer], - Namespace: operatorNamespace, - } cpKey1 := types.NamespacedName{ Name: "netobserv-plugin", Namespace: operatorNamespace, @@ -70,10 +55,6 @@ func flowCollectorControllerSpecs() { Name: "netobserv-plugin", Namespace: otherNamespace, } - rbKeyIngest := types.NamespacedName{Name: flp.RoleBindingName(flp.ConfKafkaIngester)} - rbKeyTransform := types.NamespacedName{Name: flp.RoleBindingName(flp.ConfKafkaTransformer)} - rbKeyIngestMono := types.NamespacedName{Name: flp.RoleBindingMonoName(flp.ConfKafkaIngester)} - rbKeyTransformMono := types.NamespacedName{Name: flp.RoleBindingMonoName(flp.ConfKafkaTransformer)} rbKeyPlugin := types.NamespacedName{Name: constants.PluginName} // Created objects to cleanup @@ -87,9 +68,7 @@ func flowCollectorControllerSpecs() { // Add any teardown steps that needs to be executed after each test }) - Context("Deploying as DaemonSet", func() { - var digest string - ds := appsv1.DaemonSet{} + Context("Without Kafka", func() { It("Should create successfully", func() { created := &flowslatest.FlowCollector{ ObjectMeta: metav1.ObjectMeta{ @@ -98,35 +77,15 @@ func flowCollectorControllerSpecs() { Spec: flowslatest.FlowCollectorSpec{ Namespace: operatorNamespace, DeploymentModel: flowslatest.DeploymentModelDirect, - Processor: flowslatest.FlowCollectorFLP{ - Port: 9999, - ImagePullPolicy: "Never", - LogLevel: "error", - Debug: flowslatest.DebugConfig{ - Env: map[string]string{ - "GOGC": "200", - }, - }, - LogTypes: &outputRecordTypes, - ConversationHeartbeatInterval: &metav1.Duration{ - Duration: conntrackHeartbeatInterval, - }, - ConversationEndTimeout: &metav1.Duration{ - Duration: conntrackEndTimeout, - }, - ConversationTerminatingTimeout: &metav1.Duration{ - Duration: conntrackTerminatingTimeout, - }, - Metrics: flowslatest.FLPMetrics{ - IncludeList: &[]flowslatest.FLPMetric{"node_ingress_bytes_total", "namespace_ingress_bytes_total", "workload_ingress_bytes_total"}, - }, - }, Agent: flowslatest.FlowCollectorAgent{ Type: "IPFIX", IPFIX: flowslatest.FlowCollectorIPFIX{ Sampling: 200, }, }, + Processor: flowslatest.FlowCollectorFLP{ + 
Port: 9999, + }, ConsolePlugin: flowslatest.FlowCollectorConsolePlugin{ Enable: ptr.To(true), Port: 9001, @@ -144,46 +103,6 @@ func flowCollectorControllerSpecs() { // Create Expect(k8sClient.Create(ctx, created)).Should(Succeed()) - By("Expecting to create the flowlogs-pipeline DaemonSet") - Eventually(func() error { - if err := k8sClient.Get(ctx, flpKey1, &ds); err != nil { - return err - } - digest = ds.Spec.Template.Annotations[constants.PodConfigurationDigest] - if digest == "" { - return fmt.Errorf("%q annotation can't be empty", constants.PodConfigurationDigest) - } - return nil - }, timeout, interval).Should(Succeed()) - - By("Expecting to create the flowlogs-pipeline ServiceAccount") - Eventually(func() interface{} { - svcAcc := v1.ServiceAccount{} - if err := k8sClient.Get(ctx, flpKey1, &svcAcc); err != nil { - return err - } - return svcAcc - }, timeout, interval).Should(Satisfy(func(svcAcc v1.ServiceAccount) bool { - return svcAcc.Labels != nil && svcAcc.Labels["app"] == constants.FLPName - })) - - By("Expecting to create two flowlogs-pipeline role binding") - rb1 := rbacv1.ClusterRoleBinding{} - Eventually(func() interface{} { - return k8sClient.Get(ctx, rbKeyIngestMono, &rb1) - }, timeout, interval).Should(Succeed()) - Expect(rb1.Subjects).Should(HaveLen(1)) - Expect(rb1.Subjects[0].Name).Should(Equal("flowlogs-pipeline")) - Expect(rb1.RoleRef.Name).Should(Equal("flowlogs-pipeline-ingester")) - - rb2 := rbacv1.ClusterRoleBinding{} - Eventually(func() interface{} { - return k8sClient.Get(ctx, rbKeyTransformMono, &rb2) - }, timeout, interval).Should(Succeed()) - Expect(rb2.Subjects).Should(HaveLen(1)) - Expect(rb2.Subjects[0].Name).Should(Equal("flowlogs-pipeline")) - Expect(rb2.RoleRef.Name).Should(Equal("flowlogs-pipeline-transformer")) - By("Expecting to create console plugin role binding") rb3 := rbacv1.ClusterRoleBinding{} Eventually(func() interface{} { @@ -193,16 +112,6 @@ func flowCollectorControllerSpecs() { Expect(rb3.Subjects[0].Name).Should(Equal("netobserv-plugin")) Expect(rb3.RoleRef.Name).Should(Equal("netobserv-plugin")) - By("Not expecting transformer role binding") - Eventually(func() interface{} { - return k8sClient.Get(ctx, rbKeyIngest, &rbacv1.ClusterRoleBinding{}) - }, timeout, interval).Should(MatchError(`clusterrolebindings.rbac.authorization.k8s.io "flowlogs-pipeline-ingester-role" not found`)) - - By("Not expecting ingester role binding") - Eventually(func() interface{} { - return k8sClient.Get(ctx, rbKeyTransform, &rbacv1.ClusterRoleBinding{}) - }, timeout, interval).Should(MatchError(`clusterrolebindings.rbac.authorization.k8s.io "flowlogs-pipeline-transformer-role" not found`)) - By("Creating the ovn-flows-configmap with the configuration from the FlowCollector") Eventually(func() interface{} { ofc := v1.ConfigMap{} @@ -216,62 +125,12 @@ func flowCollectorControllerSpecs() { "cacheMaxFlows": "400", "cacheActiveTimeout": "20s", })) - - By("Expecting flowlogs-pipeline-config configmap to be created") - Eventually(func() interface{} { - cm := v1.ConfigMap{} - return k8sClient.Get(ctx, types.NamespacedName{ - Name: "flowlogs-pipeline-config", - Namespace: operatorNamespace, - }, &cm) - }, timeout, interval).Should(Succeed()) - - By("Expecting the monitoring dashboards configmap to be created") - Eventually(func() interface{} { - cm := v1.ConfigMap{} - return k8sClient.Get(ctx, types.NamespacedName{ - Name: "grafana-dashboard-netobserv-flow-metrics", - Namespace: "openshift-config-managed", - }, &cm) - }, timeout, interval).Should(Succeed()) - - 
By("Expecting the infra health dashboards configmap to be created") - Eventually(func() interface{} { - cm := v1.ConfigMap{} - return k8sClient.Get(ctx, types.NamespacedName{ - Name: "grafana-dashboard-netobserv-health", - Namespace: "openshift-config-managed", - }, &cm) - }, timeout, interval).Should(Succeed()) }) It("Should update successfully", func() { - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { fc.Spec.Processor = flowslatest.FlowCollectorFLP{ - Port: 7891, - ImagePullPolicy: "Never", - LogLevel: "error", - Debug: flowslatest.DebugConfig{ - Env: map[string]string{ - // we'll test that env vars are sorted, to keep idempotency - "GOMAXPROCS": "33", - "GOGC": "400", - }, - }, - LogTypes: &outputRecordTypes, - ConversationHeartbeatInterval: &metav1.Duration{ - Duration: conntrackHeartbeatInterval, - }, - ConversationEndTimeout: &metav1.Duration{ - Duration: conntrackEndTimeout, - }, - ConversationTerminatingTimeout: &metav1.Duration{ - Duration: conntrackTerminatingTimeout, - }, - Metrics: flowslatest.FLPMetrics{ - IncludeList: &[]flowslatest.FLPMetric{"node_ingress_bytes_total"}, - DisableAlerts: []flowslatest.FLPAlert{flowslatest.AlertLokiError}, - }, + Port: 7891, } fc.Spec.Loki = flowslatest.FlowCollectorLoki{} fc.Spec.Agent.IPFIX = flowslatest.FlowCollectorIPFIX{ @@ -281,16 +140,6 @@ func flowCollectorControllerSpecs() { } }) - By("CR updated", func() { - Eventually(func() error { - err := k8sClient.Get(ctx, flpKey1, &ds) - if err != nil { - return err - } - return checkDigestUpdate(&digest, ds.Spec.Template.Annotations) - }, timeout, interval).Should(Succeed()) - }) - By("Expecting to create the ovn-flows-configmap with the configuration from the FlowCollector", func() { Eventually(func() interface{} { ofc := v1.ConfigMap{} @@ -305,83 +154,6 @@ func flowCollectorControllerSpecs() { "cacheActiveTimeout": "30s", })) }) - - By("Creating the required HostPort to access flowlogs-pipeline through the NodeIP", func() { - var cnt *v1.Container - for i := range ds.Spec.Template.Spec.Containers { - if ds.Spec.Template.Spec.Containers[i].Name == constants.FLPName { - cnt = &ds.Spec.Template.Spec.Containers[i] - break - } - } - Expect(cnt).ToNot(BeNil(), "can't find a container named", constants.FLPName) - var cp *v1.ContainerPort - for i := range cnt.Ports { - if cnt.Ports[i].Name == constants.FLPPortName { - cp = &cnt.Ports[i] - break - } - } - Expect(cp). - ToNot(BeNil(), "can't find a container port named", constants.FLPPortName) - Expect(*cp).To(Equal(v1.ContainerPort{ - Name: constants.FLPPortName, - HostPort: 7891, - ContainerPort: 7891, - Protocol: "UDP", - })) - Expect(cnt.Env).To(Equal([]v1.EnvVar{ - {Name: "GOGC", Value: "400"}, {Name: "GOMAXPROCS", Value: "33"}, {Name: "GODEBUG", Value: "http2server=0"}, - })) - }) - - By("Allocating the proper toleration to allow its placement in the master nodes", func() { - Expect(ds.Spec.Template.Spec.Tolerations). 
- To(ContainElement(v1.Toleration{Operator: v1.TolerationOpExists})) - }) - - By("Expecting the flow dashboards configmap to be deleted") - Eventually(func() interface{} { - return k8sClient.Get(ctx, types.NamespacedName{ - Name: "grafana-dashboard-netobserv", - Namespace: "openshift-config-managed", - }, &v1.ConfigMap{}) - }, timeout, interval).Should(MatchError(`configmaps "grafana-dashboard-netobserv" not found`)) - - By("Expecting the health dashboards rows to be filtered") - Eventually(func() interface{} { - cm := v1.ConfigMap{} - if err := k8sClient.Get(ctx, types.NamespacedName{ - Name: "grafana-dashboard-netobserv-health", - Namespace: "openshift-config-managed", - }, &cm); err != nil { - return err - } - d, err := test.DashboardFromBytes([]byte(cm.Data["netobserv-health-metrics.json"])) - if err != nil { - return err - } - return d.Titles() - }, timeout, interval).Should(Equal([]string{ - "Flows", - "Agents", - "Processor", - "Operator", - })) - }) - - It("Should redeploy if the spec doesn't change but the external flowlogs-pipeline-config does", func() { - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.Loki.MaxRetries = ptr.To(int32(7)) - }) - - By("Expecting that the flowlogsPipeline.PodConfigurationDigest attribute has changed") - Eventually(func() error { - if err := k8sClient.Get(ctx, flpKey1, &ds); err != nil { - return err - } - return checkDigestUpdate(&digest, ds.Spec.Template.Annotations) - }).Should(Succeed()) }) It("Should prevent undesired sampling-everything", func() { @@ -415,503 +187,15 @@ func flowCollectorControllerSpecs() { return ofc.Data["sampling"] }, timeout, interval).Should(Equal("1")) }) - - It("Should create desired objects when they're not found (e.g. case of an operator upgrade)", func() { - psvc := v1.Service{} - sm := monitoringv1.ServiceMonitor{} - pr := monitoringv1.PrometheusRule{} - By("Expecting prometheus service to exist") - Eventually(func() interface{} { - return k8sClient.Get(ctx, types.NamespacedName{ - Name: "flowlogs-pipeline-prom", - Namespace: operatorNamespace, - }, &psvc) - }, timeout, interval).Should(Succeed()) - - By("Expecting ServiceMonitor to exist") - Eventually(func() interface{} { - return k8sClient.Get(ctx, types.NamespacedName{ - Name: "flowlogs-pipeline-monitor", - Namespace: operatorNamespace, - }, &sm) - }, timeout, interval).Should(Succeed()) - - By("Expecting PrometheusRule to exist and be updated") - Eventually(func() interface{} { - return k8sClient.Get(ctx, types.NamespacedName{ - Name: "flowlogs-pipeline-alert", - Namespace: operatorNamespace, - }, &pr) - }, timeout, interval).Should(Succeed()) - Expect(pr.Spec.Groups).Should(HaveLen(1)) - Expect(pr.Spec.Groups[0].Rules).Should(HaveLen(1)) - - // Manually delete ServiceMonitor - By("Deleting ServiceMonitor") - Eventually(func() error { - return k8sClient.Delete(ctx, &sm) - }, timeout, interval).Should(Succeed()) - - // Do a dummy change that will trigger reconcile, and make sure SM is created again - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.Processor.LogLevel = "info" - }) - By("Expecting ServiceMonitor to exist") - Eventually(func() interface{} { - return k8sClient.Get(ctx, types.NamespacedName{ - Name: "flowlogs-pipeline-monitor", - Namespace: operatorNamespace, - }, &sm) - }, timeout, interval).Should(Succeed()) - - // Manually delete Rule - By("Deleting prom rule") - Eventually(func() error { - return k8sClient.Delete(ctx, &pr) - }, timeout, interval).Should(Succeed()) - - // Do a dummy change that will trigger 
reconcile, and make sure Rule is created again - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.Processor.LogLevel = "debug" - }) - By("Expecting PrometheusRule to exist") - Eventually(func() interface{} { - return k8sClient.Get(ctx, types.NamespacedName{ - Name: "flowlogs-pipeline-alert", - Namespace: operatorNamespace, - }, &pr) - }, timeout, interval).Should(Succeed()) - }) - }) - - Context("With Kafka", func() { - It("Should update kafka config successfully", func() { - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.DeploymentModel = flowslatest.DeploymentModelKafka - fc.Spec.Kafka = flowslatest.FlowCollectorKafka{ - Address: "localhost:9092", - Topic: "FLP", - TLS: flowslatest.ClientTLS{ - CACert: flowslatest.CertificateReference{ - Type: "secret", - Name: "some-secret", - CertFile: "ca.crt", - }, - }, - } - }) - }) - - It("Should deploy kafka ingester and transformer", func() { - By("Expecting ingester daemonset to be created") - Eventually(func() interface{} { - return k8sClient.Get(ctx, flpKeyKafkaIngester, &appsv1.DaemonSet{}) - }, timeout, interval).Should(Succeed()) - - By("Expecting transformer deployment to be created") - Eventually(func() interface{} { - return k8sClient.Get(ctx, flpKeyKafkaTransformer, &appsv1.Deployment{}) - }, timeout, interval).Should(Succeed()) - - By("Not Expecting transformer service to be created") - Eventually(func() interface{} { - return k8sClient.Get(ctx, flpKeyKafkaTransformer, &v1.Service{}) - }, timeout, interval).Should(MatchError(`services "flowlogs-pipeline-transformer" not found`)) - - By("Expecting to create two different flowlogs-pipeline role bindings") - rb1 := rbacv1.ClusterRoleBinding{} - Eventually(func() interface{} { - return k8sClient.Get(ctx, rbKeyIngest, &rb1) - }, timeout, interval).Should(Succeed()) - Expect(rb1.Subjects).Should(HaveLen(1)) - Expect(rb1.Subjects[0].Name).Should(Equal("flowlogs-pipeline-ingester")) - Expect(rb1.RoleRef.Name).Should(Equal("flowlogs-pipeline-ingester")) - - rb2 := rbacv1.ClusterRoleBinding{} - Eventually(func() interface{} { - return k8sClient.Get(ctx, rbKeyTransform, &rb2) - }, timeout, interval).Should(Succeed()) - Expect(rb2.Subjects).Should(HaveLen(1)) - Expect(rb2.Subjects[0].Name).Should(Equal("flowlogs-pipeline-transformer")) - Expect(rb2.RoleRef.Name).Should(Equal("flowlogs-pipeline-transformer")) - - By("Not expecting mono-transformer role binding") - Eventually(func() interface{} { - return k8sClient.Get(ctx, rbKeyIngestMono, &rbacv1.ClusterRoleBinding{}) - }, timeout, interval).Should(MatchError(`clusterrolebindings.rbac.authorization.k8s.io "flowlogs-pipeline-ingester-role-mono" not found`)) - - By("Not expecting mono-ingester role binding") - Eventually(func() interface{} { - return k8sClient.Get(ctx, rbKeyTransformMono, &rbacv1.ClusterRoleBinding{}) - }, timeout, interval).Should(MatchError(`clusterrolebindings.rbac.authorization.k8s.io "flowlogs-pipeline-transformer-role-mono" not found`)) - }) - - It("Should delete previous flp deployment", func() { - By("Expecting monolith to be deleted") - Eventually(func() interface{} { - return k8sClient.Get(ctx, flpKey1, &appsv1.DaemonSet{}) - }, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline" not found`)) - }) - }) - - Context("Adding auto-scaling", func() { - hpa := ascv2.HorizontalPodAutoscaler{} - It("Should update with HPA", func() { - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.Processor.KafkaConsumerAutoscaler = flowslatest.FlowCollectorHPA{ - Status: 
flowslatest.HPAStatusEnabled, - MinReplicas: ptr.To(int32(1)), - MaxReplicas: 1, - Metrics: []ascv2.MetricSpec{{ - Type: ascv2.ResourceMetricSourceType, - Resource: &ascv2.ResourceMetricSource{ - Name: v1.ResourceCPU, - Target: ascv2.MetricTarget{ - Type: ascv2.UtilizationMetricType, - AverageUtilization: ptr.To(int32(90)), - }, - }, - }}, - } - }) - }) - - It("Should have HPA installed", func() { - By("Expecting HPA to be created") - Eventually(func() interface{} { - return k8sClient.Get(ctx, flpKeyKafkaTransformer, &hpa) - }, timeout, interval).Should(Succeed()) - Expect(*hpa.Spec.MinReplicas).To(Equal(int32(1))) - Expect(hpa.Spec.MaxReplicas).To(Equal(int32(1))) - Expect(*hpa.Spec.Metrics[0].Resource.Target.AverageUtilization).To(Equal(int32(90))) - }) - - It("Should autoscale when the HPA options change", func() { - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.Processor.KafkaConsumerAutoscaler.MinReplicas = ptr.To(int32(2)) - fc.Spec.Processor.KafkaConsumerAutoscaler.MaxReplicas = 2 - }) - - By("Changing the Horizontal Pod Autoscaler instance") - Eventually(func() error { - if err := k8sClient.Get(ctx, flpKeyKafkaTransformer, &hpa); err != nil { - return err - } - if *hpa.Spec.MinReplicas != int32(2) || hpa.Spec.MaxReplicas != int32(2) || - *hpa.Spec.Metrics[0].Resource.Target.AverageUtilization != int32(90) { - return fmt.Errorf("expected {2, 2, 90}: Got %v, %v, %v", - *hpa.Spec.MinReplicas, hpa.Spec.MaxReplicas, - *hpa.Spec.Metrics[0].Resource.Target.AverageUtilization) - } - return nil - }, timeout, interval).Should(Succeed()) - }) - }) - - Context("Back without Kafka", func() { - It("Should remove kafka config successfully", func() { - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.DeploymentModel = flowslatest.DeploymentModelDirect - }) - }) - - It("Should deploy single flp again", func() { - By("Expecting daemonset to be created") - Eventually(func() interface{} { - return k8sClient.Get(ctx, flpKey1, &appsv1.DaemonSet{}) - }, timeout, interval).Should(Succeed()) - }) - - It("Should delete kafka ingester and transformer", func() { - By("Expecting ingester daemonset to be deleted") - Eventually(func() interface{} { - return k8sClient.Get(ctx, flpKeyKafkaIngester, &appsv1.DaemonSet{}) - }, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline-ingester" not found`)) - - By("Expecting transformer deployment to be deleted") - Eventually(func() interface{} { - return k8sClient.Get(ctx, flpKeyKafkaTransformer, &appsv1.Deployment{}) - }, timeout, interval).Should(MatchError(`deployments.apps "flowlogs-pipeline-transformer" not found`)) - }) - }) - - Context("Using certificates with loki manual mode", func() { - flpDS := appsv1.DaemonSet{} - It("Should update Loki to use TLS", func() { - // Create CM certificate - cm := &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "loki-ca", - Namespace: operatorNamespace, - }, - Data: map[string]string{"ca.crt": "certificate data"}, - } - cleanupList = append(cleanupList, cm) - Expect(k8sClient.Create(ctx, cm)).Should(Succeed()) - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.Loki.Mode = flowslatest.LokiModeManual - fc.Spec.Loki.Manual.TLS = flowslatest.ClientTLS{ - Enable: true, - CACert: flowslatest.CertificateReference{ - Type: flowslatest.RefTypeConfigMap, - Name: "loki-ca", - CertFile: "ca.crt", - }, - } - }) - }) - - It("Should have certificate mounted", func() { - By("Expecting certificate mounted") - Eventually(func() interface{} { - if err := k8sClient.Get(ctx, 
flpKey1, &flpDS); err != nil { - return err - } - return flpDS.Spec.Template.Spec.Volumes - }, timeout, interval).Should(HaveLen(2)) - Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) - Expect(flpDS.Spec.Template.Spec.Volumes[1].Name).To(Equal("loki-certs-ca")) - }) - - It("Should restore no TLS config", func() { - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.Loki.Manual.TLS = flowslatest.ClientTLS{ - Enable: false, - } - }) - Eventually(func() interface{} { - if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { - return err - } - return flpDS.Spec.Template.Spec.Volumes - }, timeout, interval).Should(HaveLen(1)) - Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) - }) - }) - - Context("Using certificates with loki distributed mode", func() { - flpDS := appsv1.DaemonSet{} - It("Should update Loki to use TLS", func() { - // Create CM certificate - cm := &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "loki-distri-ca", - Namespace: operatorNamespace, - }, - Data: map[string]string{"ca.crt": "certificate data"}, - } - cleanupList = append(cleanupList, cm) - Expect(k8sClient.Create(ctx, cm)).Should(Succeed()) - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.Loki.Mode = flowslatest.LokiModeMicroservices - fc.Spec.Loki.Microservices = flowslatest.LokiMicroservicesParams{ - IngesterURL: "http://loki-ingested:3100/", - QuerierURL: "http://loki-queries:3100/", - TLS: flowslatest.ClientTLS{ - Enable: true, - CACert: flowslatest.CertificateReference{ - Type: flowslatest.RefTypeConfigMap, - Name: "loki-distri-ca", - CertFile: "ca.crt", - }, - }, - } - }) - }) - - It("Should have certificate mounted", func() { - By("Expecting certificate mounted") - Eventually(func() interface{} { - if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { - return err - } - return flpDS.Spec.Template.Spec.Volumes - }, timeout, interval).Should(HaveLen(2)) - Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) - Expect(flpDS.Spec.Template.Spec.Volumes[1].Name).To(Equal("loki-certs-ca")) - }) - - It("Should restore no TLS config", func() { - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.Loki.Microservices.TLS = flowslatest.ClientTLS{ - Enable: false, - } - }) - Eventually(func() interface{} { - if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { - return err - } - return flpDS.Spec.Template.Spec.Volumes - }, timeout, interval).Should(HaveLen(1)) - Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) - }) - }) - - Context("Using certificates with loki monolithic mode", func() { - flpDS := appsv1.DaemonSet{} - It("Should update Loki to use TLS", func() { - // Create CM certificate - cm := &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "loki-mono-ca", - Namespace: operatorNamespace, - }, - Data: map[string]string{"ca.crt": "certificate data"}, - } - cleanupList = append(cleanupList, cm) - Expect(k8sClient.Create(ctx, cm)).Should(Succeed()) - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.Loki.Mode = flowslatest.LokiModeMonolithic - fc.Spec.Loki.Monolithic = flowslatest.LokiMonolithParams{ - URL: "http://loki-mono:3100/", - TLS: flowslatest.ClientTLS{ - Enable: true, - CACert: flowslatest.CertificateReference{ - Type: flowslatest.RefTypeConfigMap, - Name: "loki-mono-ca", - CertFile: "ca.crt", - }, - }, - } - }) - }) - - It("Should have certificate mounted", func() { - By("Expecting certificate mounted") - Eventually(func() 
interface{} { - if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { - return err - } - return flpDS.Spec.Template.Spec.Volumes - }, timeout, interval).Should(HaveLen(2)) - Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) - Expect(flpDS.Spec.Template.Spec.Volumes[1].Name).To(Equal("loki-certs-ca")) - }) - - It("Should restore no TLS config", func() { - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.Loki.Monolithic.TLS = flowslatest.ClientTLS{ - Enable: false, - } - }) - Eventually(func() interface{} { - if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { - return err - } - return flpDS.Spec.Template.Spec.Volumes - }, timeout, interval).Should(HaveLen(1)) - Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) - }) - }) - - Context("Using Certificates With Loki in LokiStack Mode", func() { - flpDS := appsv1.DaemonSet{} - It("Should update Loki config successfully", func() { - // Create CM certificate - cm := &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "lokistack-gateway-ca-bundle", - Namespace: operatorNamespace, - }, - Data: map[string]string{"service-ca.crt": "certificate data"}, - } - cleanupList = append(cleanupList, cm) - Expect(k8sClient.Create(ctx, cm)).Should(Succeed()) - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.Loki.Mode = flowslatest.LokiModeLokiStack - fc.Spec.Loki.LokiStack = flowslatest.LokiStackRef{ - Name: "lokistack", - Namespace: operatorNamespace, - } - }) - }) - - It("Should have certificate mounted", func() { - By("Expecting certificate mounted") - Eventually(func() interface{} { - if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { - return err - } - return flpDS.Spec.Template.Spec.Volumes - }, timeout, interval).Should(HaveLen(3)) - Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) - Expect(flpDS.Spec.Template.Spec.Volumes[1].Name).To(Equal("flowlogs-pipeline")) - Expect(flpDS.Spec.Template.Spec.Volumes[2].Name).To(Equal("loki-certs-ca")) - }) - - It("Should deploy Loki roles", func() { - By("Expecting Writer ClusterRole") - Eventually(func() interface{} { - var cr rbacv1.ClusterRole - return k8sClient.Get(ctx, types.NamespacedName{Name: constants.LokiCRWriter}, &cr) - }, timeout, interval).Should(Succeed()) - By("Expecting Reader ClusterRole") - Eventually(func() interface{} { - var cr rbacv1.ClusterRole - return k8sClient.Get(ctx, types.NamespacedName{Name: constants.LokiCRReader}, &cr) - }, timeout, interval).Should(Succeed()) - By("Expecting FLP Writer ClusterRoleBinding") - Eventually(func() interface{} { - var crb rbacv1.ClusterRoleBinding - return k8sClient.Get(ctx, types.NamespacedName{Name: constants.LokiCRBWriter}, &crb) - }, timeout, interval).Should(Succeed()) - }) - - It("Should restore no TLS config in manual mode", func() { - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.Loki.Mode = flowslatest.LokiModeManual - fc.Spec.Loki.Manual.TLS = flowslatest.ClientTLS{ - Enable: false, - } - }) - Eventually(func() interface{} { - if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { - return err - } - return flpDS.Spec.Template.Spec.Volumes - }, timeout, interval).Should(HaveLen(1)) - Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) - }) }) Context("Changing namespace", func() { It("Should update namespace successfully", func() { - UpdateCR(crKey, func(fc *flowslatest.FlowCollector) { - fc.Spec.Processor.Port = 9999 + updateCR(crKey, func(fc 
*flowslatest.FlowCollector) { fc.Spec.Namespace = otherNamespace - fc.Spec.Agent.IPFIX = flowslatest.FlowCollectorIPFIX{ - Sampling: 200, - } }) }) - It("Should redeploy FLP in new namespace", func() { - By("Expecting daemonset in previous namespace to be deleted") - Eventually(func() interface{} { - return k8sClient.Get(ctx, flpKey1, &appsv1.DaemonSet{}) - }, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline" not found`)) - - By("Expecting deployment in previous namespace to be deleted") - Eventually(func() interface{} { - return k8sClient.Get(ctx, flpKey1, &appsv1.Deployment{}) - }, timeout, interval).Should(MatchError(`deployments.apps "flowlogs-pipeline" not found`)) - - By("Expecting service account in previous namespace to be deleted") - Eventually(func() interface{} { - return k8sClient.Get(ctx, flpKey1, &v1.ServiceAccount{}) - }, timeout, interval).Should(MatchError(`serviceaccounts "flowlogs-pipeline" not found`)) - - By("Expecting daemonset to be created in new namespace") - Eventually(func() interface{} { - return k8sClient.Get(ctx, flpKey2, &appsv1.DaemonSet{}) - }, timeout, interval).Should(Succeed()) - - By("Expecting service account to be created in new namespace") - Eventually(func() interface{} { - return k8sClient.Get(ctx, flpKey2, &v1.ServiceAccount{}) - }, timeout, interval).Should(Succeed()) - }) - It("Should redeploy console plugin in new namespace", func() { By("Expecting deployment in previous namespace to be deleted") Eventually(func() interface{} { @@ -961,20 +245,6 @@ func flowCollectorControllerSpecs() { }) It("Should be garbage collected", func() { - By("Expecting flowlogs-pipeline daemonset to be garbage collected") - Eventually(func() interface{} { - d := appsv1.DaemonSet{} - _ = k8sClient.Get(ctx, flpKey2, &d) - return &d - }, timeout, interval).Should(BeGarbageCollectedBy(&flowCR)) - - By("Expecting flowlogs-pipeline service account to be garbage collected") - Eventually(func() interface{} { - svcAcc := v1.ServiceAccount{} - _ = k8sClient.Get(ctx, flpKey2, &svcAcc) - return &svcAcc - }, timeout, interval).Should(BeGarbageCollectedBy(&flowCR)) - By("Expecting console plugin deployment to be garbage collected") Eventually(func() interface{} { d := appsv1.Deployment{} @@ -1002,16 +272,6 @@ func flowCollectorControllerSpecs() { _ = k8sClient.Get(ctx, ovsConfigMapKey, &cm) return &cm }, timeout, interval).Should(BeGarbageCollectedBy(&flowCR)) - - By("Expecting flowlogs-pipeline configmap to be garbage collected") - Eventually(func() interface{} { - cm := v1.ConfigMap{} - _ = k8sClient.Get(ctx, types.NamespacedName{ - Name: "flowlogs-pipeline-config", - Namespace: otherNamespace, - }, &cm) - return &cm - }, timeout, interval).Should(BeGarbageCollectedBy(&flowCR)) }) It("Should not get CR", func() { @@ -1030,30 +290,3 @@ func flowCollectorControllerSpecs() { }) }) } - -func GetCR(key types.NamespacedName) *flowslatest.FlowCollector { - cr := flowslatest.FlowCollector{} - Eventually(func() error { - return k8sClient.Get(ctx, key, &cr) - }).Should(Succeed()) - return &cr -} - -func UpdateCR(key types.NamespacedName, updater func(*flowslatest.FlowCollector)) { - Eventually(func() error { - cr := GetCR(key) - updater(cr) - return k8sClient.Update(ctx, cr) - }, timeout, interval).Should(Succeed()) -} - -func checkDigestUpdate(oldDigest *string, annots map[string]string) error { - newDigest := annots[constants.PodConfigurationDigest] - if newDigest == "" { - return fmt.Errorf("%q annotation can't be empty", constants.PodConfigurationDigest) - } 
else if newDigest == *oldDigest { - return fmt.Errorf("expect digest to change, but is still %s", *oldDigest) - } - *oldDigest = newDigest - return nil -} diff --git a/controllers/flp/flp_controller_test.go b/controllers/flp/flp_controller_test.go new file mode 100644 index 000000000..92043ed29 --- /dev/null +++ b/controllers/flp/flp_controller_test.go @@ -0,0 +1,827 @@ +package flp + +import ( + "fmt" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" + appsv1 "k8s.io/api/apps/v1" + ascv2 "k8s.io/api/autoscaling/v2" + v1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + kerr "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + + flowslatest "github.com/netobserv/network-observability-operator/api/v1beta2" + "github.com/netobserv/network-observability-operator/controllers/constants" + . "github.com/netobserv/network-observability-operator/controllers/controllerstest" + "github.com/netobserv/network-observability-operator/pkg/test" +) + +const ( + timeout = test.Timeout + interval = test.Interval +) + +var ( + outputRecordTypes = flowslatest.LogTypeAll + updateCR = func(key types.NamespacedName, updater func(*flowslatest.FlowCollector)) { + test.UpdateCR(ctx, k8sClient, key, updater) + } +) + +// nolint:cyclop +func ControllerSpecs() { + const operatorNamespace = "main-namespace" + const otherNamespace = "other-namespace" + crKey := types.NamespacedName{ + Name: "cluster", + } + flpKey1 := types.NamespacedName{ + Name: constants.FLPName, + Namespace: operatorNamespace, + } + flpKey2 := types.NamespacedName{ + Name: constants.FLPName, + Namespace: otherNamespace, + } + flpKeyKafkaIngester := types.NamespacedName{ + Name: constants.FLPName + FlpConfSuffix[ConfKafkaIngester], + Namespace: operatorNamespace, + } + flpKeyKafkaTransformer := types.NamespacedName{ + Name: constants.FLPName + FlpConfSuffix[ConfKafkaTransformer], + Namespace: operatorNamespace, + } + rbKeyIngest := types.NamespacedName{Name: RoleBindingName(ConfKafkaIngester)} + rbKeyTransform := types.NamespacedName{Name: RoleBindingName(ConfKafkaTransformer)} + rbKeyIngestMono := types.NamespacedName{Name: RoleBindingMonoName(ConfKafkaIngester)} + rbKeyTransformMono := types.NamespacedName{Name: RoleBindingMonoName(ConfKafkaTransformer)} + + // Created objects to cleanup + cleanupList := []client.Object{} + + BeforeEach(func() { + // Add any setup steps that needs to be executed before each test + }) + + AfterEach(func() { + // Add any teardown steps that needs to be executed after each test + }) + + Context("Deploying as DaemonSet", func() { + var digest string + ds := appsv1.DaemonSet{} + It("Should create successfully", func() { + created := &flowslatest.FlowCollector{ + ObjectMeta: metav1.ObjectMeta{Name: crKey.Name}, + Spec: flowslatest.FlowCollectorSpec{ + Namespace: operatorNamespace, + DeploymentModel: flowslatest.DeploymentModelDirect, + Processor: flowslatest.FlowCollectorFLP{ + ImagePullPolicy: "Never", + LogLevel: "error", + Debug: flowslatest.DebugConfig{ + Env: map[string]string{ + "GOGC": "200", + }, + }, + LogTypes: &outputRecordTypes, + ConversationHeartbeatInterval: &metav1.Duration{ + Duration: conntrackHeartbeatInterval, + }, + ConversationEndTimeout: &metav1.Duration{ + Duration: conntrackEndTimeout, + }, + ConversationTerminatingTimeout: &metav1.Duration{ + Duration: 
conntrackTerminatingTimeout, + }, + Metrics: flowslatest.FLPMetrics{ + IncludeList: &[]flowslatest.FLPMetric{"node_ingress_bytes_total", "namespace_ingress_bytes_total", "workload_ingress_bytes_total"}, + }, + }, + }, + } + + // Create + Expect(k8sClient.Create(ctx, created)).Should(Succeed()) + + By("Expecting to create the flowlogs-pipeline DaemonSet") + Eventually(func() error { + if err := k8sClient.Get(ctx, flpKey1, &ds); err != nil { + return err + } + digest = ds.Spec.Template.Annotations[constants.PodConfigurationDigest] + if digest == "" { + return fmt.Errorf("%q annotation can't be empty", constants.PodConfigurationDigest) + } + return nil + }, timeout, interval).Should(Succeed()) + + By("Expecting to create the flowlogs-pipeline ServiceAccount") + Eventually(func() interface{} { + svcAcc := v1.ServiceAccount{} + if err := k8sClient.Get(ctx, flpKey1, &svcAcc); err != nil { + return err + } + return svcAcc + }, timeout, interval).Should(Satisfy(func(svcAcc v1.ServiceAccount) bool { + return svcAcc.Labels != nil && svcAcc.Labels["app"] == constants.FLPName + })) + + By("Expecting to create two flowlogs-pipeline role binding") + rb1 := rbacv1.ClusterRoleBinding{} + Eventually(func() interface{} { + return k8sClient.Get(ctx, rbKeyIngestMono, &rb1) + }, timeout, interval).Should(Succeed()) + Expect(rb1.Subjects).Should(HaveLen(1)) + Expect(rb1.Subjects[0].Name).Should(Equal("flowlogs-pipeline")) + Expect(rb1.RoleRef.Name).Should(Equal("flowlogs-pipeline-ingester")) + + rb2 := rbacv1.ClusterRoleBinding{} + Eventually(func() interface{} { + return k8sClient.Get(ctx, rbKeyTransformMono, &rb2) + }, timeout, interval).Should(Succeed()) + Expect(rb2.Subjects).Should(HaveLen(1)) + Expect(rb2.Subjects[0].Name).Should(Equal("flowlogs-pipeline")) + Expect(rb2.RoleRef.Name).Should(Equal("flowlogs-pipeline-transformer")) + + By("Not expecting transformer role binding") + Eventually(func() interface{} { + return k8sClient.Get(ctx, rbKeyIngest, &rbacv1.ClusterRoleBinding{}) + }, timeout, interval).Should(MatchError(`clusterrolebindings.rbac.authorization.k8s.io "flowlogs-pipeline-ingester-role" not found`)) + + By("Not expecting ingester role binding") + Eventually(func() interface{} { + return k8sClient.Get(ctx, rbKeyTransform, &rbacv1.ClusterRoleBinding{}) + }, timeout, interval).Should(MatchError(`clusterrolebindings.rbac.authorization.k8s.io "flowlogs-pipeline-transformer-role" not found`)) + + By("Expecting flowlogs-pipeline-config configmap to be created") + Eventually(func() interface{} { + cm := v1.ConfigMap{} + return k8sClient.Get(ctx, types.NamespacedName{ + Name: "flowlogs-pipeline-config", + Namespace: operatorNamespace, + }, &cm) + }, timeout, interval).Should(Succeed()) + }) + + It("Should update successfully", func() { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.Processor = flowslatest.FlowCollectorFLP{ + Port: 7891, + ImagePullPolicy: "Never", + LogLevel: "error", + Debug: flowslatest.DebugConfig{ + Env: map[string]string{ + // we'll test that env vars are sorted, to keep idempotency + "GOMAXPROCS": "33", + "GOGC": "400", + }, + }, + LogTypes: &outputRecordTypes, + ConversationHeartbeatInterval: &metav1.Duration{ + Duration: conntrackHeartbeatInterval, + }, + ConversationEndTimeout: &metav1.Duration{ + Duration: conntrackEndTimeout, + }, + ConversationTerminatingTimeout: &metav1.Duration{ + Duration: conntrackTerminatingTimeout, + }, + Metrics: flowslatest.FLPMetrics{ + IncludeList: &[]flowslatest.FLPMetric{"node_ingress_bytes_total"}, + DisableAlerts: 
[]flowslatest.FLPAlert{flowslatest.AlertLokiError}, + }, + } + // Using IPFIX should change proto to UDP + fc.Spec.Agent.Type = flowslatest.AgentIPFIX + fc.Spec.Loki = flowslatest.FlowCollectorLoki{} + }) + + By("CR updated", func() { + Eventually(func() error { + err := k8sClient.Get(ctx, flpKey1, &ds) + if err != nil { + return err + } + return checkDigestUpdate(&digest, ds.Spec.Template.Annotations) + }, timeout, interval).Should(Succeed()) + }) + + By("Creating the required HostPort to access flowlogs-pipeline through the NodeIP", func() { + var cnt *v1.Container + for i := range ds.Spec.Template.Spec.Containers { + if ds.Spec.Template.Spec.Containers[i].Name == constants.FLPName { + cnt = &ds.Spec.Template.Spec.Containers[i] + break + } + } + Expect(cnt).ToNot(BeNil(), "can't find a container named", constants.FLPName) + var cp *v1.ContainerPort + for i := range cnt.Ports { + if cnt.Ports[i].Name == constants.FLPPortName { + cp = &cnt.Ports[i] + break + } + } + Expect(cp).ToNot(BeNil(), "can't find a container port named", constants.FLPPortName) + Expect(*cp).To(Equal(v1.ContainerPort{ + Name: constants.FLPPortName, + HostPort: 7891, + ContainerPort: 7891, + Protocol: "UDP", + })) + Expect(cnt.Env).To(Equal([]v1.EnvVar{ + {Name: "GOGC", Value: "400"}, {Name: "GOMAXPROCS", Value: "33"}, {Name: "GODEBUG", Value: "http2server=0"}, + })) + }) + + By("Allocating the proper toleration to allow its placement in the master nodes", func() { + Expect(ds.Spec.Template.Spec.Tolerations). + To(ContainElement(v1.Toleration{Operator: v1.TolerationOpExists})) + }) + }) + + It("Should redeploy if the spec doesn't change but the external flowlogs-pipeline-config does", func() { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.Loki.MaxRetries = ptr.To(int32(7)) + }) + + By("Expecting that the flowlogsPipeline.PodConfigurationDigest attribute has changed") + Eventually(func() error { + if err := k8sClient.Get(ctx, flpKey1, &ds); err != nil { + return err + } + return checkDigestUpdate(&digest, ds.Spec.Template.Annotations) + }).Should(Succeed()) + }) + }) + + Context("With Kafka", func() { + It("Should update kafka config successfully", func() { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.DeploymentModel = flowslatest.DeploymentModelKafka + fc.Spec.Kafka = flowslatest.FlowCollectorKafka{ + Address: "localhost:9092", + Topic: "FLP", + TLS: flowslatest.ClientTLS{ + CACert: flowslatest.CertificateReference{ + Type: "secret", + Name: "some-secret", + CertFile: "ca.crt", + }, + }, + } + }) + }) + + It("Should deploy kafka ingester and transformer", func() { + By("Expecting ingester daemonset to be created") + Eventually(func() interface{} { + return k8sClient.Get(ctx, flpKeyKafkaIngester, &appsv1.DaemonSet{}) + }, timeout, interval).Should(Succeed()) + + By("Expecting transformer deployment to be created") + Eventually(func() interface{} { + return k8sClient.Get(ctx, flpKeyKafkaTransformer, &appsv1.Deployment{}) + }, timeout, interval).Should(Succeed()) + + By("Not Expecting transformer service to be created") + Eventually(func() interface{} { + return k8sClient.Get(ctx, flpKeyKafkaTransformer, &v1.Service{}) + }, timeout, interval).Should(MatchError(`services "flowlogs-pipeline-transformer" not found`)) + + By("Expecting to create two different flowlogs-pipeline role bindings") + rb1 := rbacv1.ClusterRoleBinding{} + Eventually(func() interface{} { + return k8sClient.Get(ctx, rbKeyIngest, &rb1) + }, timeout, interval).Should(Succeed()) + 
Expect(rb1.Subjects).Should(HaveLen(1)) + Expect(rb1.Subjects[0].Name).Should(Equal("flowlogs-pipeline-ingester")) + Expect(rb1.RoleRef.Name).Should(Equal("flowlogs-pipeline-ingester")) + + rb2 := rbacv1.ClusterRoleBinding{} + Eventually(func() interface{} { + return k8sClient.Get(ctx, rbKeyTransform, &rb2) + }, timeout, interval).Should(Succeed()) + Expect(rb2.Subjects).Should(HaveLen(1)) + Expect(rb2.Subjects[0].Name).Should(Equal("flowlogs-pipeline-transformer")) + Expect(rb2.RoleRef.Name).Should(Equal("flowlogs-pipeline-transformer")) + + By("Not expecting mono-transformer role binding") + Eventually(func() interface{} { + return k8sClient.Get(ctx, rbKeyIngestMono, &rbacv1.ClusterRoleBinding{}) + }, timeout, interval).Should(MatchError(`clusterrolebindings.rbac.authorization.k8s.io "flowlogs-pipeline-ingester-role-mono" not found`)) + + By("Not expecting mono-ingester role binding") + Eventually(func() interface{} { + return k8sClient.Get(ctx, rbKeyTransformMono, &rbacv1.ClusterRoleBinding{}) + }, timeout, interval).Should(MatchError(`clusterrolebindings.rbac.authorization.k8s.io "flowlogs-pipeline-transformer-role-mono" not found`)) + }) + + It("Should delete previous flp deployment", func() { + By("Expecting monolith to be deleted") + Eventually(func() interface{} { + return k8sClient.Get(ctx, flpKey1, &appsv1.DaemonSet{}) + }, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline" not found`)) + }) + }) + + Context("Adding auto-scaling", func() { + hpa := ascv2.HorizontalPodAutoscaler{} + It("Should update with HPA", func() { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.Processor.KafkaConsumerAutoscaler = flowslatest.FlowCollectorHPA{ + Status: flowslatest.HPAStatusEnabled, + MinReplicas: ptr.To(int32(1)), + MaxReplicas: 1, + Metrics: []ascv2.MetricSpec{{ + Type: ascv2.ResourceMetricSourceType, + Resource: &ascv2.ResourceMetricSource{ + Name: v1.ResourceCPU, + Target: ascv2.MetricTarget{ + Type: ascv2.UtilizationMetricType, + AverageUtilization: ptr.To(int32(90)), + }, + }, + }}, + } + }) + }) + + It("Should have HPA installed", func() { + By("Expecting HPA to be created") + Eventually(func() interface{} { + return k8sClient.Get(ctx, flpKeyKafkaTransformer, &hpa) + }, timeout, interval).Should(Succeed()) + Expect(*hpa.Spec.MinReplicas).To(Equal(int32(1))) + Expect(hpa.Spec.MaxReplicas).To(Equal(int32(1))) + Expect(*hpa.Spec.Metrics[0].Resource.Target.AverageUtilization).To(Equal(int32(90))) + }) + + It("Should autoscale when the HPA options change", func() { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.Processor.KafkaConsumerAutoscaler.MinReplicas = ptr.To(int32(2)) + fc.Spec.Processor.KafkaConsumerAutoscaler.MaxReplicas = 2 + }) + + By("Changing the Horizontal Pod Autoscaler instance") + Eventually(func() error { + if err := k8sClient.Get(ctx, flpKeyKafkaTransformer, &hpa); err != nil { + return err + } + if *hpa.Spec.MinReplicas != int32(2) || hpa.Spec.MaxReplicas != int32(2) || + *hpa.Spec.Metrics[0].Resource.Target.AverageUtilization != int32(90) { + return fmt.Errorf("expected {2, 2, 90}: Got %v, %v, %v", + *hpa.Spec.MinReplicas, hpa.Spec.MaxReplicas, + *hpa.Spec.Metrics[0].Resource.Target.AverageUtilization) + } + return nil + }, timeout, interval).Should(Succeed()) + }) + }) + + Context("Back without Kafka", func() { + It("Should remove kafka config successfully", func() { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.DeploymentModel = flowslatest.DeploymentModelDirect + }) + }) + + It("Should 
deploy single flp again", func() { + By("Expecting daemonset to be created") + Eventually(func() interface{} { + return k8sClient.Get(ctx, flpKey1, &appsv1.DaemonSet{}) + }, timeout, interval).Should(Succeed()) + }) + + It("Should delete kafka ingester and transformer", func() { + By("Expecting ingester daemonset to be deleted") + Eventually(func() interface{} { + return k8sClient.Get(ctx, flpKeyKafkaIngester, &appsv1.DaemonSet{}) + }, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline-ingester" not found`)) + + By("Expecting transformer deployment to be deleted") + Eventually(func() interface{} { + return k8sClient.Get(ctx, flpKeyKafkaTransformer, &appsv1.Deployment{}) + }, timeout, interval).Should(MatchError(`deployments.apps "flowlogs-pipeline-transformer" not found`)) + }) + }) + + Context("Checking monitoring resources", func() { + It("Should create desired objects when they're not found (e.g. case of an operator upgrade)", func() { + psvc := v1.Service{} + sm := monitoringv1.ServiceMonitor{} + pr := monitoringv1.PrometheusRule{} + By("Expecting prometheus service to exist") + Eventually(func() interface{} { + return k8sClient.Get(ctx, types.NamespacedName{ + Name: "flowlogs-pipeline-prom", + Namespace: operatorNamespace, + }, &psvc) + }, timeout, interval).Should(Succeed()) + + By("Expecting ServiceMonitor to exist") + Eventually(func() interface{} { + return k8sClient.Get(ctx, types.NamespacedName{ + Name: "flowlogs-pipeline-monitor", + Namespace: operatorNamespace, + }, &sm) + }, timeout, interval).Should(Succeed()) + + By("Expecting PrometheusRule to exist and be updated") + Eventually(func() interface{} { + return k8sClient.Get(ctx, types.NamespacedName{ + Name: "flowlogs-pipeline-alert", + Namespace: operatorNamespace, + }, &pr) + }, timeout, interval).Should(Succeed()) + Expect(pr.Spec.Groups).Should(HaveLen(1)) + Expect(pr.Spec.Groups[0].Rules).Should(HaveLen(1)) + + // Manually delete ServiceMonitor + By("Deleting ServiceMonitor") + Eventually(func() error { + return k8sClient.Delete(ctx, &sm) + }, timeout, interval).Should(Succeed()) + + // Do a dummy change that will trigger reconcile, and make sure SM is created again + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.Processor.LogLevel = "trace" + }) + By("Expecting ServiceMonitor to exist") + Eventually(func() interface{} { + return k8sClient.Get(ctx, types.NamespacedName{ + Name: "flowlogs-pipeline-monitor", + Namespace: operatorNamespace, + }, &sm) + }, timeout, interval).Should(Succeed()) + + // Manually delete Rule + By("Deleting prom rule") + Eventually(func() error { + return k8sClient.Delete(ctx, &pr) + }, timeout, interval).Should(Succeed()) + + // Do a dummy change that will trigger reconcile, and make sure Rule is created again + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.Processor.LogLevel = "debug" + }) + By("Expecting PrometheusRule to exist") + Eventually(func() interface{} { + return k8sClient.Get(ctx, types.NamespacedName{ + Name: "flowlogs-pipeline-alert", + Namespace: operatorNamespace, + }, &pr) + }, timeout, interval).Should(Succeed()) + }) + }) + + Context("Using certificates with loki manual mode", func() { + flpDS := appsv1.DaemonSet{} + It("Should update Loki to use TLS", func() { + // Create CM certificate + cm := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "loki-ca", + Namespace: operatorNamespace, + }, + Data: map[string]string{"ca.crt": "certificate data"}, + } + cleanupList = append(cleanupList, cm) + 
Expect(k8sClient.Create(ctx, cm)).Should(Succeed()) + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.Loki.Mode = flowslatest.LokiModeManual + fc.Spec.Loki.Manual.TLS = flowslatest.ClientTLS{ + Enable: true, + CACert: flowslatest.CertificateReference{ + Type: flowslatest.RefTypeConfigMap, + Name: "loki-ca", + CertFile: "ca.crt", + }, + } + }) + }) + + It("Should have certificate mounted", func() { + By("Expecting certificate mounted") + Eventually(func() interface{} { + if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { + return err + } + return flpDS.Spec.Template.Spec.Volumes + }, timeout, interval).Should(HaveLen(2)) + Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) + Expect(flpDS.Spec.Template.Spec.Volumes[1].Name).To(Equal("loki-certs-ca")) + }) + + It("Should restore no TLS config", func() { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.Loki.Manual.TLS = flowslatest.ClientTLS{ + Enable: false, + } + }) + Eventually(func() interface{} { + if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { + return err + } + return flpDS.Spec.Template.Spec.Volumes + }, timeout, interval).Should(HaveLen(1)) + Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) + }) + }) + + Context("Using certificates with loki distributed mode", func() { + flpDS := appsv1.DaemonSet{} + It("Should update Loki to use TLS", func() { + // Create CM certificate + cm := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "loki-distri-ca", + Namespace: operatorNamespace, + }, + Data: map[string]string{"ca.crt": "certificate data"}, + } + cleanupList = append(cleanupList, cm) + Expect(k8sClient.Create(ctx, cm)).Should(Succeed()) + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.Loki.Mode = flowslatest.LokiModeMicroservices + fc.Spec.Loki.Microservices = flowslatest.LokiMicroservicesParams{ + IngesterURL: "http://loki-ingested:3100/", + QuerierURL: "http://loki-queries:3100/", + TLS: flowslatest.ClientTLS{ + Enable: true, + CACert: flowslatest.CertificateReference{ + Type: flowslatest.RefTypeConfigMap, + Name: "loki-distri-ca", + CertFile: "ca.crt", + }, + }, + } + }) + }) + + It("Should have certificate mounted", func() { + By("Expecting certificate mounted") + Eventually(func() interface{} { + if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { + return err + } + return flpDS.Spec.Template.Spec.Volumes + }, timeout, interval).Should(HaveLen(2)) + Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) + Expect(flpDS.Spec.Template.Spec.Volumes[1].Name).To(Equal("loki-certs-ca")) + }) + + It("Should restore no TLS config", func() { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.Loki.Microservices.TLS = flowslatest.ClientTLS{ + Enable: false, + } + }) + Eventually(func() interface{} { + if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { + return err + } + return flpDS.Spec.Template.Spec.Volumes + }, timeout, interval).Should(HaveLen(1)) + Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) + }) + }) + + Context("Using certificates with loki monolithic mode", func() { + flpDS := appsv1.DaemonSet{} + It("Should update Loki to use TLS", func() { + // Create CM certificate + cm := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "loki-mono-ca", + Namespace: operatorNamespace, + }, + Data: map[string]string{"ca.crt": "certificate data"}, + } + cleanupList = append(cleanupList, cm) + Expect(k8sClient.Create(ctx, 
cm)).Should(Succeed()) + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.Loki.Mode = flowslatest.LokiModeMonolithic + fc.Spec.Loki.Monolithic = flowslatest.LokiMonolithParams{ + URL: "http://loki-mono:3100/", + TLS: flowslatest.ClientTLS{ + Enable: true, + CACert: flowslatest.CertificateReference{ + Type: flowslatest.RefTypeConfigMap, + Name: "loki-mono-ca", + CertFile: "ca.crt", + }, + }, + } + }) + }) + + It("Should have certificate mounted", func() { + By("Expecting certificate mounted") + Eventually(func() interface{} { + if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { + return err + } + return flpDS.Spec.Template.Spec.Volumes + }, timeout, interval).Should(HaveLen(2)) + Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) + Expect(flpDS.Spec.Template.Spec.Volumes[1].Name).To(Equal("loki-certs-ca")) + }) + + It("Should restore no TLS config", func() { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.Loki.Monolithic.TLS = flowslatest.ClientTLS{ + Enable: false, + } + }) + Eventually(func() interface{} { + if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { + return err + } + return flpDS.Spec.Template.Spec.Volumes + }, timeout, interval).Should(HaveLen(1)) + Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) + }) + }) + + Context("Using Certificates With Loki in LokiStack Mode", func() { + flpDS := appsv1.DaemonSet{} + It("Should update Loki config successfully", func() { + // Create CM certificate + cm := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "lokistack-gateway-ca-bundle", + Namespace: operatorNamespace, + }, + Data: map[string]string{"service-ca.crt": "certificate data"}, + } + cleanupList = append(cleanupList, cm) + Expect(k8sClient.Create(ctx, cm)).Should(Succeed()) + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.Loki.Mode = flowslatest.LokiModeLokiStack + fc.Spec.Loki.LokiStack = flowslatest.LokiStackRef{ + Name: "lokistack", + Namespace: operatorNamespace, + } + }) + }) + + It("Should have certificate mounted", func() { + By("Expecting certificate mounted") + Eventually(func() interface{} { + if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { + return err + } + return flpDS.Spec.Template.Spec.Volumes + }, timeout, interval).Should(HaveLen(3)) + Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) + Expect(flpDS.Spec.Template.Spec.Volumes[1].Name).To(Equal("flowlogs-pipeline")) + Expect(flpDS.Spec.Template.Spec.Volumes[2].Name).To(Equal("loki-certs-ca")) + }) + + It("Should deploy Loki roles", func() { + By("Expecting Writer ClusterRole") + Eventually(func() interface{} { + var cr rbacv1.ClusterRole + return k8sClient.Get(ctx, types.NamespacedName{Name: constants.LokiCRWriter}, &cr) + }, timeout, interval).Should(Succeed()) + By("Expecting Reader ClusterRole") + Eventually(func() interface{} { + var cr rbacv1.ClusterRole + return k8sClient.Get(ctx, types.NamespacedName{Name: constants.LokiCRReader}, &cr) + }, timeout, interval).Should(Succeed()) + By("Expecting FLP Writer ClusterRoleBinding") + Eventually(func() interface{} { + var crb rbacv1.ClusterRoleBinding + return k8sClient.Get(ctx, types.NamespacedName{Name: constants.LokiCRBWriter}, &crb) + }, timeout, interval).Should(Succeed()) + }) + + It("Should restore no TLS config in manual mode", func() { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.Loki.Mode = flowslatest.LokiModeManual + fc.Spec.Loki.Manual.TLS = flowslatest.ClientTLS{ + 
Enable: false, + } + }) + Eventually(func() interface{} { + if err := k8sClient.Get(ctx, flpKey1, &flpDS); err != nil { + return err + } + return flpDS.Spec.Template.Spec.Volumes + }, timeout, interval).Should(HaveLen(1)) + Expect(flpDS.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) + }) + }) + + Context("Changing namespace", func() { + It("Should update namespace successfully", func() { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.Processor.Port = 9999 + fc.Spec.Namespace = otherNamespace + }) + }) + + It("Should redeploy FLP in new namespace", func() { + By("Expecting daemonset in previous namespace to be deleted") + Eventually(func() interface{} { + return k8sClient.Get(ctx, flpKey1, &appsv1.DaemonSet{}) + }, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline" not found`)) + + By("Expecting deployment in previous namespace to be deleted") + Eventually(func() interface{} { + return k8sClient.Get(ctx, flpKey1, &appsv1.Deployment{}) + }, timeout, interval).Should(MatchError(`deployments.apps "flowlogs-pipeline" not found`)) + + By("Expecting service account in previous namespace to be deleted") + Eventually(func() interface{} { + return k8sClient.Get(ctx, flpKey1, &v1.ServiceAccount{}) + }, timeout, interval).Should(MatchError(`serviceaccounts "flowlogs-pipeline" not found`)) + + By("Expecting daemonset to be created in new namespace") + Eventually(func() interface{} { + return k8sClient.Get(ctx, flpKey2, &appsv1.DaemonSet{}) + }, timeout, interval).Should(Succeed()) + + By("Expecting service account to be created in new namespace") + Eventually(func() interface{} { + return k8sClient.Get(ctx, flpKey2, &v1.ServiceAccount{}) + }, timeout, interval).Should(Succeed()) + }) + }) + + Context("Cleanup", func() { + // Retrieve CR to get its UID + flowCR := flowslatest.FlowCollector{} + It("Should get CR", func() { + Eventually(func() error { + return k8sClient.Get(ctx, crKey, &flowCR) + }, timeout, interval).Should(Succeed()) + }) + + It("Should delete CR", func() { + Eventually(func() error { + return k8sClient.Delete(ctx, &flowCR) + }, timeout, interval).Should(Succeed()) + }) + + It("Should be garbage collected", func() { + By("Expecting flowlogs-pipeline daemonset to be garbage collected") + Eventually(func() interface{} { + d := appsv1.DaemonSet{} + _ = k8sClient.Get(ctx, flpKey2, &d) + return &d + }, timeout, interval).Should(BeGarbageCollectedBy(&flowCR)) + + By("Expecting flowlogs-pipeline service account to be garbage collected") + Eventually(func() interface{} { + svcAcc := v1.ServiceAccount{} + _ = k8sClient.Get(ctx, flpKey2, &svcAcc) + return &svcAcc + }, timeout, interval).Should(BeGarbageCollectedBy(&flowCR)) + + By("Expecting flowlogs-pipeline configmap to be garbage collected") + Eventually(func() interface{} { + cm := v1.ConfigMap{} + _ = k8sClient.Get(ctx, types.NamespacedName{ + Name: "flowlogs-pipeline-config", + Namespace: otherNamespace, + }, &cm) + return &cm + }, timeout, interval).Should(BeGarbageCollectedBy(&flowCR)) + }) + + It("Should not get CR", func() { + Eventually(func() bool { + err := k8sClient.Get(ctx, crKey, &flowCR) + return kerr.IsNotFound(err) + }, timeout, interval).Should(BeTrue()) + }) + + It("Should cleanup other data", func() { + for _, obj := range cleanupList { + Eventually(func() error { + return k8sClient.Delete(ctx, obj) + }, timeout, interval).Should(Succeed()) + } + }) + }) +} + +func checkDigestUpdate(oldDigest *string, annots map[string]string) error { + newDigest := 
annots[constants.PodConfigurationDigest] + if newDigest == "" { + return fmt.Errorf("%q annotation can't be empty", constants.PodConfigurationDigest) + } else if newDigest == *oldDigest { + return fmt.Errorf("expect digest to change, but is still %s", *oldDigest) + } + *oldDigest = newDigest + return nil +} diff --git a/controllers/flp/flp_test.go b/controllers/flp/flp_test.go index 47a8eb0e5..769e9966d 100644 --- a/controllers/flp/flp_test.go +++ b/controllers/flp/flp_test.go @@ -52,7 +52,6 @@ var pullPolicy = corev1.PullIfNotPresent var minReplicas = int32(1) var maxReplicas = int32(5) var targetCPU = int32(75) -var outputRecordTypes = flowslatest.LogTypeAll const testNamespace = "flp" diff --git a/controllers/flp/suite_test.go b/controllers/flp/suite_test.go new file mode 100644 index 000000000..9d3ba2f9f --- /dev/null +++ b/controllers/flp/suite_test.go @@ -0,0 +1,45 @@ +package flp + +import ( + "context" + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + + "github.com/netobserv/network-observability-operator/pkg/manager" + "github.com/netobserv/network-observability-operator/pkg/test" +) + +// These tests use Ginkgo (BDD-style Go testing framework). Refer to +// http://onsi.github.io/ginkgo/ to learn more about Ginkgo. + +var namespacesToPrepare = []string{"main-namespace", "other-namespace"} + +var ( + ctx context.Context + k8sClient client.Client + testEnv *envtest.Environment + cancel context.CancelFunc +) + +func TestAPIs(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "FLP Controller Suite") +} + +// go test ./... runs always Ginkgo test suites in parallel and they would interfere +// this way we make sure that both test sub-suites are executed serially +var _ = Describe("FlowCollector Controller", Ordered, Serial, func() { + ControllerSpecs() +}) + +var _ = BeforeSuite(func() { + ctx, k8sClient, testEnv, cancel = test.PrepareEnvTest([]manager.Registerer{Start}, namespacesToPrepare, "..") +}) + +var _ = AfterSuite(func() { + test.TeardownEnvTest(testEnv, cancel) +}) diff --git a/controllers/monitoring/monitoring_controller_test.go b/controllers/monitoring/monitoring_controller_test.go new file mode 100644 index 000000000..fb26d2d88 --- /dev/null +++ b/controllers/monitoring/monitoring_controller_test.go @@ -0,0 +1,169 @@ +package monitoring + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + kerr "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + flowslatest "github.com/netobserv/network-observability-operator/api/v1beta2" + . 
"github.com/netobserv/network-observability-operator/controllers/controllerstest" + "github.com/netobserv/network-observability-operator/pkg/test" +) + +const ( + timeout = test.Timeout + interval = test.Interval + conntrackEndTimeout = 10 * time.Second + conntrackTerminatingTimeout = 5 * time.Second + conntrackHeartbeatInterval = 30 * time.Second +) + +var updateCR = func(key types.NamespacedName, updater func(*flowslatest.FlowCollector)) { + test.UpdateCR(ctx, k8sClient, key, updater) +} + +// nolint:cyclop +func ControllerSpecs() { + + const operatorNamespace = "main-namespace" + crKey := types.NamespacedName{ + Name: "cluster", + } + + BeforeEach(func() { + // Add any setup steps that needs to be executed before each test + }) + + AfterEach(func() { + // Add any teardown steps that needs to be executed after each test + }) + + Context("Installing CR", func() { + It("Should create successfully", func() { + created := &flowslatest.FlowCollector{ + ObjectMeta: metav1.ObjectMeta{ + Name: crKey.Name, + }, + Spec: flowslatest.FlowCollectorSpec{ + Namespace: operatorNamespace, + DeploymentModel: flowslatest.DeploymentModelDirect, + }, + } + + // Create + Expect(k8sClient.Create(ctx, created)).Should(Succeed()) + + By("Expecting the monitoring dashboards configmap to be created") + Eventually(func() interface{} { + cm := v1.ConfigMap{} + return k8sClient.Get(ctx, types.NamespacedName{ + Name: "grafana-dashboard-netobserv-flow-metrics", + Namespace: "openshift-config-managed", + }, &cm) + }, timeout, interval).Should(Succeed()) + + By("Expecting the infra health dashboards configmap to be created") + Eventually(func() interface{} { + cm := v1.ConfigMap{} + if err := k8sClient.Get(ctx, types.NamespacedName{ + Name: "grafana-dashboard-netobserv-health", + Namespace: "openshift-config-managed", + }, &cm); err != nil { + return err + } + d, err := test.DashboardFromBytes([]byte(cm.Data["netobserv-health-metrics.json"])) + if err != nil { + return err + } + return d.Titles() + }, timeout, interval).Should(Equal([]string{ + "Flows", + "Flows Overhead", + "Top flow rates per source and destination namespaces", + "Agents", + "Processor", + "Operator", + })) + }) + + It("Should update successfully", func() { + updateCR(crKey, func(fc *flowslatest.FlowCollector) { + fc.Spec.Processor = flowslatest.FlowCollectorFLP{ + Metrics: flowslatest.FLPMetrics{ + IncludeList: &[]flowslatest.FLPMetric{}, + DisableAlerts: []flowslatest.FLPAlert{flowslatest.AlertLokiError}, + }, + } + }) + + By("Expecting the flow dashboards configmap to be deleted") + Eventually(func() interface{} { + return k8sClient.Get(ctx, types.NamespacedName{ + Name: "grafana-dashboard-netobserv-flow-metrics", + Namespace: "openshift-config-managed", + }, &v1.ConfigMap{}) + }, timeout, interval).Should(MatchError(`configmaps "grafana-dashboard-netobserv-flow-metrics" not found`)) + + By("Expecting the health dashboards rows to be filtered") + Eventually(func() interface{} { + cm := v1.ConfigMap{} + if err := k8sClient.Get(ctx, types.NamespacedName{ + Name: "grafana-dashboard-netobserv-health", + Namespace: "openshift-config-managed", + }, &cm); err != nil { + return err + } + d, err := test.DashboardFromBytes([]byte(cm.Data["netobserv-health-metrics.json"])) + if err != nil { + return err + } + return d.Titles() + }, timeout, interval).Should(Equal([]string{ + "Flows", + "Agents", + "Processor", + "Operator", + })) + }) + }) + + Context("Cleanup", func() { + // Retrieve CR to get its UID + flowCR := flowslatest.FlowCollector{} + It("Should get 
CR", func() { + Eventually(func() error { + return k8sClient.Get(ctx, crKey, &flowCR) + }, timeout, interval).Should(Succeed()) + }) + + It("Should delete CR", func() { + Eventually(func() error { + return k8sClient.Delete(ctx, &flowCR) + }, timeout, interval).Should(Succeed()) + }) + + It("Should be garbage collected", func() { + By("Expecting the health dashboards configmap to be garbage collected") + Eventually(func() interface{} { + cm := v1.ConfigMap{} + _ = k8sClient.Get(ctx, types.NamespacedName{ + Name: "grafana-dashboard-netobserv-health", + Namespace: "openshift-config-managed", + }, &cm) + return &cm + }, timeout, interval).Should(BeGarbageCollectedBy(&flowCR)) + }) + + It("Should not get CR", func() { + Eventually(func() bool { + err := k8sClient.Get(ctx, crKey, &flowCR) + return kerr.IsNotFound(err) + }, timeout, interval).Should(BeTrue()) + }) + }) +} diff --git a/controllers/monitoring/suite_test.go b/controllers/monitoring/suite_test.go new file mode 100644 index 000000000..71dc263dd --- /dev/null +++ b/controllers/monitoring/suite_test.go @@ -0,0 +1,45 @@ +package monitoring + +import ( + "context" + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + + "github.com/netobserv/network-observability-operator/pkg/manager" + "github.com/netobserv/network-observability-operator/pkg/test" +) + +// These tests use Ginkgo (BDD-style Go testing framework). Refer to +// http://onsi.github.io/ginkgo/ to learn more about Ginkgo. + +var namespacesToPrepare = []string{"openshift-config-managed", "main-namespace"} + +var ( + ctx context.Context + k8sClient client.Client + testEnv *envtest.Environment + cancel context.CancelFunc +) + +func TestAPIs(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Monitoring Controller Suite") +} + +// go test ./... runs always Ginkgo test suites in parallel and they would interfere +// this way we make sure that both test sub-suites are executed serially +var _ = Describe("FlowCollector Controller", Ordered, Serial, func() { + ControllerSpecs() +}) + +var _ = BeforeSuite(func() { + ctx, k8sClient, testEnv, cancel = test.PrepareEnvTest([]manager.Registerer{Start}, namespacesToPrepare, "..") +}) + +var _ = AfterSuite(func() { + test.TeardownEnvTest(testEnv, cancel) +}) diff --git a/controllers/suite_test.go b/controllers/suite_test.go index 5a758817d..b0032c26c 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -18,38 +18,20 @@ package controllers import ( "context" - "path/filepath" "testing" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" - configv1 "github.com/openshift/api/config/v1" - osv1alpha1 "github.com/openshift/api/console/v1alpha1" - operatorsv1 "github.com/openshift/api/operator/v1" - monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" - ascv2 "k8s.io/api/autoscaling/v2" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/scheme" - apiregv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" - logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/log/zap" - flowsv1beta1 "github.com/netobserv/network-observability-operator/api/v1beta1" - flowsv1beta2 "github.com/netobserv/network-observability-operator/api/v1beta2" - "github.com/netobserv/network-observability-operator/pkg/manager" - //+kubebuilder:scaffold:imports + "github.com/netobserv/network-observability-operator/pkg/test" ) // These tests use Ginkgo (BDD-style Go testing framework). Refer to // http://onsi.github.io/ginkgo/ to learn more about Ginkgo. -const testCnoNamespace = "openshift-network-operator" - -var namespacesToPrepare = []string{testCnoNamespace, "openshift-config-managed", "loki-namespace", "kafka-exporter-namespace", "main-namespace", "main-namespace-privileged"} +var namespacesToPrepare = []string{"openshift-network-operator", "openshift-config-managed", "loki-namespace", "kafka-exporter-namespace", "main-namespace", "main-namespace-privileged"} var ( ctx context.Context @@ -76,114 +58,9 @@ var _ = Describe("FlowCollector Controller", Ordered, Serial, func() { }) var _ = BeforeSuite(func() { - logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) - ctx, cancel = context.WithCancel(context.TODO()) - - By("bootstrapping test environment") - testEnv = &envtest.Environment{ - Scheme: scheme.Scheme, - CRDInstallOptions: envtest.CRDInstallOptions{ - Paths: []string{ - // FIXME: till v1beta2 becomes the new storage version we will point to hack folder - // where v1beta2 is marked as the storage version - // filepath.Join("..", "config", "crd", "bases"), - filepath.Join("..", "hack"), - // We need to install the ConsolePlugin CRD to test setup of our Network Console Plugin - filepath.Join("..", "vendor", "github.com", "openshift", "api", "console", "v1alpha1"), - filepath.Join("..", "vendor", "github.com", "openshift", "api", "config", "v1"), - filepath.Join("..", "vendor", "github.com", "openshift", "api", "operator", "v1"), - filepath.Join("..", "test-assets"), - }, - CleanUpAfterUse: true, - WebhookOptions: envtest.WebhookInstallOptions{ - Paths: []string{ - filepath.Join("..", "config", "webhook"), - }, - }, - }, - ErrorIfCRDPathMissing: true, - } - - cfg, err := testEnv.Start() - Expect(err).NotTo(HaveOccurred()) - Expect(cfg).NotTo(BeNil()) - - err = flowsv1beta1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - - err = flowsv1beta2.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - - err = corev1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - - err = osv1alpha1.Install(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - - err = configv1.Install(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - - err = apiregv1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - - err = ascv2.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - - err = operatorsv1.AddToScheme(scheme.Scheme) - 
Expect(err).NotTo(HaveOccurred()) - - err = monitoringv1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - - //+kubebuilder:scaffold:scheme - - k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) - Expect(err).NotTo(HaveOccurred()) - Expect(k8sClient).NotTo(BeNil()) - - Expect(prepareNamespaces()).NotTo(HaveOccurred()) - - k8sManager, err := manager.NewManager( - context.Background(), - cfg, - &manager.Config{ - EBPFAgentImage: "registry-proxy.engineering.redhat.com/rh-osbs/network-observability-ebpf-agent@sha256:6481481ba23375107233f8d0a4f839436e34e50c2ec550ead0a16c361ae6654e", - FlowlogsPipelineImage: "registry-proxy.engineering.redhat.com/rh-osbs/network-observability-flowlogs-pipeline@sha256:6481481ba23375107233f8d0a4f839436e34e50c2ec550ead0a16c361ae6654e", - ConsolePluginImage: "registry-proxy.engineering.redhat.com/rh-osbs/network-observability-console-plugin@sha256:6481481ba23375107233f8d0a4f839436e34e50c2ec550ead0a16c361ae6654e", - DownstreamDeployment: false, - }, - &ctrl.Options{ - Scheme: scheme.Scheme, - }, - Registerers, - ) - - Expect(err).ToNot(HaveOccurred()) - Expect(k8sManager).NotTo(BeNil()) - - go func() { - defer GinkgoRecover() - err = k8sManager.Start(ctx) - Expect(err).ToNot(HaveOccurred(), "failed to run manager") - }() - + ctx, k8sClient, testEnv, cancel = test.PrepareEnvTest(Registerers, namespacesToPrepare, ".") }) var _ = AfterSuite(func() { - cancel() - By("tearing down the test environment") - err := testEnv.Stop() - Expect(err).NotTo(HaveOccurred()) + test.TeardownEnvTest(testEnv, cancel) }) - -func prepareNamespaces() error { - for _, ns := range namespacesToPrepare { - if err := k8sClient.Create(ctx, &corev1.Namespace{ - TypeMeta: metav1.TypeMeta{Kind: "Namespace", APIVersion: "v1"}, - ObjectMeta: metav1.ObjectMeta{Name: ns}, - }); err != nil { - return err - } - } - return nil -} diff --git a/pkg/test/envtest.go b/pkg/test/envtest.go new file mode 100644 index 000000000..eb8570ffe --- /dev/null +++ b/pkg/test/envtest.go @@ -0,0 +1,161 @@ +package test + +import ( + "context" + "path/filepath" + "time" + + gv2 "github.com/onsi/ginkgo/v2" + //nolint:revive,stylecheck + . 
"github.com/onsi/gomega" + configv1 "github.com/openshift/api/config/v1" + osv1alpha1 "github.com/openshift/api/console/v1alpha1" + operatorsv1 "github.com/openshift/api/operator/v1" + monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" + ascv2 "k8s.io/api/autoscaling/v2" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + apiregv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" + + flowsv1beta1 "github.com/netobserv/network-observability-operator/api/v1beta1" + flowsv1beta2 "github.com/netobserv/network-observability-operator/api/v1beta2" + "github.com/netobserv/network-observability-operator/pkg/manager" +) + +const ( + Timeout = time.Second * 10 + Interval = 1 * time.Second +) + +func PrepareEnvTest(controllers []manager.Registerer, namespaces []string, basePath string) (context.Context, client.Client, *envtest.Environment, context.CancelFunc) { + logf.SetLogger(zap.New(zap.WriteTo(gv2.GinkgoWriter), zap.UseDevMode(true))) + ctx, cancel := context.WithCancel(context.TODO()) + + gv2.By("bootstrapping test environment") + testEnv := &envtest.Environment{ + Scheme: scheme.Scheme, + CRDInstallOptions: envtest.CRDInstallOptions{ + Paths: []string{ + // FIXME: till v1beta2 becomes the new storage version we will point to hack folder + // where v1beta2 is marked as the storage version + // filepath.Join("..", "config", "crd", "bases"), + filepath.Join(basePath, "..", "hack"), + // We need to install the ConsolePlugin CRD to test setup of our Network Console Plugin + filepath.Join(basePath, "..", "vendor", "github.com", "openshift", "api", "console", "v1alpha1"), + filepath.Join(basePath, "..", "vendor", "github.com", "openshift", "api", "config", "v1"), + filepath.Join(basePath, "..", "vendor", "github.com", "openshift", "api", "operator", "v1"), + filepath.Join(basePath, "..", "test-assets"), + }, + CleanUpAfterUse: true, + WebhookOptions: envtest.WebhookInstallOptions{ + Paths: []string{ + filepath.Join(basePath, "..", "config", "webhook"), + }, + }, + }, + ErrorIfCRDPathMissing: true, + } + + cfg, err := testEnv.Start() + Expect(err).NotTo(HaveOccurred()) + Expect(cfg).NotTo(BeNil()) + + err = flowsv1beta1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + + err = flowsv1beta2.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + + err = corev1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + + err = osv1alpha1.Install(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + + err = configv1.Install(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + + err = apiregv1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + + err = ascv2.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + + err = operatorsv1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + + err = monitoringv1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + + k8sClient, err := client.New(cfg, client.Options{Scheme: scheme.Scheme}) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient).NotTo(BeNil()) + + for _, ns := range namespaces { + err := k8sClient.Create(ctx, &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{Kind: 
"Namespace", APIVersion: "v1"}, + ObjectMeta: metav1.ObjectMeta{Name: ns}, + }) + Expect(err).NotTo(HaveOccurred()) + } + + k8sManager, err := manager.NewManager( + context.Background(), + cfg, + &manager.Config{ + EBPFAgentImage: "registry-proxy.engineering.redhat.com/rh-osbs/network-observability-ebpf-agent@sha256:6481481ba23375107233f8d0a4f839436e34e50c2ec550ead0a16c361ae6654e", + FlowlogsPipelineImage: "registry-proxy.engineering.redhat.com/rh-osbs/network-observability-flowlogs-pipeline@sha256:6481481ba23375107233f8d0a4f839436e34e50c2ec550ead0a16c361ae6654e", + ConsolePluginImage: "registry-proxy.engineering.redhat.com/rh-osbs/network-observability-console-plugin@sha256:6481481ba23375107233f8d0a4f839436e34e50c2ec550ead0a16c361ae6654e", + DownstreamDeployment: false, + }, + &ctrl.Options{ + Scheme: scheme.Scheme, + Metrics: server.Options{ + BindAddress: "0", // disable + }, + }, + controllers, + ) + + Expect(err).ToNot(HaveOccurred()) + Expect(k8sManager).NotTo(BeNil()) + + go func() { + defer gv2.GinkgoRecover() + err = k8sManager.Start(ctx) + Expect(err).ToNot(HaveOccurred(), "failed to run manager") + }() + + return ctx, k8sClient, testEnv, cancel +} + +func TeardownEnvTest(testEnv *envtest.Environment, cancel context.CancelFunc) { + cancel() + gv2.By("tearing down the test environment") + err := testEnv.Stop() + Expect(err).NotTo(HaveOccurred()) +} + +func GetCR(ctx context.Context, k8sClient client.Client, key types.NamespacedName) *flowsv1beta2.FlowCollector { + cr := flowsv1beta2.FlowCollector{} + Eventually(func() error { + return k8sClient.Get(ctx, key, &cr) + }).Should(Succeed()) + return &cr +} + +func UpdateCR(ctx context.Context, k8sClient client.Client, key types.NamespacedName, updater func(*flowsv1beta2.FlowCollector)) { + Eventually(func() error { + cr := GetCR(ctx, k8sClient, key) + updater(cr) + return k8sClient.Update(ctx, cr) + }, Timeout, Interval).Should(Succeed()) +} From 21d647d106092f153280cbe357f14fe87ecd7091 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Fri, 8 Dec 2023 10:16:24 +0100 Subject: [PATCH 2/2] try fixing flaky tests --- ...wcollector_controller_certificates_test.go | 160 ++++++++++-------- pkg/test/envtest.go | 16 ++ 2 files changed, 110 insertions(+), 66 deletions(-) diff --git a/controllers/flowcollector_controller_certificates_test.go b/controllers/flowcollector_controller_certificates_test.go index ebe56c9af..25f1bd1ee 100644 --- a/controllers/flowcollector_controller_certificates_test.go +++ b/controllers/flowcollector_controller_certificates_test.go @@ -1,6 +1,8 @@ package controllers import ( + "time" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" @@ -14,11 +16,16 @@ import ( "github.com/netobserv/network-observability-operator/controllers/constants" . 
"github.com/netobserv/network-observability-operator/controllers/controllerstest" "github.com/netobserv/network-observability-operator/controllers/flp" + "github.com/netobserv/network-observability-operator/pkg/test" "github.com/netobserv/network-observability-operator/pkg/watchers" ) -var cmw watchers.ConfigWatchable -var sw watchers.SecretWatchable +var ( + cmw watchers.ConfigWatchable + sw watchers.SecretWatchable + consistentlyTimeout = 2 * time.Second + consistentlyInterval = 500 * time.Millisecond +) // nolint:cyclop func flowCollectorCertificatesSpecs() { @@ -48,7 +55,7 @@ func flowCollectorCertificatesSpecs() { "other": "any", }, } - expectedLokiHash, _ := cmw.GetDigest(&lokiCert, []string{"service-ca.crt"}) + expectedLokiHash, _ := cmw.GetDigest(&lokiCert, []string{"service-ca.crt"}) // C80Sbg== kafkaCert := v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: "kafka-ca", @@ -59,7 +66,7 @@ func flowCollectorCertificatesSpecs() { "other": "any", }, } - expectedKafkaHash, _ := cmw.GetDigest(&kafkaCert, []string{"cert.crt"}) + expectedKafkaHash, _ := cmw.GetDigest(&kafkaCert, []string{"cert.crt"}) // tDuVsw== kafkaUserCert := v1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: "kafka-user", @@ -71,7 +78,7 @@ func flowCollectorCertificatesSpecs() { "other": []byte("any"), }, } - expectedKafkaUserHash, _ := sw.GetDigest(&kafkaUserCert, []string{"user.crt", "user.key"}) + expectedKafkaUserHash, _ := sw.GetDigest(&kafkaUserCert, []string{"user.crt", "user.key"}) // QztU6w== kafka2Cert := v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: "kafka-exporter-ca", @@ -82,7 +89,7 @@ func flowCollectorCertificatesSpecs() { "other": "any", }, } - expectedKafka2Hash, _ := cmw.GetDigest(&kafka2Cert, []string{"cert.crt"}) + expectedKafka2Hash, _ := cmw.GetDigest(&kafka2Cert, []string{"cert.crt"}) // RO7D5Q== kafka2Sasl := v1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: "kafka-exporter-sasl", @@ -93,8 +100,8 @@ func flowCollectorCertificatesSpecs() { "password": []byte("azerty"), }, } - expectedKafkaSaslHash1, _ := sw.GetDigest(&kafka2Sasl, []string{"username"}) - expectedKafkaSaslHash2, _ := sw.GetDigest(&kafka2Sasl, []string{"password"}) + expectedKafkaSaslHash1, _ := sw.GetDigest(&kafka2Sasl, []string{"username"}) // hlEvyw== + expectedKafkaSaslHash2, _ := sw.GetDigest(&kafka2Sasl, []string{"password"}) // FOs6Rg== BeforeEach(func() { // Add any setup steps that needs to be executed before each test @@ -107,9 +114,6 @@ func flowCollectorCertificatesSpecs() { agent := appsv1.DaemonSet{} flp := appsv1.Deployment{} plugin := appsv1.Deployment{} - var lastAgentAnnots map[string]string - var lastFLPAnnots map[string]string - var lastPluginAnnots map[string]string Context("Verify expectations are sane", func() { It("Expected hashes should all be different", func() { @@ -252,63 +256,80 @@ func flowCollectorCertificatesSpecs() { if err := k8sClient.Get(ctx, agentKey, &agent); err != nil { return err } - return agent.Spec.Template.Spec.Volumes - }, timeout, interval).Should(HaveLen(2)) - Expect(agent.Spec.Template.Annotations).To(HaveLen(2)) - Expect(agent.Spec.Template.Annotations["flows.netobserv.io/watched-kafka-ca"]).To(Equal(expectedKafkaHash)) - Expect(agent.Spec.Template.Annotations["flows.netobserv.io/watched-kafka-user"]).To(Equal(expectedKafkaUserHash)) - Expect(agent.Spec.Template.Spec.Volumes[0].Name).To(Equal("kafka-certs-ca")) - Expect(agent.Spec.Template.Spec.Volumes[1].Name).To(Equal("kafka-certs-user")) - lastAgentAnnots = agent.Spec.Template.Annotations + return 
test.VolumeNames(agent.Spec.Template.Spec.Volumes) + }, timeout, interval).Should(ContainElements( + "kafka-certs-ca", + "kafka-certs-user", + )) + Eventually(func() interface{} { + if err := k8sClient.Get(ctx, agentKey, &agent); err != nil { + return err + } + return test.Annotations(agent.Spec.Template.Annotations) + }, timeout, interval).Should(ContainElements( + "flows.netobserv.io/watched-kafka-ca="+expectedKafkaHash, + "flows.netobserv.io/watched-kafka-user="+expectedKafkaUserHash, + )) By("Expecting Loki certificate for Plugin mounted") Eventually(func() interface{} { if err := k8sClient.Get(ctx, pluginKey, &plugin); err != nil { return err } - return plugin.Spec.Template.Spec.Volumes - }, timeout, interval).Should(HaveLen(5)) + return test.VolumeNames(plugin.Spec.Template.Spec.Volumes) + }, timeout, interval).Should(ContainElements( + "console-serving-cert", + "config-volume", + "loki-certs-ca", + "loki-status-certs-ca", + "loki-status-certs-user", + )) Expect(plugin.Spec.Template.Annotations).To(HaveLen(1)) - Expect(plugin.Spec.Template.Spec.Volumes[0].Name).To(Equal("console-serving-cert")) - Expect(plugin.Spec.Template.Spec.Volumes[1].Name).To(Equal("config-volume")) - Expect(plugin.Spec.Template.Spec.Volumes[2].Name).To(Equal("loki-certs-ca")) - Expect(plugin.Spec.Template.Spec.Volumes[3].Name).To(Equal("loki-status-certs-ca")) - Expect(plugin.Spec.Template.Spec.Volumes[4].Name).To(Equal("loki-status-certs-user")) - lastPluginAnnots = plugin.Spec.Template.Annotations By("Expecting Loki and Kafka certificates for FLP mounted") Eventually(func() interface{} { if err := k8sClient.Get(ctx, flpKey, &flp); err != nil { return err } - return flp.Spec.Template.Spec.Volumes - }, timeout, interval).Should(HaveLen(8)) - Expect(flp.Spec.Template.Annotations).To(HaveLen(8)) - Expect(flp.Spec.Template.Annotations["flows.netobserv.io/watched-kafka-ca"]).To(Equal(expectedKafkaHash)) - Expect(flp.Spec.Template.Annotations["flows.netobserv.io/watched-kafka-user"]).To(Equal(expectedKafkaUserHash)) - Expect(flp.Spec.Template.Annotations["flows.netobserv.io/watched-kafka-export-0-ca"]).To(Equal(expectedKafka2Hash)) - Expect(flp.Spec.Template.Annotations["flows.netobserv.io/watched-kafka-export-0-sd1"]).To(Equal(expectedKafkaSaslHash1)) - Expect(flp.Spec.Template.Annotations["flows.netobserv.io/watched-kafka-export-0-sd2"]).To(Equal(expectedKafkaSaslHash2)) - Expect(flp.Spec.Template.Spec.Volumes[0].Name).To(Equal("config-volume")) - Expect(flp.Spec.Template.Spec.Volumes[1].Name).To(Equal("kafka-cert-ca")) - Expect(flp.Spec.Template.Spec.Volumes[2].Name).To(Equal("kafka-cert-user")) - Expect(flp.Spec.Template.Spec.Volumes[3].Name).To(Equal("flowlogs-pipeline")) // token - Expect(flp.Spec.Template.Spec.Volumes[4].Name).To(Equal("loki-certs-ca")) - Expect(flp.Spec.Template.Spec.Volumes[5].Name).To(Equal("kafka-export-0-ca")) - Expect(flp.Spec.Template.Spec.Volumes[6].Name).To(Equal("kafka-export-0-sasl-id")) - Expect(flp.Spec.Template.Spec.Volumes[7].Name).To(Equal("kafka-export-0-sasl-secret")) - lastFLPAnnots = flp.Spec.Template.Annotations + return test.VolumeNames(flp.Spec.Template.Spec.Volumes) + }, timeout, interval).Should(ContainElements( + "config-volume", + "kafka-cert-ca", + "kafka-cert-user", + "flowlogs-pipeline", + "loki-certs-ca", + "kafka-export-0-ca", + "kafka-export-0-sasl-id", + "kafka-export-0-sasl-secret", + )) + Eventually(func() interface{} { + if err := k8sClient.Get(ctx, flpKey, &flp); err != nil { + return err + } + return test.Annotations(flp.Spec.Template.Annotations) + }, 
timeout, interval).Should(ContainElements( + "flows.netobserv.io/watched-kafka-ca="+expectedKafkaHash, + "flows.netobserv.io/watched-kafka-user="+expectedKafkaUserHash, + "flows.netobserv.io/watched-kafka-export-0-ca="+expectedKafka2Hash, + "flows.netobserv.io/watched-kafka-export-0-sd1="+expectedKafkaSaslHash1, + "flows.netobserv.io/watched-kafka-export-0-sd2="+expectedKafkaSaslHash2, + )) }) }) Context("Updating Kafka certificates", func() { + var modifiedKafkaHash, modifiedKafkaUserHash string It("Should update Kafka certificate", func() { By("Updating Kafka CA certificate") kafkaCert.Data["cert.crt"] = "--- KAFKA CA CERT MODIFIED ---" Eventually(func() interface{} { return k8sClient.Update(ctx, &kafkaCert) }, timeout, interval).Should(Succeed()) + modifiedKafkaHash, _ = cmw.GetDigest(&kafkaCert, []string{"cert.crt"}) + Expect(modifiedKafkaHash).ToNot(Equal(expectedKafkaHash)) By("Updating Kafka User certificate") kafkaUserCert.Data["user.crt"] = []byte("--- KAFKA USER CERT MODIFIED ---") Eventually(func() interface{} { return k8sClient.Update(ctx, &kafkaUserCert) }, timeout, interval).Should(Succeed()) + modifiedKafkaUserHash, _ = sw.GetDigest(&kafkaUserCert, []string{"user.crt", "user.key"}) + Expect(modifiedKafkaUserHash).ToNot(Equal(expectedKafkaUserHash)) }) It("Should copy certificates when necessary", func() { @@ -337,24 +358,31 @@ func flowCollectorCertificatesSpecs() { })) }) - It("Should redeploy eBPF Agent", func() { + It("Should change eBPF Agent annotations", func() { Eventually(func() interface{} { if err := k8sClient.Get(ctx, agentKey, &agent); err != nil { return err } - return agent.Spec.Template.Annotations - }, timeout, interval).Should(Not(Equal(lastAgentAnnots))) - lastAgentAnnots = agent.Spec.Template.Annotations + return test.Annotations(agent.Spec.Template.Annotations) + }, timeout, interval).Should(ContainElements( + "flows.netobserv.io/watched-kafka-ca="+modifiedKafkaHash, + "flows.netobserv.io/watched-kafka-user="+modifiedKafkaUserHash, + )) }) - It("Should redeploy FLP", func() { + It("Should change FLP annotations", func() { Eventually(func() interface{} { if err := k8sClient.Get(ctx, flpKey, &flp); err != nil { return err } - return flp.Spec.Template.Annotations - }, timeout, interval).Should(Not(Equal(lastFLPAnnots))) - lastFLPAnnots = flp.Spec.Template.Annotations + return test.Annotations(flp.Spec.Template.Annotations) + }, timeout, interval).Should(ContainElements( + "flows.netobserv.io/watched-kafka-ca="+modifiedKafkaHash, + "flows.netobserv.io/watched-kafka-user="+modifiedKafkaUserHash, + "flows.netobserv.io/watched-kafka-export-0-ca="+expectedKafka2Hash, + "flows.netobserv.io/watched-kafka-export-0-sd1="+expectedKafkaSaslHash1, + "flows.netobserv.io/watched-kafka-export-0-sd2="+expectedKafkaSaslHash2, + )) }) }) @@ -380,25 +408,25 @@ func flowCollectorCertificatesSpecs() { }) // Console plugin is not restarted, as Loki certificate is always read from file - It("Should not redeploy Console plugin", func() { - Eventually(func() interface{} { + It("Should not trigger Console plugin redeploy", func() { + lastPluginAnnots := plugin.Spec.Template.Annotations + Consistently(func() interface{} { if err := k8sClient.Get(ctx, pluginKey, &plugin); err != nil { return err } return plugin.Spec.Template.Annotations - }, timeout, interval).Should(Equal(lastPluginAnnots)) - lastPluginAnnots = plugin.Spec.Template.Annotations + }, consistentlyTimeout, consistentlyInterval).Should(Equal(lastPluginAnnots)) }) // FLP is not restarted, as Loki certificate is always read 
from file - It("Should not redeploy FLP", func() { - Eventually(func() interface{} { + It("Should not trigger FLP redeploy", func() { + lastFLPAnnots := flp.Spec.Template.Annotations + Consistently(func() interface{} { if err := k8sClient.Get(ctx, flpKey, &flp); err != nil { return err } return flp.Spec.Template.Annotations - }, timeout, interval).Should(Equal(lastFLPAnnots)) - lastFLPAnnots = flp.Spec.Template.Annotations + }, consistentlyTimeout, consistentlyInterval).Should(Equal(lastFLPAnnots)) }) }) @@ -415,23 +443,23 @@ func flowCollectorCertificatesSpecs() { }) It("Should not redeploy Agent", func() { - Eventually(func() interface{} { + lastAgentAnnots := agent.Spec.Template.Annotations + Consistently(func() interface{} { if err := k8sClient.Get(ctx, agentKey, &agent); err != nil { return err } return agent.Spec.Template.Annotations - }, timeout, interval).Should(Equal(lastAgentAnnots)) - lastAgentAnnots = agent.Spec.Template.Annotations + }, consistentlyTimeout, consistentlyInterval).Should(Equal(lastAgentAnnots)) }) It("Should not redeploy FLP", func() { - Eventually(func() interface{} { + lastFLPAnnots := flp.Spec.Template.Annotations + Consistently(func() interface{} { if err := k8sClient.Get(ctx, flpKey, &flp); err != nil { return err } return flp.Spec.Template.Annotations - }, timeout, interval).Should(Equal(lastFLPAnnots)) - lastFLPAnnots = flp.Spec.Template.Annotations + }, consistentlyTimeout, consistentlyInterval).Should(Equal(lastFLPAnnots)) }) }) diff --git a/pkg/test/envtest.go b/pkg/test/envtest.go index eb8570ffe..3db14db8b 100644 --- a/pkg/test/envtest.go +++ b/pkg/test/envtest.go @@ -159,3 +159,19 @@ func UpdateCR(ctx context.Context, k8sClient client.Client, key types.Namespaced return k8sClient.Update(ctx, cr) }, Timeout, Interval).Should(Succeed()) } + +func VolumeNames(vols []corev1.Volume) []string { + var volNames []string + for iv := range vols { + volNames = append(volNames, vols[iv].Name) + } + return volNames +} + +func Annotations(annots map[string]string) []string { + var kv []string + for k, v := range annots { + kv = append(kv, k+"="+v) + } + return kv +}
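As an illustration of how the new helpers in pkg/test/envtest.go are meant to be used for order-independent assertions, here is a minimal sketch; it is not part of the patch, and names such as flpKey, timeout, interval, ctx, k8sClient and expectedKafkaHash are assumed from the surrounding controller tests:

// Sketch only: shows the assertion style enabled by test.VolumeNames and
// test.Annotations. The It block is hypothetical; volume names and the
// annotation key below are examples taken from the tests above.
It("Should mount the expected volumes and annotations", func() {
	flp := appsv1.Deployment{}

	// Compare volume names as a set instead of asserting on slice
	// length and fixed indices, which is what made the tests flaky.
	Eventually(func() interface{} {
		if err := k8sClient.Get(ctx, flpKey, &flp); err != nil {
			return err
		}
		return test.VolumeNames(flp.Spec.Template.Spec.Volumes)
	}, timeout, interval).Should(ContainElements("config-volume", "loki-certs-ca"))

	// Annotations are flattened to "key=value" strings, so a subset can
	// be matched without pinning the total annotation count.
	Expect(test.Annotations(flp.Spec.Template.Annotations)).
		To(ContainElement("flows.netobserv.io/watched-kafka-ca=" + expectedKafkaHash))
})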