diff --git a/cmd/ansible-operator/main.go b/cmd/ansible-operator/main.go index 101d180974..9c0678ba7c 100644 --- a/cmd/ansible-operator/main.go +++ b/cmd/ansible-operator/main.go @@ -15,21 +15,284 @@ package main import ( - log "github.com/sirupsen/logrus" + "context" + "errors" + "fmt" + "os" + "runtime" + "strconv" + "strings" + "github.com/spf13/pflag" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/healthz" logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" - "github.com/operator-framework/operator-sdk/pkg/ansible" - aoflags "github.com/operator-framework/operator-sdk/pkg/ansible/flags" + "github.com/operator-framework/operator-sdk/pkg/ansible/controller" + "github.com/operator-framework/operator-sdk/pkg/ansible/flags" + "github.com/operator-framework/operator-sdk/pkg/ansible/proxy" + "github.com/operator-framework/operator-sdk/pkg/ansible/proxy/controllermap" + "github.com/operator-framework/operator-sdk/pkg/ansible/runner" + "github.com/operator-framework/operator-sdk/pkg/ansible/watches" + "github.com/operator-framework/operator-sdk/pkg/k8sutil" + kubemetrics "github.com/operator-framework/operator-sdk/pkg/kube-metrics" + "github.com/operator-framework/operator-sdk/pkg/leader" "github.com/operator-framework/operator-sdk/pkg/log/zap" + "github.com/operator-framework/operator-sdk/pkg/metrics" + sdkVersion "github.com/operator-framework/operator-sdk/version" +) + +var ( + metricsHost = "0.0.0.0" + log = logf.Log.WithName("cmd") + metricsPort int32 = 8383 + operatorMetricsPort int32 = 8686 + healthProbePort int32 = 6789 ) +func printVersion() { + log.Info(fmt.Sprintf("Go Version: %s", runtime.Version())) + log.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH)) + log.Info(fmt.Sprintf("Version of operator-sdk: %v", sdkVersion.Version)) +} + func main() { - flags := aoflags.AddTo(pflag.CommandLine) + f := &flags.Flags{} + f.AddTo(pflag.CommandLine) pflag.Parse() logf.SetLogger(zap.Logger()) - if err := ansible.Run(flags); err != nil { - log.Fatal(err) + printVersion() + + cfg, err := config.GetConfig() + if err != nil { + log.Error(err, "Failed to get config.") + os.Exit(1) + } + + // Set default manager options + // TODO: probably should expose the host & port as an environment variables + options := manager.Options{ + HealthProbeBindAddress: fmt.Sprintf("%s:%d", metricsHost, healthProbePort), + MetricsBindAddress: fmt.Sprintf("%s:%d", metricsHost, metricsPort), + NewClient: func(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { + c, err := client.New(config, options) + if err != nil { + return nil, err + } + return &client.DelegatingClient{ + Reader: cache, + Writer: c, + StatusClient: c, + }, nil + }, + } + + namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar) + log = log.WithValues("Namespace", namespace) + if found { + if namespace == metav1.NamespaceAll { + log.Info("Watching all namespaces.") + options.Namespace = metav1.NamespaceAll + } else { + if strings.Contains(namespace, ",") { + log.Info("Watching multiple namespaces.") + options.NewCache = cache.MultiNamespacedCacheBuilder(strings.Split(namespace, ",")) + } else { + log.Info("Watching single namespace.") + options.Namespace = namespace + } + } + } else { + log.Info(fmt.Sprintf("%v environment variable not set. Watching all namespaces.", + k8sutil.WatchNamespaceEnvVar)) + options.Namespace = metav1.NamespaceAll + } + + // Create a new manager to provide shared dependencies and start components + mgr, err := manager.New(cfg, options) + if err != nil { + log.Error(err, "Failed to create a new manager.") + os.Exit(1) + } + + var gvks []schema.GroupVersionKind + cMap := controllermap.NewControllerMap() + watches, err := watches.Load(f.WatchesFile, f.MaxWorkers, f.AnsibleVerbosity) + if err != nil { + log.Error(err, "Failed to load watches.") + os.Exit(1) + } + for _, w := range watches { + runner, err := runner.New(w) + if err != nil { + log.Error(err, "Failed to create runner") + os.Exit(1) + } + + ctr := controller.Add(mgr, controller.Options{ + GVK: w.GroupVersionKind, + Runner: runner, + ManageStatus: w.ManageStatus, + AnsibleDebugLogs: getAnsibleDebugLog(), + MaxWorkers: w.MaxWorkers, + ReconcilePeriod: w.ReconcilePeriod, + Selector: w.Selector, + }) + if ctr == nil { + log.Error(fmt.Errorf("failed to add controller for GVK %v", w.GroupVersionKind.String()), "") + os.Exit(1) + } + + cMap.Store(w.GroupVersionKind, &controllermap.Contents{Controller: *ctr, + WatchDependentResources: w.WatchDependentResources, + WatchClusterScopedResources: w.WatchClusterScopedResources, + OwnerWatchMap: controllermap.NewWatchMap(), + AnnotationWatchMap: controllermap.NewWatchMap(), + }, w.Blacklist) + gvks = append(gvks, w.GroupVersionKind) + } + + operatorName, err := k8sutil.GetOperatorName() + if err != nil { + log.Error(err, "Failed to get the operator name") + os.Exit(1) + } + + // Become the leader before proceeding + err = leader.Become(context.TODO(), operatorName+"-lock") + if err != nil { + log.Error(err, "Failed to become leader.") + os.Exit(1) + } + + addMetrics(context.TODO(), cfg, gvks) + err = mgr.AddHealthzCheck("ping", healthz.Ping) + if err != nil { + log.Error(err, "Failed to add Healthz check.") + } + + done := make(chan error) + + // start the proxy + err = proxy.Run(done, proxy.Options{ + Address: "localhost", + Port: 8888, + KubeConfig: mgr.GetConfig(), + Cache: mgr.GetCache(), + RESTMapper: mgr.GetRESTMapper(), + ControllerMap: cMap, + OwnerInjection: f.InjectOwnerRef, + WatchedNamespaces: []string{namespace}, + }) + if err != nil { + log.Error(err, "Error starting proxy.") + os.Exit(1) + } + + // start the operator + go func() { + done <- mgr.Start(signals.SetupSignalHandler()) + }() + + // wait for either to finish + err = <-done + if err != nil { + log.Error(err, "Proxy or operator exited with error.") + os.Exit(1) + } + log.Info("Exiting.") +} + +// addMetrics will create the Services and Service Monitors to allow the operator export the metrics by using +// the Prometheus operator +func addMetrics(ctx context.Context, cfg *rest.Config, gvks []schema.GroupVersionKind) { + // Get the namespace the operator is currently deployed in. + operatorNs, err := k8sutil.GetOperatorNamespace() + if err != nil { + if errors.Is(err, k8sutil.ErrRunLocal) { + log.Info("Skipping CR metrics server creation; not running in a cluster.") + return + } + } + + if err := serveCRMetrics(cfg, operatorNs, gvks); err != nil { + log.Info("Could not generate and serve custom resource metrics", "error", err.Error()) + } + + // Add to the below struct any other metrics ports you want to expose. + servicePorts := []v1.ServicePort{ + {Port: metricsPort, Name: metrics.OperatorPortName, Protocol: v1.ProtocolTCP, + TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: metricsPort}}, + {Port: operatorMetricsPort, Name: metrics.CRPortName, Protocol: v1.ProtocolTCP, + TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: operatorMetricsPort}}, + } + + // Create Service object to expose the metrics port(s). + service, err := metrics.CreateMetricsService(ctx, cfg, servicePorts) + if err != nil { + log.Info("Could not create metrics Service", "error", err.Error()) + return + } + + // CreateServiceMonitors will automatically create the prometheus-operator ServiceMonitor resources + // necessary to configure Prometheus to scrape metrics from this operator. + services := []*v1.Service{service} + + // The ServiceMonitor is created in the same namespace where the operator is deployed + _, err = metrics.CreateServiceMonitors(cfg, operatorNs, services) + if err != nil { + log.Info("Could not create ServiceMonitor object", "error", err.Error()) + // If this operator is deployed to a cluster without the prometheus-operator running, it will return + // ErrServiceMonitorNotPresent, which can be used to safely skip ServiceMonitor creation. + if err == metrics.ErrServiceMonitorNotPresent { + log.Info("Install prometheus-operator in your cluster to create ServiceMonitor objects", "error", err.Error()) + } + } +} + +// serveCRMetrics takes GVKs retrieved from watches and generates metrics based on those types. +// It serves those metrics on "http://metricsHost:operatorMetricsPort". +func serveCRMetrics(cfg *rest.Config, operatorNs string, gvks []schema.GroupVersionKind) error { + // The metrics will be generated from the namespaces which are returned here. + // NOTE that passing nil or an empty list of namespaces in GenerateAndServeCRMetrics will result in an error. + ns, err := kubemetrics.GetNamespacesForMetrics(operatorNs) + if err != nil { + return err + } + + // Generate and serve custom resource specific metrics. + err = kubemetrics.GenerateAndServeCRMetrics(cfg, ns, gvks, metricsHost, operatorMetricsPort) + if err != nil { + return err + } + return nil +} + +// getAnsibleDebugLog return the value from the ANSIBLE_DEBUG_LOGS it order to +// print the full Ansible logs +func getAnsibleDebugLog() bool { + const envVar = "ANSIBLE_DEBUG_LOGS" + val := false + if envVal, ok := os.LookupEnv(envVar); ok { + if i, err := strconv.ParseBool(envVal); err != nil { + log.Info("Could not parse environment variable as an boolean; using default value", + "envVar", envVar, "default", val) + } else { + val = i + } + } else if !ok { + log.Info("Environment variable not set; using default value", "envVar", envVar, + envVar, val) } + return val } diff --git a/cmd/helm-operator/main.go b/cmd/helm-operator/main.go index f76ba5d911..e97bb124ff 100644 --- a/cmd/helm-operator/main.go +++ b/cmd/helm-operator/main.go @@ -15,21 +15,217 @@ package main import ( - log "github.com/sirupsen/logrus" + "context" + "errors" + "fmt" + "os" + "runtime" + "strings" + "github.com/spf13/pflag" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/cache" + crclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" - "github.com/operator-framework/operator-sdk/pkg/helm" - hoflags "github.com/operator-framework/operator-sdk/pkg/helm/flags" + "github.com/operator-framework/operator-sdk/pkg/helm/controller" + "github.com/operator-framework/operator-sdk/pkg/helm/flags" + "github.com/operator-framework/operator-sdk/pkg/helm/release" + "github.com/operator-framework/operator-sdk/pkg/helm/watches" + "github.com/operator-framework/operator-sdk/pkg/k8sutil" + kubemetrics "github.com/operator-framework/operator-sdk/pkg/kube-metrics" + "github.com/operator-framework/operator-sdk/pkg/leader" "github.com/operator-framework/operator-sdk/pkg/log/zap" + "github.com/operator-framework/operator-sdk/pkg/metrics" + sdkVersion "github.com/operator-framework/operator-sdk/version" +) + +var ( + metricsHost = "0.0.0.0" + metricsPort int32 = 8383 + operatorMetricsPort int32 = 8686 + + log = logf.Log.WithName("cmd") ) +func printVersion() { + log.Info(fmt.Sprintf("Go Version: %s", runtime.Version())) + log.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH)) + log.Info(fmt.Sprintf("Version of operator-sdk: %v", sdkVersion.Version)) +} + func main() { - flags := hoflags.AddTo(pflag.CommandLine) + f := flags.Flags{} + f.AddTo(pflag.CommandLine) pflag.Parse() logf.SetLogger(zap.Logger()) - if err := helm.Run(flags); err != nil { - log.Fatal(err) + printVersion() + + cfg, err := config.GetConfig() + if err != nil { + log.Error(err, "Failed to get config.") + os.Exit(1) + } + + // Set default manager options + options := manager.Options{ + MetricsBindAddress: fmt.Sprintf("%s:%d", metricsHost, metricsPort), + NewClient: func(cache cache.Cache, config *rest.Config, options crclient.Options) (crclient.Client, error) { + c, err := crclient.New(config, options) + if err != nil { + return nil, err + } + return &crclient.DelegatingClient{ + Reader: cache, + Writer: c, + StatusClient: c, + }, nil + }, + } + + namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar) + log = log.WithValues("Namespace", namespace) + if found { + if namespace == metav1.NamespaceAll { + log.Info("Watching all namespaces.") + options.Namespace = metav1.NamespaceAll + } else { + if strings.Contains(namespace, ",") { + log.Info("Watching multiple namespaces.") + options.NewCache = cache.MultiNamespacedCacheBuilder(strings.Split(namespace, ",")) + } else { + log.Info("Watching single namespace.") + options.Namespace = namespace + } + } + } else { + log.Info(fmt.Sprintf("%v environment variable not set. Watching all namespaces.", + k8sutil.WatchNamespaceEnvVar)) + options.Namespace = metav1.NamespaceAll + } + + mgr, err := manager.New(cfg, options) + if err != nil { + log.Error(err, "Failed to create a new manager.") + os.Exit(1) + } + + ws, err := watches.Load(f.WatchesFile) + if err != nil { + log.Error(err, "Failed to create new manager factories.") + os.Exit(1) + } + var gvks []schema.GroupVersionKind + for _, w := range ws { + // Register the controller with the factory. + err := controller.Add(mgr, controller.WatchOptions{ + Namespace: namespace, + GVK: w.GroupVersionKind, + ManagerFactory: release.NewManagerFactory(mgr, w.ChartDir), + ReconcilePeriod: f.ReconcilePeriod, + WatchDependentResources: *w.WatchDependentResources, + OverrideValues: w.OverrideValues, + MaxWorkers: f.MaxWorkers, + }) + if err != nil { + log.Error(err, "Failed to add manager factory to controller.") + os.Exit(1) + } + gvks = append(gvks, w.GroupVersionKind) + } + + operatorName, err := k8sutil.GetOperatorName() + if err != nil { + log.Error(err, "Failed to get operator name") + os.Exit(1) + } + + ctx := context.TODO() + + // Become the leader before proceeding + err = leader.Become(ctx, operatorName+"-lock") + if err != nil { + log.Error(err, "Failed to become leader.") + os.Exit(1) + } + + addMetrics(context.TODO(), cfg, gvks) + + // Start the Cmd + if err = mgr.Start(signals.SetupSignalHandler()); err != nil { + log.Error(err, "Manager exited non-zero.") + os.Exit(1) + } +} + +// addMetrics will create the Services and Service Monitors to allow the operator export the metrics by using +// the Prometheus operator +func addMetrics(ctx context.Context, cfg *rest.Config, gvks []schema.GroupVersionKind) { + // Get the namespace the operator is currently deployed in. + operatorNs, err := k8sutil.GetOperatorNamespace() + if err != nil { + if errors.Is(err, k8sutil.ErrRunLocal) { + log.Info("Skipping CR metrics server creation; not running in a cluster.") + return + } + } + + if err := serveCRMetrics(cfg, operatorNs, gvks); err != nil { + log.Info("Could not generate and serve custom resource metrics", "error", err.Error()) + } + + // Add to the below struct any other metrics ports you want to expose. + servicePorts := []v1.ServicePort{ + {Port: metricsPort, Name: metrics.OperatorPortName, Protocol: v1.ProtocolTCP, + TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: metricsPort}}, + {Port: operatorMetricsPort, Name: metrics.CRPortName, Protocol: v1.ProtocolTCP, + TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: operatorMetricsPort}}, + } + + // Create Service object to expose the metrics port(s). + service, err := metrics.CreateMetricsService(ctx, cfg, servicePorts) + if err != nil { + log.Info("Could not create metrics Service", "error", err.Error()) + } + + // CreateServiceMonitors will automatically create the prometheus-operator ServiceMonitor resources + // necessary to configure Prometheus to scrape metrics from this operator. + services := []*v1.Service{service} + + // The ServiceMonitor is created in the same namespace where the operator is deployed + _, err = metrics.CreateServiceMonitors(cfg, operatorNs, services) + if err != nil { + log.Info("Could not create ServiceMonitor object", "error", err.Error()) + // If this operator is deployed to a cluster without the prometheus-operator running, it will return + // ErrServiceMonitorNotPresent, which can be used to safely skip ServiceMonitor creation. + if err == metrics.ErrServiceMonitorNotPresent { + log.Info("Install prometheus-operator in your cluster to create ServiceMonitor objects", "error", err.Error()) + } + } +} + +// serveCRMetrics gets the Operator/CustomResource GVKs and generates metrics based on those types. +// It serves those metrics on "http://metricsHost:operatorMetricsPort". +func serveCRMetrics(cfg *rest.Config, operatorNs string, gvks []schema.GroupVersionKind) error { + // The metrics will be generated from the namespaces which are returned here. + // NOTE that passing nil or an empty list of namespaces in GenerateAndServeCRMetrics will result in an error. + ns, err := kubemetrics.GetNamespacesForMetrics(operatorNs) + if err != nil { + return err + } + + // Generate and serve custom resource specific metrics. + err = kubemetrics.GenerateAndServeCRMetrics(cfg, ns, gvks, metricsHost, operatorMetricsPort) + if err != nil { + return err } + return nil } diff --git a/internal/flags/watch/flags.go b/internal/flags/watch/flags.go deleted file mode 100644 index 1d52c58d93..0000000000 --- a/internal/flags/watch/flags.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2019 The Operator-SDK Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package watch - -import ( - "strings" - "time" - - "github.com/spf13/pflag" -) - -// todo(camilamacedo86):the WatchFlags should be renamed for the release 1.0.0 -// WatchFlags provides flag for configuration of a controller's reconcile period and for a -// watches.yaml file, which is used to configure dynamic operators (e.g. Ansible and Helm). -type WatchFlags struct { //nolint:golint - /* - The nolint is regards to: type name will be used as watch.WatchFlags by other packages, and that stutters; - consider calling this Flags (golint) - todo(camilamacedo86): Note that we decided to not introduce breakchanges to add the linters - and it should be done after. - From @joelanford: Even though watch.WatchFlags is an internal type, it is embedded in exported types, - which means that changing it to watch.Flags is a breaking change. - */ - - ReconcilePeriod time.Duration - WatchesFile string -} - -// AddTo - Add the reconcile period and watches file flags to the the flagset -// helpTextPrefix will allow you add a prefix to default help text. Joined by a space. -func (f *WatchFlags) AddTo(flagSet *pflag.FlagSet, helpTextPrefix ...string) { - flagSet.DurationVar(&f.ReconcilePeriod, - "reconcile-period", - time.Minute, - strings.Join(append(helpTextPrefix, "Default reconcile period for controllers"), " "), - ) - flagSet.StringVar(&f.WatchesFile, - "watches-file", - "./watches.yaml", - strings.Join(append(helpTextPrefix, "Path to the watches file to use"), " "), - ) -} diff --git a/pkg/ansible/flags/flag.go b/pkg/ansible/flags/flag.go index 3bb961ba61..92a57c211e 100644 --- a/pkg/ansible/flags/flag.go +++ b/pkg/ansible/flags/flag.go @@ -15,16 +15,18 @@ package flags import ( - "strings" + "runtime" + "time" - "github.com/operator-framework/operator-sdk/internal/flags/watch" - "github.com/operator-framework/operator-sdk/pkg/log/zap" "github.com/spf13/pflag" + + "github.com/operator-framework/operator-sdk/pkg/log/zap" ) -// AnsibleOperatorFlags - Options to be used by an ansible operator -type AnsibleOperatorFlags struct { - watch.WatchFlags +// Flags - Options to be used by an ansible operator +type Flags struct { + ReconcilePeriod time.Duration + WatchesFile string InjectOwnerRef bool MaxWorkers int AnsibleVerbosity int @@ -37,45 +39,41 @@ const AnsibleCollectionsPathEnvVar = "ANSIBLE_COLLECTIONS_PATH" // AddTo - Add the ansible operator flags to the the flagset // helpTextPrefix will allow you add a prefix to default help text. Joined by a space. -func AddTo(flagSet *pflag.FlagSet, helpTextPrefix ...string) *AnsibleOperatorFlags { - aof := &AnsibleOperatorFlags{} - aof.WatchFlags.AddTo(flagSet, helpTextPrefix...) +func (f *Flags) AddTo(flagSet *pflag.FlagSet, helpTextPrefix ...string) { flagSet.AddFlagSet(zap.FlagSet()) - flagSet.BoolVar(&aof.InjectOwnerRef, + flagSet.DurationVar(&f.ReconcilePeriod, + "reconcile-period", + time.Minute, + "Default reconcile period for controllers", + ) + flagSet.StringVar(&f.WatchesFile, + "watches-file", + "./watches.yaml", + "Path to the watches file to use", + ) + flagSet.BoolVar(&f.InjectOwnerRef, "inject-owner-ref", true, - strings.Join(append(helpTextPrefix, - "The ansible operator will inject owner references unless this flag is false"), " "), + "The ansible operator will inject owner references unless this flag is false", ) - flagSet.IntVar(&aof.MaxWorkers, + flagSet.IntVar(&f.MaxWorkers, "max-workers", - 1, - strings.Join(append(helpTextPrefix, - "Maximum number of workers to use. Overridden by environment variable."), - " "), + runtime.NumCPU(), + "Maximum number of workers to use. Overridden by environment variable.", ) - flagSet.IntVar(&aof.AnsibleVerbosity, + flagSet.IntVar(&f.AnsibleVerbosity, "ansible-verbosity", 2, - strings.Join(append(helpTextPrefix, - "Ansible verbosity. Overridden by environment variable."), - " "), + "Ansible verbosity. Overridden by environment variable.", ) - flagSet.StringVar(&aof.AnsibleRolesPath, + flagSet.StringVar(&f.AnsibleRolesPath, "ansible-roles-path", "", - strings.Join(append(helpTextPrefix, - "Ansible Roles Path. If unset, roles are assumed to be in {{CWD}}/roles."), - " "), + "Ansible Roles Path. If unset, roles are assumed to be in {{CWD}}/roles.", ) - flagSet.StringVar(&aof.AnsibleCollectionsPath, + flagSet.StringVar(&f.AnsibleCollectionsPath, "ansible-collections-path", "", - strings.Join(append(helpTextPrefix, - `Path to installed Ansible Collections. If set, collections should be - located in {{value}}/ansible_collections/. If unset, collections are - assumed to be in ~/.ansible/collections or - /usr/share/ansible/collections.`), " "), + "Path to installed Ansible Collections. If set, collections should be located in {{value}}/ansible_collections/. If unset, collections are assumed to be in ~/.ansible/collections or /usr/share/ansible/collections.", ) - return aof } diff --git a/pkg/ansible/run.go b/pkg/ansible/run.go deleted file mode 100644 index 3bbc5ee24e..0000000000 --- a/pkg/ansible/run.go +++ /dev/null @@ -1,293 +0,0 @@ -// Copyright 2019 The Operator-SDK Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ansible - -import ( - "context" - "errors" - "fmt" - "os" - "runtime" - "strconv" - "strings" - - "github.com/operator-framework/operator-sdk/pkg/ansible/controller" - aoflags "github.com/operator-framework/operator-sdk/pkg/ansible/flags" - proxy "github.com/operator-framework/operator-sdk/pkg/ansible/proxy" - "github.com/operator-framework/operator-sdk/pkg/ansible/proxy/controllermap" - "github.com/operator-framework/operator-sdk/pkg/ansible/runner" - "github.com/operator-framework/operator-sdk/pkg/ansible/watches" - "github.com/operator-framework/operator-sdk/pkg/k8sutil" - kubemetrics "github.com/operator-framework/operator-sdk/pkg/kube-metrics" - "github.com/operator-framework/operator-sdk/pkg/leader" - "github.com/operator-framework/operator-sdk/pkg/metrics" - sdkVersion "github.com/operator-framework/operator-sdk/version" - "sigs.k8s.io/controller-runtime/pkg/healthz" - - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/config" - logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/manager/signals" -) - -var ( - metricsHost = "0.0.0.0" - log = logf.Log.WithName("cmd") - metricsPort int32 = 8383 - operatorMetricsPort int32 = 8686 - healthProbePort int32 = 6789 -) - -func printVersion() { - log.Info(fmt.Sprintf("Go Version: %s", runtime.Version())) - log.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH)) - log.Info(fmt.Sprintf("Version of operator-sdk: %v", sdkVersion.Version)) -} - -// Run will start the ansible operator and proxy, blocking until one of them -// returns. -func Run(flags *aoflags.AnsibleOperatorFlags) error { - printVersion() - - cfg, err := config.GetConfig() - if err != nil { - log.Error(err, "Failed to get config.") - return err - } - - // Set default manager options - // TODO: probably should expose the host & port as an environment variables - options := manager.Options{ - HealthProbeBindAddress: fmt.Sprintf("%s:%d", metricsHost, healthProbePort), - MetricsBindAddress: fmt.Sprintf("%s:%d", metricsHost, metricsPort), - NewClient: func(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { - c, err := client.New(config, options) - if err != nil { - return nil, err - } - return &client.DelegatingClient{ - Reader: cache, - Writer: c, - StatusClient: c, - }, nil - }, - } - - namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar) - log = log.WithValues("Namespace", namespace) - if found { - if namespace == metav1.NamespaceAll { - log.Info("Watching all namespaces.") - options.Namespace = metav1.NamespaceAll - } else { - if strings.Contains(namespace, ",") { - log.Info("Watching multiple namespaces.") - options.NewCache = cache.MultiNamespacedCacheBuilder(strings.Split(namespace, ",")) - } else { - log.Info("Watching single namespace.") - options.Namespace = namespace - } - } - } else { - log.Info(fmt.Sprintf("%v environment variable not set. Watching all namespaces.", - k8sutil.WatchNamespaceEnvVar)) - options.Namespace = metav1.NamespaceAll - } - - // Create a new manager to provide shared dependencies and start components - mgr, err := manager.New(cfg, options) - if err != nil { - log.Error(err, "Failed to create a new manager.") - return err - } - - var gvks []schema.GroupVersionKind - cMap := controllermap.NewControllerMap() - watches, err := watches.Load(flags.WatchesFile, flags.MaxWorkers, flags.AnsibleVerbosity) - if err != nil { - log.Error(err, "Failed to load watches.") - return err - } - for _, w := range watches { - runner, err := runner.New(w) - if err != nil { - log.Error(err, "Failed to create runner") - return err - } - - ctr := controller.Add(mgr, controller.Options{ - GVK: w.GroupVersionKind, - Runner: runner, - ManageStatus: w.ManageStatus, - AnsibleDebugLogs: getAnsibleDebugLog(), - MaxWorkers: w.MaxWorkers, - ReconcilePeriod: w.ReconcilePeriod, - Selector: w.Selector, - }) - if ctr == nil { - return fmt.Errorf("failed to add controller for GVK %v", w.GroupVersionKind.String()) - } - - cMap.Store(w.GroupVersionKind, &controllermap.Contents{Controller: *ctr, - WatchDependentResources: w.WatchDependentResources, - WatchClusterScopedResources: w.WatchClusterScopedResources, - OwnerWatchMap: controllermap.NewWatchMap(), - AnnotationWatchMap: controllermap.NewWatchMap(), - }, w.Blacklist) - gvks = append(gvks, w.GroupVersionKind) - } - - operatorName, err := k8sutil.GetOperatorName() - if err != nil { - log.Error(err, "Failed to get the operator name") - return err - } - - // Become the leader before proceeding - err = leader.Become(context.TODO(), operatorName+"-lock") - if err != nil { - log.Error(err, "Failed to become leader.") - return err - } - - addMetrics(context.TODO(), cfg, gvks) - err = mgr.AddHealthzCheck("ping", healthz.Ping) - if err != nil { - log.Error(err, "Failed to add Healthz check.") - } - - done := make(chan error) - - // start the proxy - err = proxy.Run(done, proxy.Options{ - Address: "localhost", - Port: 8888, - KubeConfig: mgr.GetConfig(), - Cache: mgr.GetCache(), - RESTMapper: mgr.GetRESTMapper(), - ControllerMap: cMap, - OwnerInjection: flags.InjectOwnerRef, - WatchedNamespaces: []string{namespace}, - }) - if err != nil { - log.Error(err, "Error starting proxy.") - return err - } - - // start the operator - go func() { - done <- mgr.Start(signals.SetupSignalHandler()) - }() - - // wait for either to finish - err = <-done - if err != nil { - log.Error(err, "Proxy or operator exited with error.") - os.Exit(1) - } - log.Info("Exiting.") - return nil -} - -// addMetrics will create the Services and Service Monitors to allow the operator export the metrics by using -// the Prometheus operator -func addMetrics(ctx context.Context, cfg *rest.Config, gvks []schema.GroupVersionKind) { - // Get the namespace the operator is currently deployed in. - operatorNs, err := k8sutil.GetOperatorNamespace() - if err != nil { - if errors.Is(err, k8sutil.ErrRunLocal) { - log.Info("Skipping CR metrics server creation; not running in a cluster.") - return - } - } - - if err := serveCRMetrics(cfg, operatorNs, gvks); err != nil { - log.Info("Could not generate and serve custom resource metrics", "error", err.Error()) - } - - // Add to the below struct any other metrics ports you want to expose. - servicePorts := []v1.ServicePort{ - {Port: metricsPort, Name: metrics.OperatorPortName, Protocol: v1.ProtocolTCP, - TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: metricsPort}}, - {Port: operatorMetricsPort, Name: metrics.CRPortName, Protocol: v1.ProtocolTCP, - TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: operatorMetricsPort}}, - } - - // Create Service object to expose the metrics port(s). - service, err := metrics.CreateMetricsService(ctx, cfg, servicePorts) - if err != nil { - log.Info("Could not create metrics Service", "error", err.Error()) - return - } - - // CreateServiceMonitors will automatically create the prometheus-operator ServiceMonitor resources - // necessary to configure Prometheus to scrape metrics from this operator. - services := []*v1.Service{service} - - // The ServiceMonitor is created in the same namespace where the operator is deployed - _, err = metrics.CreateServiceMonitors(cfg, operatorNs, services) - if err != nil { - log.Info("Could not create ServiceMonitor object", "error", err.Error()) - // If this operator is deployed to a cluster without the prometheus-operator running, it will return - // ErrServiceMonitorNotPresent, which can be used to safely skip ServiceMonitor creation. - if err == metrics.ErrServiceMonitorNotPresent { - log.Info("Install prometheus-operator in your cluster to create ServiceMonitor objects", "error", err.Error()) - } - } -} - -// serveCRMetrics takes GVKs retrieved from watches and generates metrics based on those types. -// It serves those metrics on "http://metricsHost:operatorMetricsPort". -func serveCRMetrics(cfg *rest.Config, operatorNs string, gvks []schema.GroupVersionKind) error { - // The metrics will be generated from the namespaces which are returned here. - // NOTE that passing nil or an empty list of namespaces in GenerateAndServeCRMetrics will result in an error. - ns, err := kubemetrics.GetNamespacesForMetrics(operatorNs) - if err != nil { - return err - } - - // Generate and serve custom resource specific metrics. - err = kubemetrics.GenerateAndServeCRMetrics(cfg, ns, gvks, metricsHost, operatorMetricsPort) - if err != nil { - return err - } - return nil -} - -// getAnsibleDebugLog return the value from the ANSIBLE_DEBUG_LOGS it order to -// print the full Ansible logs -func getAnsibleDebugLog() bool { - const envVar = "ANSIBLE_DEBUG_LOGS" - val := false - if envVal, ok := os.LookupEnv(envVar); ok { - if i, err := strconv.ParseBool(envVal); err != nil { - log.Info("Could not parse environment variable as an boolean; using default value", - "envVar", envVar, "default", val) - } else { - val = i - } - } else if !ok { - log.Info("Environment variable not set; using default value", "envVar", envVar, - envVar, val) - } - return val -} diff --git a/pkg/helm/flags/flag.go b/pkg/helm/flags/flag.go index 87b137457d..b19fef2825 100644 --- a/pkg/helm/flags/flag.go +++ b/pkg/helm/flags/flag.go @@ -15,31 +15,37 @@ package flags import ( - "strings" + "runtime" + "time" - "github.com/operator-framework/operator-sdk/internal/flags/watch" - "github.com/operator-framework/operator-sdk/pkg/log/zap" "github.com/spf13/pflag" + + "github.com/operator-framework/operator-sdk/pkg/log/zap" ) -// HelmOperatorFlags - Options to be used by a helm operator -type HelmOperatorFlags struct { - watch.WatchFlags - MaxWorkers int +// Flags - Options to be used by a helm operator +type Flags struct { + ReconcilePeriod time.Duration + WatchesFile string + MaxWorkers int } // AddTo - Add the helm operator flags to the the flagset -// helpTextPrefix will allow you add a prefix to default help text. Joined by a space. -func AddTo(flagSet *pflag.FlagSet, helpTextPrefix ...string) *HelmOperatorFlags { - hof := &HelmOperatorFlags{} - hof.WatchFlags.AddTo(flagSet, helpTextPrefix...) +func (f *Flags) AddTo(flagSet *pflag.FlagSet) { flagSet.AddFlagSet(zap.FlagSet()) - flagSet.IntVar(&hof.MaxWorkers, + flagSet.DurationVar(&f.ReconcilePeriod, + "reconcile-period", + time.Minute, + "Default reconcile period for controllers", + ) + flagSet.StringVar(&f.WatchesFile, + "watches-file", + "./watches.yaml", + "Path to the watches file to use", + ) + flagSet.IntVar(&f.MaxWorkers, "max-workers", - 1, - strings.Join(append(helpTextPrefix, - "Maximum number of workers to use."), - " "), + runtime.NumCPU(), + "Maximum number of workers to use", ) - return hof } diff --git a/pkg/helm/run.go b/pkg/helm/run.go deleted file mode 100644 index 949ef6d043..0000000000 --- a/pkg/helm/run.go +++ /dev/null @@ -1,226 +0,0 @@ -// Copyright 2019 The Operator-SDK Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package helm - -import ( - "context" - "errors" - "fmt" - "os" - "runtime" - "strings" - - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/cache" - crclient "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/config" - logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/manager/signals" - - "github.com/operator-framework/operator-sdk/pkg/helm/controller" - hoflags "github.com/operator-framework/operator-sdk/pkg/helm/flags" - "github.com/operator-framework/operator-sdk/pkg/helm/release" - "github.com/operator-framework/operator-sdk/pkg/helm/watches" - "github.com/operator-framework/operator-sdk/pkg/k8sutil" - kubemetrics "github.com/operator-framework/operator-sdk/pkg/kube-metrics" - "github.com/operator-framework/operator-sdk/pkg/leader" - "github.com/operator-framework/operator-sdk/pkg/metrics" - sdkVersion "github.com/operator-framework/operator-sdk/version" -) - -var ( - metricsHost = "0.0.0.0" - metricsPort int32 = 8383 - operatorMetricsPort int32 = 8686 -) - -var log = logf.Log.WithName("cmd") - -func printVersion() { - log.Info(fmt.Sprintf("Go Version: %s", runtime.Version())) - log.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH)) - log.Info(fmt.Sprintf("Version of operator-sdk: %v", sdkVersion.Version)) -} - -// Run runs the helm operator -func Run(flags *hoflags.HelmOperatorFlags) error { - printVersion() - - cfg, err := config.GetConfig() - if err != nil { - log.Error(err, "Failed to get config.") - return err - } - - // Set default manager options - options := manager.Options{ - MetricsBindAddress: fmt.Sprintf("%s:%d", metricsHost, metricsPort), - NewClient: func(cache cache.Cache, config *rest.Config, options crclient.Options) (crclient.Client, error) { - c, err := crclient.New(config, options) - if err != nil { - return nil, err - } - return &crclient.DelegatingClient{ - Reader: cache, - Writer: c, - StatusClient: c, - }, nil - }, - } - - namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar) - log = log.WithValues("Namespace", namespace) - if found { - if namespace == metav1.NamespaceAll { - log.Info("Watching all namespaces.") - options.Namespace = metav1.NamespaceAll - } else { - if strings.Contains(namespace, ",") { - log.Info("Watching multiple namespaces.") - options.NewCache = cache.MultiNamespacedCacheBuilder(strings.Split(namespace, ",")) - } else { - log.Info("Watching single namespace.") - options.Namespace = namespace - } - } - } else { - log.Info(fmt.Sprintf("%v environment variable not set. Watching all namespaces.", - k8sutil.WatchNamespaceEnvVar)) - options.Namespace = metav1.NamespaceAll - } - - mgr, err := manager.New(cfg, options) - if err != nil { - log.Error(err, "Failed to create a new manager.") - return err - } - - ws, err := watches.Load(flags.WatchesFile) - if err != nil { - log.Error(err, "Failed to create new manager factories.") - return err - } - var gvks []schema.GroupVersionKind - for _, w := range ws { - // Register the controller with the factory. - err := controller.Add(mgr, controller.WatchOptions{ - Namespace: namespace, - GVK: w.GroupVersionKind, - ManagerFactory: release.NewManagerFactory(mgr, w.ChartDir), - ReconcilePeriod: flags.ReconcilePeriod, - WatchDependentResources: *w.WatchDependentResources, - OverrideValues: w.OverrideValues, - MaxWorkers: flags.MaxWorkers, - }) - if err != nil { - log.Error(err, "Failed to add manager factory to controller.") - return err - } - gvks = append(gvks, w.GroupVersionKind) - } - - operatorName, err := k8sutil.GetOperatorName() - if err != nil { - log.Error(err, "Failed to get operator name") - return err - } - - ctx := context.TODO() - - // Become the leader before proceeding - err = leader.Become(ctx, operatorName+"-lock") - if err != nil { - log.Error(err, "Failed to become leader.") - return err - } - - addMetrics(context.TODO(), cfg, gvks) - - // Start the Cmd - if err = mgr.Start(signals.SetupSignalHandler()); err != nil { - log.Error(err, "Manager exited non-zero.") - os.Exit(1) - } - return nil -} - -// addMetrics will create the Services and Service Monitors to allow the operator export the metrics by using -// the Prometheus operator -func addMetrics(ctx context.Context, cfg *rest.Config, gvks []schema.GroupVersionKind) { - // Get the namespace the operator is currently deployed in. - operatorNs, err := k8sutil.GetOperatorNamespace() - if err != nil { - if errors.Is(err, k8sutil.ErrRunLocal) { - log.Info("Skipping CR metrics server creation; not running in a cluster.") - return - } - } - - if err := serveCRMetrics(cfg, operatorNs, gvks); err != nil { - log.Info("Could not generate and serve custom resource metrics", "error", err.Error()) - } - - // Add to the below struct any other metrics ports you want to expose. - servicePorts := []v1.ServicePort{ - {Port: metricsPort, Name: metrics.OperatorPortName, Protocol: v1.ProtocolTCP, - TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: metricsPort}}, - {Port: operatorMetricsPort, Name: metrics.CRPortName, Protocol: v1.ProtocolTCP, - TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: operatorMetricsPort}}, - } - - // Create Service object to expose the metrics port(s). - service, err := metrics.CreateMetricsService(ctx, cfg, servicePorts) - if err != nil { - log.Info("Could not create metrics Service", "error", err.Error()) - } - - // CreateServiceMonitors will automatically create the prometheus-operator ServiceMonitor resources - // necessary to configure Prometheus to scrape metrics from this operator. - services := []*v1.Service{service} - - // The ServiceMonitor is created in the same namespace where the operator is deployed - _, err = metrics.CreateServiceMonitors(cfg, operatorNs, services) - if err != nil { - log.Info("Could not create ServiceMonitor object", "error", err.Error()) - // If this operator is deployed to a cluster without the prometheus-operator running, it will return - // ErrServiceMonitorNotPresent, which can be used to safely skip ServiceMonitor creation. - if err == metrics.ErrServiceMonitorNotPresent { - log.Info("Install prometheus-operator in your cluster to create ServiceMonitor objects", "error", err.Error()) - } - } -} - -// serveCRMetrics gets the Operator/CustomResource GVKs and generates metrics based on those types. -// It serves those metrics on "http://metricsHost:operatorMetricsPort". -func serveCRMetrics(cfg *rest.Config, operatorNs string, gvks []schema.GroupVersionKind) error { - // The metrics will be generated from the namespaces which are returned here. - // NOTE that passing nil or an empty list of namespaces in GenerateAndServeCRMetrics will result in an error. - ns, err := kubemetrics.GetNamespacesForMetrics(operatorNs) - if err != nil { - return err - } - - // Generate and serve custom resource specific metrics. - err = kubemetrics.GenerateAndServeCRMetrics(cfg, ns, gvks, metricsHost, operatorMetricsPort) - if err != nil { - return err - } - return nil -}