Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/ansible-operator,cmd/helm-operator: embed code in main.go #3415

Merged
merged 1 commit into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 269 additions & 6 deletions cmd/ansible-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,284 @@
package main

import (
log "github.com/sirupsen/logrus"
"context"
"errors"
"fmt"
"os"
"runtime"
"strconv"
"strings"

"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/healthz"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"

"github.com/operator-framework/operator-sdk/pkg/ansible"
aoflags "github.com/operator-framework/operator-sdk/pkg/ansible/flags"
"github.com/operator-framework/operator-sdk/pkg/ansible/controller"
"github.com/operator-framework/operator-sdk/pkg/ansible/flags"
"github.com/operator-framework/operator-sdk/pkg/ansible/proxy"
"github.com/operator-framework/operator-sdk/pkg/ansible/proxy/controllermap"
"github.com/operator-framework/operator-sdk/pkg/ansible/runner"
"github.com/operator-framework/operator-sdk/pkg/ansible/watches"
"github.com/operator-framework/operator-sdk/pkg/k8sutil"
kubemetrics "github.com/operator-framework/operator-sdk/pkg/kube-metrics"
"github.com/operator-framework/operator-sdk/pkg/leader"
"github.com/operator-framework/operator-sdk/pkg/log/zap"
"github.com/operator-framework/operator-sdk/pkg/metrics"
sdkVersion "github.com/operator-framework/operator-sdk/version"
)

var (
metricsHost = "0.0.0.0"
log = logf.Log.WithName("cmd")
metricsPort int32 = 8383
operatorMetricsPort int32 = 8686
healthProbePort int32 = 6789
)

func printVersion() {
log.Info(fmt.Sprintf("Go Version: %s", runtime.Version()))
log.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH))
log.Info(fmt.Sprintf("Version of operator-sdk: %v", sdkVersion.Version))
}

func main() {
flags := aoflags.AddTo(pflag.CommandLine)
f := &flags.Flags{}
f.AddTo(pflag.CommandLine)
pflag.Parse()
logf.SetLogger(zap.Logger())

if err := ansible.Run(flags); err != nil {
log.Fatal(err)
printVersion()

cfg, err := config.GetConfig()
if err != nil {
log.Error(err, "Failed to get config.")
os.Exit(1)
}

// Set default manager options
// TODO: probably should expose the host & port as an environment variables
options := manager.Options{
HealthProbeBindAddress: fmt.Sprintf("%s:%d", metricsHost, healthProbePort),
MetricsBindAddress: fmt.Sprintf("%s:%d", metricsHost, metricsPort),
NewClient: func(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
c, err := client.New(config, options)
if err != nil {
return nil, err
}
return &client.DelegatingClient{
Reader: cache,
Writer: c,
StatusClient: c,
}, nil
},
}

namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar)
log = log.WithValues("Namespace", namespace)
if found {
if namespace == metav1.NamespaceAll {
log.Info("Watching all namespaces.")
options.Namespace = metav1.NamespaceAll
} else {
if strings.Contains(namespace, ",") {
log.Info("Watching multiple namespaces.")
options.NewCache = cache.MultiNamespacedCacheBuilder(strings.Split(namespace, ","))
} else {
log.Info("Watching single namespace.")
options.Namespace = namespace
}
}
} else {
log.Info(fmt.Sprintf("%v environment variable not set. Watching all namespaces.",
k8sutil.WatchNamespaceEnvVar))
options.Namespace = metav1.NamespaceAll
}

// Create a new manager to provide shared dependencies and start components
mgr, err := manager.New(cfg, options)
if err != nil {
log.Error(err, "Failed to create a new manager.")
os.Exit(1)
}

var gvks []schema.GroupVersionKind
cMap := controllermap.NewControllerMap()
watches, err := watches.Load(f.WatchesFile, f.MaxWorkers, f.AnsibleVerbosity)
if err != nil {
log.Error(err, "Failed to load watches.")
os.Exit(1)
}
for _, w := range watches {
runner, err := runner.New(w)
if err != nil {
log.Error(err, "Failed to create runner")
os.Exit(1)
}

ctr := controller.Add(mgr, controller.Options{
GVK: w.GroupVersionKind,
Runner: runner,
ManageStatus: w.ManageStatus,
AnsibleDebugLogs: getAnsibleDebugLog(),
MaxWorkers: w.MaxWorkers,
ReconcilePeriod: w.ReconcilePeriod,
Selector: w.Selector,
})
if ctr == nil {
log.Error(fmt.Errorf("failed to add controller for GVK %v", w.GroupVersionKind.String()), "")
os.Exit(1)
}

cMap.Store(w.GroupVersionKind, &controllermap.Contents{Controller: *ctr,
WatchDependentResources: w.WatchDependentResources,
WatchClusterScopedResources: w.WatchClusterScopedResources,
OwnerWatchMap: controllermap.NewWatchMap(),
AnnotationWatchMap: controllermap.NewWatchMap(),
}, w.Blacklist)
gvks = append(gvks, w.GroupVersionKind)
}

operatorName, err := k8sutil.GetOperatorName()
if err != nil {
log.Error(err, "Failed to get the operator name")
os.Exit(1)
}

// Become the leader before proceeding
err = leader.Become(context.TODO(), operatorName+"-lock")
if err != nil {
log.Error(err, "Failed to become leader.")
os.Exit(1)
}

addMetrics(context.TODO(), cfg, gvks)
err = mgr.AddHealthzCheck("ping", healthz.Ping)
if err != nil {
log.Error(err, "Failed to add Healthz check.")
}

done := make(chan error)

// start the proxy
err = proxy.Run(done, proxy.Options{
Address: "localhost",
Port: 8888,
KubeConfig: mgr.GetConfig(),
Cache: mgr.GetCache(),
RESTMapper: mgr.GetRESTMapper(),
ControllerMap: cMap,
OwnerInjection: f.InjectOwnerRef,
WatchedNamespaces: []string{namespace},
})
if err != nil {
log.Error(err, "Error starting proxy.")
os.Exit(1)
}

// start the operator
go func() {
done <- mgr.Start(signals.SetupSignalHandler())
}()

// wait for either to finish
err = <-done
if err != nil {
log.Error(err, "Proxy or operator exited with error.")
os.Exit(1)
}
log.Info("Exiting.")
}

// addMetrics will create the Services and Service Monitors to allow the operator export the metrics by using
// the Prometheus operator
func addMetrics(ctx context.Context, cfg *rest.Config, gvks []schema.GroupVersionKind) {
// Get the namespace the operator is currently deployed in.
operatorNs, err := k8sutil.GetOperatorNamespace()
if err != nil {
if errors.Is(err, k8sutil.ErrRunLocal) {
log.Info("Skipping CR metrics server creation; not running in a cluster.")
return
}
}

if err := serveCRMetrics(cfg, operatorNs, gvks); err != nil {
log.Info("Could not generate and serve custom resource metrics", "error", err.Error())
}

// Add to the below struct any other metrics ports you want to expose.
servicePorts := []v1.ServicePort{
{Port: metricsPort, Name: metrics.OperatorPortName, Protocol: v1.ProtocolTCP,
TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: metricsPort}},
{Port: operatorMetricsPort, Name: metrics.CRPortName, Protocol: v1.ProtocolTCP,
TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: operatorMetricsPort}},
}

// Create Service object to expose the metrics port(s).
service, err := metrics.CreateMetricsService(ctx, cfg, servicePorts)
if err != nil {
log.Info("Could not create metrics Service", "error", err.Error())
return
}

// CreateServiceMonitors will automatically create the prometheus-operator ServiceMonitor resources
// necessary to configure Prometheus to scrape metrics from this operator.
services := []*v1.Service{service}

// The ServiceMonitor is created in the same namespace where the operator is deployed
_, err = metrics.CreateServiceMonitors(cfg, operatorNs, services)
if err != nil {
log.Info("Could not create ServiceMonitor object", "error", err.Error())
// If this operator is deployed to a cluster without the prometheus-operator running, it will return
// ErrServiceMonitorNotPresent, which can be used to safely skip ServiceMonitor creation.
if err == metrics.ErrServiceMonitorNotPresent {
log.Info("Install prometheus-operator in your cluster to create ServiceMonitor objects", "error", err.Error())
}
}
}

// serveCRMetrics takes GVKs retrieved from watches and generates metrics based on those types.
// It serves those metrics on "http://metricsHost:operatorMetricsPort".
func serveCRMetrics(cfg *rest.Config, operatorNs string, gvks []schema.GroupVersionKind) error {
// The metrics will be generated from the namespaces which are returned here.
// NOTE that passing nil or an empty list of namespaces in GenerateAndServeCRMetrics will result in an error.
ns, err := kubemetrics.GetNamespacesForMetrics(operatorNs)
if err != nil {
return err
}

// Generate and serve custom resource specific metrics.
err = kubemetrics.GenerateAndServeCRMetrics(cfg, ns, gvks, metricsHost, operatorMetricsPort)
if err != nil {
return err
}
return nil
}

// getAnsibleDebugLog return the value from the ANSIBLE_DEBUG_LOGS it order to
// print the full Ansible logs
func getAnsibleDebugLog() bool {
const envVar = "ANSIBLE_DEBUG_LOGS"
val := false
if envVal, ok := os.LookupEnv(envVar); ok {
if i, err := strconv.ParseBool(envVal); err != nil {
log.Info("Could not parse environment variable as an boolean; using default value",
"envVar", envVar, "default", val)
} else {
val = i
}
} else if !ok {
log.Info("Environment variable not set; using default value", "envVar", envVar,
envVar, val)
}
return val
}
Loading