From b42479383b0a2bb0de13616ab9895abfb0776b18 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Thu, 3 Aug 2023 08:59:53 +0200 Subject: [PATCH 1/6] reconciler/managed: add crossplane_resource_drift_seconds metric Signed-off-by: Dr. Stefan Schimanski --- go.mod | 5 +- go.sum | 2 + pkg/reconciler/managed/metrics.go | 100 +++++++++++++++++++++++++++ pkg/reconciler/managed/reconciler.go | 18 +++++ pkg/resource/fake/mocks.go | 5 ++ 5 files changed, 128 insertions(+), 2 deletions(-) create mode 100644 pkg/reconciler/managed/metrics.go diff --git a/go.mod b/go.mod index 8bab58e35..4d563cbf1 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/evanphx/json-patch v5.6.0+incompatible github.com/go-logr/logr v1.4.1 github.com/google/go-cmp v0.6.0 + github.com/prometheus/client_golang v1.18.0 github.com/spf13/afero v1.11.0 golang.org/x/time v0.5.0 google.golang.org/grpc v1.61.0 @@ -19,6 +20,7 @@ require ( k8s.io/apiextensions-apiserver v0.29.1 k8s.io/apimachinery v0.29.1 k8s.io/client-go v0.29.1 + k8s.io/component-base v0.29.1 k8s.io/klog/v2 v2.110.1 sigs.k8s.io/controller-runtime v0.17.0 sigs.k8s.io/controller-tools v0.14.0 @@ -31,6 +33,7 @@ require ( github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/blang/semver/v4 v4.0.0 // indirect github.com/bufbuild/protocompile v0.6.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/log v0.1.0 // indirect @@ -87,7 +90,6 @@ require ( github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pkg/profile v1.7.0 // indirect - github.com/prometheus/client_golang v1.18.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect @@ -123,7 +125,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/component-base v0.29.1 // indirect k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect diff --git a/go.sum b/go.sum index 9a5cd7427..46cd0c925 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migc github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= +github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= github.com/bufbuild/buf v1.27.2 h1:uX2kvZfPfRoOsrxUW4LwpykSyH+wI5dUnIG0QWHDCCU= github.com/bufbuild/buf v1.27.2/go.mod h1:7RImDhFDqhEsdK5wbuMhoVSlnrMggGGcd3s9WozvHtM= github.com/bufbuild/protocompile v0.6.0 h1:Uu7WiSQ6Yj9DbkdnOe7U4mNKp58y9WDMKDn28/ZlunY= diff --git a/pkg/reconciler/managed/metrics.go b/pkg/reconciler/managed/metrics.go new file mode 100644 index 000000000..a9b07f85c --- /dev/null +++ b/pkg/reconciler/managed/metrics.go @@ -0,0 +1,100 @@ +/* +Copyright 2023 The Crossplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package managed + +import ( + "context" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" + kmetrics "k8s.io/component-base/metrics" + "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/metrics" + + "github.com/crossplane/crossplane-runtime/pkg/errors" + "github.com/crossplane/crossplane-runtime/pkg/resource" +) + +func init() { + metrics.Registry.MustRegister(drift) +} + +var subSystem = "crossplane" + +var ( + drift = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Subsystem: subSystem, + Name: "resource_drift_seconds", + Help: "ALPHA: How long since the previous successful reconcile when a resource was found to be out of sync; excludes restart of the provider", + Buckets: kmetrics.ExponentialBuckets(10e-9, 10, 10), + }, []string{"group", "kind"}) +) + +// driftRecorder records the time since the last observation of a resource +// and records the time since on update as a metric. This represents an upper +// bound for the duration the drift existed. +type driftRecorder struct { + lastObservation sync.Map + gvk schema.GroupVersionKind + + cluster cluster.Cluster +} + +var _ manager.Runnable = &driftRecorder{} + +func (r *driftRecorder) Start(ctx context.Context) error { + inf, err := r.cluster.GetCache().GetInformerForKind(ctx, r.gvk) + if err != nil { + return errors.Wrapf(err, "cannot get informer for drift recorder for resource %s", r.gvk) + } + + registered, err := inf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { + if final, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = final.Obj + } + managed := obj.(resource.Managed) + r.lastObservation.Delete(managed.GetName()) + }, + }) + if err != nil { + return errors.Wrap(err, "cannot add delete event handler to informer for drift recorder") + } + defer inf.RemoveEventHandler(registered) //nolint:errcheck // this happens on destruction. We cannot do anything anyway. + + <-ctx.Done() + + return nil +} + +func (r *driftRecorder) recordUnchanged(name string) { + r.lastObservation.Store(name, time.Now()) +} + +func (r *driftRecorder) recordUpdate(name string) { + last, ok := r.lastObservation.Load(name) + if !ok { + return + } + drift.WithLabelValues(r.gvk.Group, r.gvk.Kind).Observe(time.Since(last.(time.Time)).Seconds()) + + r.lastObservation.Store(name, time.Now()) +} diff --git a/pkg/reconciler/managed/reconciler.go b/pkg/reconciler/managed/reconciler.go index e0325eae7..74c1a5873 100644 --- a/pkg/reconciler/managed/reconciler.go +++ b/pkg/reconciler/managed/reconciler.go @@ -476,6 +476,8 @@ type Reconciler struct { features feature.Flags + driftRecorder driftRecorder + // The below structs embed the set of interfaces used to implement the // managed resource reconciler. We do this primarily for readability, so // that the reconciler logic reads r.external.Connect(), @@ -697,6 +699,7 @@ func NewReconciler(m manager.Manager, of resource.ManagedKind, o ...ReconcilerOp creationGracePeriod: defaultGracePeriod, timeout: reconcileTimeout, managed: defaultMRManaged(m), + driftRecorder: driftRecorder{cluster: m}, external: defaultMRExternal(), supportedManagementPolicies: defaultSupportedManagementPolicies(), log: logging.NewNopLogger(), @@ -707,6 +710,11 @@ func NewReconciler(m manager.Manager, of resource.ManagedKind, o ...ReconcilerOp ro(r) } + if err := m.Add(&r.driftRecorder); err != nil { + r.log.Info("unable to register drift recorder with controller manager", "error", err) + // no way to recover from this + } + return r } @@ -1150,6 +1158,13 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu reconcileAfter := r.pollIntervalHook(managed, r.pollInterval) log.Debug("External resource is up to date", "requeue-after", time.Now().Add(reconcileAfter)) managed.SetConditions(xpv1.ReconcileSuccess()) + + // record that we intentionally did not update the managed resource + // because no drift was detected. We call this so late in the reconcile + // because all the cases above could contribute (for different reasons) + // that the external object would not have been updated. + r.driftRecorder.recordUnchanged(managed.GetName()) + return reconcile.Result{RequeueAfter: reconcileAfter}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1178,6 +1193,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } + // record the drift after the successful update. + r.driftRecorder.recordUpdate(managed.GetName()) + if _, err := r.managed.PublishConnection(ctx, managed, update.ConnectionDetails); err != nil { // If this is the first time we encounter this issue we'll be requeued // implicitly when we update our status with the new error condition. If diff --git a/pkg/resource/fake/mocks.go b/pkg/resource/fake/mocks.go index 38d9310c5..aae5ac28f 100644 --- a/pkg/resource/fake/mocks.go +++ b/pkg/resource/fake/mocks.go @@ -478,6 +478,11 @@ func (m *Manager) GetRESTMapper() meta.RESTMapper { return m.RESTMapper } // GetLogger returns the logger. func (m *Manager) GetLogger() logr.Logger { return m.Logger } +// Add adds a runnable to the manager. +func (m *Manager) Add(_ manager.Runnable) error { + return nil // do nothing +} + // GV returns a mock schema.GroupVersion. var GV = schema.GroupVersion{Group: "g", Version: "v"} //nolint:gochecknoglobals // We treat this as a constant. From 64074c3e5da03c47aa594d2baad046f2e7a7da07 Mon Sep 17 00:00:00 2001 From: ezgidemirel Date: Tue, 2 Apr 2024 17:19:35 +0300 Subject: [PATCH 2/6] Add High Level MR metrics Signed-off-by: ezgidemirel --- pkg/reconciler/managed/metrics.go | 144 +++++++++++++++++++++++---- pkg/reconciler/managed/reconciler.go | 57 +++++++++-- pkg/resource/fake/mocks.go | 5 - 3 files changed, 172 insertions(+), 34 deletions(-) diff --git a/pkg/reconciler/managed/metrics.go b/pkg/reconciler/managed/metrics.go index a9b07f85c..d9b913309 100644 --- a/pkg/reconciler/managed/metrics.go +++ b/pkg/reconciler/managed/metrics.go @@ -22,48 +22,84 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/tools/cache" kmetrics "k8s.io/component-base/metrics" "sigs.k8s.io/controller-runtime/pkg/cluster" - "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics" + xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" "github.com/crossplane/crossplane-runtime/pkg/errors" "github.com/crossplane/crossplane-runtime/pkg/resource" ) -func init() { - metrics.Registry.MustRegister(drift) +func init() { //nolint:gochecknoinits // metrics should be registered once + metrics.Registry.MustRegister(drift, mr, mrReady, mrSynced, mrDetected, mrReadyDuration, mrDeletionDuration) } -var subSystem = "crossplane" +const subSystem = "crossplane" var ( - drift = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + drift = prometheus.NewHistogramVec(prometheus.HistogramOpts{ //nolint:gochecknoglobals // metrics should be registered once in init Subsystem: subSystem, Name: "resource_drift_seconds", Help: "ALPHA: How long since the previous successful reconcile when a resource was found to be out of sync; excludes restart of the provider", Buckets: kmetrics.ExponentialBuckets(10e-9, 10, 10), }, []string{"group", "kind"}) + + mr = prometheus.NewGaugeVec(prometheus.GaugeOpts{ //nolint:gochecknoglobals // metrics should be registered once in init + Subsystem: subSystem, + Name: "managed_resource_created", + Help: "The number of managed resources created", + }, []string{"gvk", "name", "claim", "composite"}) + + mrReady = prometheus.NewGaugeVec(prometheus.GaugeOpts{ //nolint:gochecknoglobals // metrics should be registered once in init + Subsystem: subSystem, + Name: "managed_resource_ready", + Help: "The number of managed resources in Ready=True state", + }, []string{"gvk", "name", "claim", "composite"}) + + mrReadyDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ //nolint:gochecknoglobals // metrics should be registered once in init + Subsystem: subSystem, + Name: "managed_resource_ready_duration_seconds", + Help: "The time it took for a managed resource to become ready first time after creation", + Buckets: []float64{1, 5, 10, 15, 30, 60, 120, 300, 600, 1800, 3600}, + }, []string{"gvk", "name", "claim", "composite"}) + + mrDetected = prometheus.NewHistogramVec(prometheus.HistogramOpts{ //nolint:gochecknoglobals // metrics should be registered once in init + Subsystem: subSystem, + Name: "managed_resource_detected_time_seconds", + Help: "The time it took for a managed resource to be detected by the controller", + Buckets: kmetrics.ExponentialBuckets(10e-9, 10, 10), + }, []string{"gvk", "name", "claim", "composite"}) + + mrSynced = prometheus.NewGaugeVec(prometheus.GaugeOpts{ //nolint:gochecknoglobals // metrics should be registered once in init + Subsystem: subSystem, + Name: "managed_resource_synced", + Help: "The number of managed resources in Synced=True state", + }, []string{"gvk", "name", "claim", "composite"}) + + mrDeletionDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ //nolint:gochecknoglobals // metrics should be registered once in init + Subsystem: subSystem, + Name: "managed_resource_deletion_seconds", + Help: "The time it took for a managed resource to be deleted", + Buckets: []float64{1, 5, 10, 15, 30, 60, 120, 300, 600, 1800, 3600}, + }, []string{"gvk", "name", "claim", "composite"}) ) -// driftRecorder records the time since the last observation of a resource -// and records the time since on update as a metric. This represents an upper -// bound for the duration the drift existed. -type driftRecorder struct { - lastObservation sync.Map - gvk schema.GroupVersionKind +type metricRecorder struct { + firstObservation sync.Map + lastObservation sync.Map cluster cluster.Cluster + gvk schema.GroupVersionKind } -var _ manager.Runnable = &driftRecorder{} - -func (r *driftRecorder) Start(ctx context.Context) error { +func (r *metricRecorder) Start(ctx context.Context) error { inf, err := r.cluster.GetCache().GetInformerForKind(ctx, r.gvk) if err != nil { - return errors.Wrapf(err, "cannot get informer for drift recorder for resource %s", r.gvk) + return errors.Wrapf(err, "cannot get informer for metric recorder for resource %s", r.gvk) } registered, err := inf.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -71,12 +107,16 @@ func (r *driftRecorder) Start(ctx context.Context) error { if final, ok := obj.(cache.DeletedFinalStateUnknown); ok { obj = final.Obj } - managed := obj.(resource.Managed) + managed, ok := obj.(resource.Managed) + if !ok { + return + } + r.firstObservation.Delete(managed.GetName()) r.lastObservation.Delete(managed.GetName()) }, }) if err != nil { - return errors.Wrap(err, "cannot add delete event handler to informer for drift recorder") + return errors.Wrap(err, "cannot add delete event handler to informer for metric recorder") } defer inf.RemoveEventHandler(registered) //nolint:errcheck // this happens on destruction. We cannot do anything anyway. @@ -85,16 +125,80 @@ func (r *driftRecorder) Start(ctx context.Context) error { return nil } -func (r *driftRecorder) recordUnchanged(name string) { +func (r *metricRecorder) recordUnchanged(name string) { r.lastObservation.Store(name, time.Now()) } -func (r *driftRecorder) recordUpdate(name string) { +func (r *metricRecorder) recordUpdate(name string) { last, ok := r.lastObservation.Load(name) if !ok { return } - drift.WithLabelValues(r.gvk.Group, r.gvk.Kind).Observe(time.Since(last.(time.Time)).Seconds()) + lt, ok := last.(time.Time) + if !ok { + return + } + + drift.WithLabelValues(r.gvk.Group, r.gvk.Kind).Observe(time.Since(lt).Seconds()) r.lastObservation.Store(name, time.Now()) } + +func (r *metricRecorder) recordDetected(managed resource.Managed) { + if managed.GetCondition(xpv1.TypeSynced).Status == corev1.ConditionUnknown { + mr.With(getMRMetricLabels(managed)).Set(1) + mrDetected.With(getMRMetricLabels(managed)).Observe(time.Since(managed.GetCreationTimestamp().Time).Seconds()) + r.firstObservation.Store(managed.GetName(), time.Now()) // this is the first time we reconciled on this resource + } +} + +func (r *metricRecorder) recordSyncedState(managed resource.Managed, v float64) { + mrSynced.With(getMRMetricLabels(managed)).Set(v) +} + +func (r *metricRecorder) recordNotReady(managed resource.Managed) { + mrReady.With(getMRMetricLabels(managed)).Set(0) +} + +func (r *metricRecorder) recordDeleted(managed resource.Managed) { + labels := getMRMetricLabels(managed) + + if managed.GetDeletionTimestamp() != nil { + mrDeletionDuration.With(getMRMetricLabels(managed)).Observe(time.Since(managed.GetDeletionTimestamp().Time).Seconds()) + } + mr.With(labels).Set(0) + mrReady.With(labels).Set(0) + mrSynced.With(labels).Set(0) +} + +func (r *metricRecorder) recordUpToDate(managed resource.Managed) { + mrSynced.With(getMRMetricLabels(managed)).Set(1) + // Note that providers may set the ready condition to "True", so we need + // to check the value here to send the ready metric + if managed.GetCondition(xpv1.TypeReady).Status == corev1.ConditionTrue { + mrReady.With(getMRMetricLabels(managed)).Set(1) + name := managed.GetName() + _, ok := r.firstObservation.Load(name) // This map is used to identify the first time to readiness + if !ok { + return + } + + mrReadyDuration.With(getMRMetricLabels(managed)).Observe(time.Since(managed.GetCreationTimestamp().Time).Seconds()) + r.firstObservation.Delete(managed.GetName()) + } +} + +func getMRMetricLabels(managed resource.Managed) prometheus.Labels { + l := prometheus.Labels{ + "gvk": managed.GetObjectKind().GroupVersionKind().String(), + "name": managed.GetName(), + "claim": "", + "composite": managed.GetLabels()["crossplane.io/composite"], + } + + if managed.GetLabels()["crossplane.io/claim-namespace"] != "" && managed.GetLabels()["crossplane.io/claim-name"] != "" { + l["claim"] = managed.GetLabels()["crossplane.io/claim-namespace"] + "/" + managed.GetLabels()["crossplane.io/claim-name"] + } + + return l +} diff --git a/pkg/reconciler/managed/reconciler.go b/pkg/reconciler/managed/reconciler.go index 74c1a5873..ecf699686 100644 --- a/pkg/reconciler/managed/reconciler.go +++ b/pkg/reconciler/managed/reconciler.go @@ -476,7 +476,7 @@ type Reconciler struct { features feature.Flags - driftRecorder driftRecorder + metricRecorder metricRecorder // The below structs embed the set of interfaces used to implement the // managed resource reconciler. We do this primarily for readability, so @@ -699,7 +699,7 @@ func NewReconciler(m manager.Manager, of resource.ManagedKind, o ...ReconcilerOp creationGracePeriod: defaultGracePeriod, timeout: reconcileTimeout, managed: defaultMRManaged(m), - driftRecorder: driftRecorder{cluster: m}, + metricRecorder: metricRecorder{cluster: m, gvk: schema.GroupVersionKind(of)}, external: defaultMRExternal(), supportedManagementPolicies: defaultSupportedManagementPolicies(), log: logging.NewNopLogger(), @@ -710,11 +710,6 @@ func NewReconciler(m manager.Manager, of resource.ManagedKind, o ...ReconcilerOp ro(r) } - if err := m.Add(&r.driftRecorder); err != nil { - r.log.Info("unable to register drift recorder with controller manager", "error", err) - // no way to recover from this - } - return r } @@ -739,9 +734,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu // There's no need to requeue if we no longer exist. Otherwise we'll be // requeued implicitly because we return an error. log.Debug("Cannot get managed resource", "error", err) + r.metricRecorder.recordDeleted(managed) return reconcile.Result{}, errors.Wrap(resource.IgnoreNotFound(err), errGetManaged) } + r.metricRecorder.recordDetected(managed) + record := r.record.WithAnnotations("external-name", meta.GetExternalName(managed)) log = log.WithValues( "uid", managed.GetUID(), @@ -767,6 +765,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu record.Event(managed, event.Normal(reasonReconciliationPaused, "Reconciliation is paused either through the `spec.managementPolicies` or the pause annotation", "annotation", meta.AnnotationKeyReconciliationPaused)) managed.SetConditions(xpv1.ReconcilePaused()) + r.metricRecorder.recordSyncedState(managed, 0) // if the pause annotation is removed or the management policies changed, we will have a chance to reconcile // again and resume and if status update fails, we will reconcile again to retry to update the status return reconcile.Result{}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) @@ -787,6 +786,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonManagementPolicyInvalid, err)) managed.SetConditions(xpv1.ReconcileError(err)) + r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -812,6 +812,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotUnpublish, err)) managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileError(err)) + r.metricRecorder.recordNotReady(managed) + r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } if err := r.managed.RemoveFinalizer(ctx, managed); err != nil { @@ -824,6 +826,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu return reconcile.Result{Requeue: true}, nil } managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileError(err)) + r.metricRecorder.recordSyncedState(managed, 0) + r.metricRecorder.recordNotReady(managed) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -831,6 +835,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu // details and removed our finalizer. If we assume we were the only // controller that added a finalizer to this resource then it should no // longer exist and thus there is no point trying to update its status. + r.metricRecorder.recordDeleted(managed) log.Debug("Successfully deleted managed resource") return reconcile.Result{Requeue: false}, nil } @@ -845,6 +850,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotInitialize, err)) managed.SetConditions(xpv1.ReconcileError(err)) + r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -856,6 +862,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu log.Debug(errCreateIncomplete) record.Event(managed, event.Warning(reasonCannotInitialize, errors.New(errCreateIncomplete))) managed.SetConditions(xpv1.Creating(), xpv1.ReconcileError(errors.New(errCreateIncomplete))) + r.metricRecorder.recordSyncedState(managed, 0) + r.metricRecorder.recordNotReady(managed) return reconcile.Result{Requeue: false}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -881,6 +889,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotResolveRefs, err)) managed.SetConditions(xpv1.ReconcileError(err)) + r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } } @@ -898,6 +907,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotConnect, err)) managed.SetConditions(xpv1.ReconcileError(errors.Wrap(err, errReconcileConnect))) + r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } defer func() { @@ -921,6 +931,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotObserve, err)) managed.SetConditions(xpv1.ReconcileError(errors.Wrap(err, errReconcileObserve))) + r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -929,6 +940,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu if !observation.ResourceExists && policy.ShouldOnlyObserve() { record.Event(managed, event.Warning(reasonCannotObserve, errors.New(errExternalResourceNotExist))) managed.SetConditions(xpv1.ReconcileError(errors.Wrap(errors.New(errExternalResourceNotExist), errReconcileObserve))) + r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -957,6 +969,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu log.Debug("Cannot delete external resource", "error", err) record.Event(managed, event.Warning(reasonCannotDelete, err)) managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileError(errors.Wrap(err, errReconcileDelete))) + r.metricRecorder.recordSyncedState(managed, 0) + r.metricRecorder.recordNotReady(managed) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -970,6 +984,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu log.Debug("Successfully requested deletion of external resource") record.Event(managed, event.Normal(reasonDeleted, "Successfully requested deletion of external resource")) managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileSuccess()) + r.metricRecorder.recordSyncedState(managed, 1) + r.metricRecorder.recordNotReady(managed) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } if err := r.managed.UnpublishConnection(ctx, managed, observation.ConnectionDetails); err != nil { @@ -983,6 +999,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotUnpublish, err)) managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileError(err)) + r.metricRecorder.recordSyncedState(managed, 0) + r.metricRecorder.recordNotReady(managed) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } if err := r.managed.RemoveFinalizer(ctx, managed); err != nil { @@ -995,6 +1013,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu return reconcile.Result{Requeue: true}, nil } managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileError(err)) + r.metricRecorder.recordSyncedState(managed, 0) + r.metricRecorder.recordNotReady(managed) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1002,6 +1022,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu // removed our finalizer. If we assume we were the only controller that // added a finalizer to this resource then it should no longer exist and // thus there is no point trying to update its status. + r.metricRecorder.recordDeleted(managed) log.Debug("Successfully deleted managed resource") return reconcile.Result{Requeue: false}, nil } @@ -1016,6 +1037,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotPublish, err)) managed.SetConditions(xpv1.ReconcileError(err)) + r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1028,6 +1050,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu return reconcile.Result{Requeue: true}, nil } managed.SetConditions(xpv1.ReconcileError(err)) + r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1047,6 +1070,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotUpdateManaged, errors.Wrap(err, errUpdateManaged))) managed.SetConditions(xpv1.Creating(), xpv1.ReconcileError(errors.Wrap(err, errUpdateManaged))) + r.metricRecorder.recordSyncedState(managed, 0) + r.metricRecorder.recordNotReady(managed) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1081,6 +1106,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } managed.SetConditions(xpv1.Creating(), xpv1.ReconcileError(errors.Wrap(err, errReconcileCreate))) + r.metricRecorder.recordNotReady(managed) + r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1106,6 +1133,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotUpdateManaged, errors.Wrap(err, errUpdateManagedAnnotations))) managed.SetConditions(xpv1.Creating(), xpv1.ReconcileError(errors.Wrap(err, errUpdateManagedAnnotations))) + r.metricRecorder.recordNotReady(managed) + r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1119,6 +1148,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotPublish, err)) managed.SetConditions(xpv1.Creating(), xpv1.ReconcileError(err)) + r.metricRecorder.recordNotReady(managed) + r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1129,6 +1160,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu log.Debug("Successfully requested creation of external resource") record.Event(managed, event.Normal(reasonCreated, "Successfully requested creation of external resource")) managed.SetConditions(xpv1.Creating(), xpv1.ReconcileSuccess()) + r.metricRecorder.recordNotReady(managed) + r.metricRecorder.recordSyncedState(managed, 1) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1144,6 +1177,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu log.Debug(errUpdateManaged, "error", err) record.Event(managed, event.Warning(reasonCannotUpdateManaged, err)) managed.SetConditions(xpv1.ReconcileError(errors.Wrap(err, errUpdateManaged))) + r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } } @@ -1158,12 +1192,13 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu reconcileAfter := r.pollIntervalHook(managed, r.pollInterval) log.Debug("External resource is up to date", "requeue-after", time.Now().Add(reconcileAfter)) managed.SetConditions(xpv1.ReconcileSuccess()) + r.metricRecorder.recordUpToDate(managed) // record that we intentionally did not update the managed resource // because no drift was detected. We call this so late in the reconcile // because all the cases above could contribute (for different reasons) // that the external object would not have been updated. - r.driftRecorder.recordUnchanged(managed.GetName()) + r.metricRecorder.recordUnchanged(managed.GetName()) return reconcile.Result{RequeueAfter: reconcileAfter}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1177,6 +1212,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu reconcileAfter := r.pollIntervalHook(managed, r.pollInterval) log.Debug("Skipping update due to managementPolicies. Reconciliation succeeded", "requeue-after", time.Now().Add(reconcileAfter)) managed.SetConditions(xpv1.ReconcileSuccess()) + r.metricRecorder.recordSyncedState(managed, 1) return reconcile.Result{RequeueAfter: reconcileAfter}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1190,11 +1226,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu log.Debug("Cannot update external resource") record.Event(managed, event.Warning(reasonCannotUpdate, err)) managed.SetConditions(xpv1.ReconcileError(errors.Wrap(err, errReconcileUpdate))) + r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } // record the drift after the successful update. - r.driftRecorder.recordUpdate(managed.GetName()) + r.metricRecorder.recordUpdate(managed.GetName()) if _, err := r.managed.PublishConnection(ctx, managed, update.ConnectionDetails); err != nil { // If this is the first time we encounter this issue we'll be requeued @@ -1203,6 +1240,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu log.Debug("Cannot publish connection details", "error", err) record.Event(managed, event.Warning(reasonCannotPublish, err)) managed.SetConditions(xpv1.ReconcileError(err)) + r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1215,5 +1253,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu log.Debug("Successfully requested update of external resource", "requeue-after", time.Now().Add(reconcileAfter)) record.Event(managed, event.Normal(reasonUpdated, "Successfully requested update of external resource")) managed.SetConditions(xpv1.ReconcileSuccess()) + r.metricRecorder.recordSyncedState(managed, 1) return reconcile.Result{RequeueAfter: reconcileAfter}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } diff --git a/pkg/resource/fake/mocks.go b/pkg/resource/fake/mocks.go index aae5ac28f..38d9310c5 100644 --- a/pkg/resource/fake/mocks.go +++ b/pkg/resource/fake/mocks.go @@ -478,11 +478,6 @@ func (m *Manager) GetRESTMapper() meta.RESTMapper { return m.RESTMapper } // GetLogger returns the logger. func (m *Manager) GetLogger() logr.Logger { return m.Logger } -// Add adds a runnable to the manager. -func (m *Manager) Add(_ manager.Runnable) error { - return nil // do nothing -} - // GV returns a mock schema.GroupVersion. var GV = schema.GroupVersion{Group: "g", Version: "v"} //nolint:gochecknoglobals // We treat this as a constant. From af6588856c2e2d10be27ec461f1d880c433c6b30 Mon Sep 17 00:00:00 2001 From: ezgidemirel Date: Mon, 8 Apr 2024 18:48:05 +0300 Subject: [PATCH 3/6] change init pattern and move state metrics to a go routine Signed-off-by: ezgidemirel --- pkg/controller/options.go | 5 + pkg/reconciler/managed/metrics.go | 224 +++++++++++------------- pkg/reconciler/managed/reconciler.go | 73 +++----- pkg/reconciler/managed/state_metrics.go | 183 +++++++++++++++++++ 4 files changed, 317 insertions(+), 168 deletions(-) create mode 100644 pkg/reconciler/managed/state_metrics.go diff --git a/pkg/controller/options.go b/pkg/controller/options.go index 630216979..ee55c1ae2 100644 --- a/pkg/controller/options.go +++ b/pkg/controller/options.go @@ -26,6 +26,7 @@ import ( "github.com/crossplane/crossplane-runtime/pkg/feature" "github.com/crossplane/crossplane-runtime/pkg/logging" "github.com/crossplane/crossplane-runtime/pkg/ratelimiter" + "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" ) // DefaultOptions returns a functional set of options with conservative @@ -61,6 +62,10 @@ type Options struct { // ESSOptions for External Secret Stores. ESSOptions *ESSOptions + + MetricRecorder managed.MetricRecorder + + StateRecorder managed.StateRecorder } // ForControllerRuntime extracts options for controller-runtime. diff --git a/pkg/reconciler/managed/metrics.go b/pkg/reconciler/managed/metrics.go index d9b913309..0255ae34d 100644 --- a/pkg/reconciler/managed/metrics.go +++ b/pkg/reconciler/managed/metrics.go @@ -1,5 +1,5 @@ /* -Copyright 2023 The Crossplane Authors. +Copyright 2024 The Crossplane Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -17,119 +17,105 @@ limitations under the License. package managed import ( - "context" "sync" "time" "github.com/prometheus/client_golang/prometheus" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/tools/cache" kmetrics "k8s.io/component-base/metrics" - "sigs.k8s.io/controller-runtime/pkg/cluster" - "sigs.k8s.io/controller-runtime/pkg/metrics" xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" - "github.com/crossplane/crossplane-runtime/pkg/errors" "github.com/crossplane/crossplane-runtime/pkg/resource" ) -func init() { //nolint:gochecknoinits // metrics should be registered once - metrics.Registry.MustRegister(drift, mr, mrReady, mrSynced, mrDetected, mrReadyDuration, mrDeletionDuration) -} - const subSystem = "crossplane" -var ( - drift = prometheus.NewHistogramVec(prometheus.HistogramOpts{ //nolint:gochecknoglobals // metrics should be registered once in init - Subsystem: subSystem, - Name: "resource_drift_seconds", - Help: "ALPHA: How long since the previous successful reconcile when a resource was found to be out of sync; excludes restart of the provider", - Buckets: kmetrics.ExponentialBuckets(10e-9, 10, 10), - }, []string{"group", "kind"}) - - mr = prometheus.NewGaugeVec(prometheus.GaugeOpts{ //nolint:gochecknoglobals // metrics should be registered once in init - Subsystem: subSystem, - Name: "managed_resource_created", - Help: "The number of managed resources created", - }, []string{"gvk", "name", "claim", "composite"}) - - mrReady = prometheus.NewGaugeVec(prometheus.GaugeOpts{ //nolint:gochecknoglobals // metrics should be registered once in init - Subsystem: subSystem, - Name: "managed_resource_ready", - Help: "The number of managed resources in Ready=True state", - }, []string{"gvk", "name", "claim", "composite"}) - - mrReadyDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ //nolint:gochecknoglobals // metrics should be registered once in init - Subsystem: subSystem, - Name: "managed_resource_ready_duration_seconds", - Help: "The time it took for a managed resource to become ready first time after creation", - Buckets: []float64{1, 5, 10, 15, 30, 60, 120, 300, 600, 1800, 3600}, - }, []string{"gvk", "name", "claim", "composite"}) - - mrDetected = prometheus.NewHistogramVec(prometheus.HistogramOpts{ //nolint:gochecknoglobals // metrics should be registered once in init - Subsystem: subSystem, - Name: "managed_resource_detected_time_seconds", - Help: "The time it took for a managed resource to be detected by the controller", - Buckets: kmetrics.ExponentialBuckets(10e-9, 10, 10), - }, []string{"gvk", "name", "claim", "composite"}) - - mrSynced = prometheus.NewGaugeVec(prometheus.GaugeOpts{ //nolint:gochecknoglobals // metrics should be registered once in init - Subsystem: subSystem, - Name: "managed_resource_synced", - Help: "The number of managed resources in Synced=True state", - }, []string{"gvk", "name", "claim", "composite"}) - - mrDeletionDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ //nolint:gochecknoglobals // metrics should be registered once in init - Subsystem: subSystem, - Name: "managed_resource_deletion_seconds", - Help: "The time it took for a managed resource to be deleted", - Buckets: []float64{1, 5, 10, 15, 30, 60, 120, 300, 600, 1800, 3600}, - }, []string{"gvk", "name", "claim", "composite"}) -) +// MetricRecorder records the managed resource metrics. +type MetricRecorder interface { //nolint:interfacebloat // The first two methods are coming from Prometheus + Describe(ch chan<- *prometheus.Desc) + Collect(ch chan<- prometheus.Metric) + + recordUnchanged(name string) + recordFirstTimeReconciled(managed resource.Managed) + recordFirstTimeReady(managed resource.Managed) + recordDrift(managed resource.Managed) + recordDeleted(managed resource.Managed) +} -type metricRecorder struct { +// MRMetricRecorder records the lifecycle metrics of managed resources. +type MRMetricRecorder struct { firstObservation sync.Map lastObservation sync.Map - cluster cluster.Cluster - gvk schema.GroupVersionKind + mrDetected *prometheus.HistogramVec + mrFirstTimeReady *prometheus.HistogramVec + mrDeletion *prometheus.HistogramVec + mrDrift *prometheus.HistogramVec } -func (r *metricRecorder) Start(ctx context.Context) error { - inf, err := r.cluster.GetCache().GetInformerForKind(ctx, r.gvk) - if err != nil { - return errors.Wrapf(err, "cannot get informer for metric recorder for resource %s", r.gvk) +// NewMRMetricRecorder returns a new MRMetricRecorder which records metrics for managed resources. +func NewMRMetricRecorder() *MRMetricRecorder { + return &MRMetricRecorder{ + mrDetected: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Subsystem: subSystem, + Name: "managed_resource_time_to_first_reconcile_seconds", + Help: "The time it took for a managed resource to be detected by the controller", + Buckets: kmetrics.ExponentialBuckets(10e-9, 10, 10), + }, []string{"gvk", "claim", "composite"}), + mrFirstTimeReady: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Subsystem: subSystem, + Name: "managed_resource_first_time_to_readiness_seconds", + Help: "The time it took for a managed resource to become ready first time after creation", + Buckets: []float64{1, 5, 10, 15, 30, 60, 120, 300, 600, 1800, 3600}, + }, []string{"gvk", "claim", "composite"}), + mrDeletion: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Subsystem: subSystem, + Name: "managed_resource_deletion_seconds", + Help: "The time it took for a managed resource to be deleted", + Buckets: []float64{1, 5, 10, 15, 30, 60, 120, 300, 600, 1800, 3600}, + }, []string{"gvk", "claim", "composite"}), + mrDrift: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Subsystem: subSystem, + Name: "managed_resource_drift_seconds", + Help: "ALPHA: How long since the previous successful reconcile when a resource was found to be out of sync; excludes restart of the provider", + Buckets: kmetrics.ExponentialBuckets(10e-9, 10, 10), + }, []string{"gvk", "claim", "composite"}), } +} - registered, err := inf.AddEventHandler(cache.ResourceEventHandlerFuncs{ - DeleteFunc: func(obj interface{}) { - if final, ok := obj.(cache.DeletedFinalStateUnknown); ok { - obj = final.Obj - } - managed, ok := obj.(resource.Managed) - if !ok { - return - } - r.firstObservation.Delete(managed.GetName()) - r.lastObservation.Delete(managed.GetName()) - }, - }) - if err != nil { - return errors.Wrap(err, "cannot add delete event handler to informer for metric recorder") - } - defer inf.RemoveEventHandler(registered) //nolint:errcheck // this happens on destruction. We cannot do anything anyway. - - <-ctx.Done() +// Describe sends the super-set of all possible descriptors of metrics +// collected by this Collector to the provided channel and returns once +// the last descriptor has been sent. +func (r *MRMetricRecorder) Describe(ch chan<- *prometheus.Desc) { + r.mrDetected.Describe(ch) + r.mrFirstTimeReady.Describe(ch) + r.mrDeletion.Describe(ch) + r.mrDrift.Describe(ch) +} - return nil +// Collect is called by the Prometheus registry when collecting +// metrics. The implementation sends each collected metric via the +// provided channel and returns once the last metric has been sent. +func (r *MRMetricRecorder) Collect(ch chan<- prometheus.Metric) { + r.mrDetected.Collect(ch) + r.mrFirstTimeReady.Collect(ch) + r.mrDeletion.Collect(ch) + r.mrDrift.Collect(ch) } -func (r *metricRecorder) recordUnchanged(name string) { +func (r *MRMetricRecorder) recordUnchanged(name string) { r.lastObservation.Store(name, time.Now()) } -func (r *metricRecorder) recordUpdate(name string) { +func (r *MRMetricRecorder) recordFirstTimeReconciled(managed resource.Managed) { + if managed.GetCondition(xpv1.TypeSynced).Status == corev1.ConditionUnknown { + r.mrDetected.With(getMRMetricLabels(managed)).Observe(time.Since(managed.GetCreationTimestamp().Time).Seconds()) + r.firstObservation.Store(managed.GetName(), time.Now()) // this is the first time we reconciled on this resource + } +} + +func (r *MRMetricRecorder) recordDrift(managed resource.Managed) { + name := managed.GetName() last, ok := r.lastObservation.Load(name) if !ok { return @@ -139,59 +125,55 @@ func (r *metricRecorder) recordUpdate(name string) { return } - drift.WithLabelValues(r.gvk.Group, r.gvk.Kind).Observe(time.Since(lt).Seconds()) + r.mrDrift.With(getMRMetricLabels(managed)).Observe(time.Since(lt).Seconds()) r.lastObservation.Store(name, time.Now()) } -func (r *metricRecorder) recordDetected(managed resource.Managed) { - if managed.GetCondition(xpv1.TypeSynced).Status == corev1.ConditionUnknown { - mr.With(getMRMetricLabels(managed)).Set(1) - mrDetected.With(getMRMetricLabels(managed)).Observe(time.Since(managed.GetCreationTimestamp().Time).Seconds()) - r.firstObservation.Store(managed.GetName(), time.Now()) // this is the first time we reconciled on this resource - } -} - -func (r *metricRecorder) recordSyncedState(managed resource.Managed, v float64) { - mrSynced.With(getMRMetricLabels(managed)).Set(v) -} - -func (r *metricRecorder) recordNotReady(managed resource.Managed) { - mrReady.With(getMRMetricLabels(managed)).Set(0) +func (r *MRMetricRecorder) recordDeleted(managed resource.Managed) { + r.mrDeletion.With(getMRMetricLabels(managed)).Observe(time.Since(managed.GetDeletionTimestamp().Time).Seconds()) } -func (r *metricRecorder) recordDeleted(managed resource.Managed) { - labels := getMRMetricLabels(managed) - - if managed.GetDeletionTimestamp() != nil { - mrDeletionDuration.With(getMRMetricLabels(managed)).Observe(time.Since(managed.GetDeletionTimestamp().Time).Seconds()) - } - mr.With(labels).Set(0) - mrReady.With(labels).Set(0) - mrSynced.With(labels).Set(0) -} - -func (r *metricRecorder) recordUpToDate(managed resource.Managed) { - mrSynced.With(getMRMetricLabels(managed)).Set(1) +func (r *MRMetricRecorder) recordFirstTimeReady(managed resource.Managed) { // Note that providers may set the ready condition to "True", so we need // to check the value here to send the ready metric if managed.GetCondition(xpv1.TypeReady).Status == corev1.ConditionTrue { - mrReady.With(getMRMetricLabels(managed)).Set(1) - name := managed.GetName() - _, ok := r.firstObservation.Load(name) // This map is used to identify the first time to readiness + _, ok := r.firstObservation.Load(managed.GetName()) // This map is used to identify the first time to readiness if !ok { return } - - mrReadyDuration.With(getMRMetricLabels(managed)).Observe(time.Since(managed.GetCreationTimestamp().Time).Seconds()) + r.mrFirstTimeReady.With(getMRMetricLabels(managed)).Observe(time.Since(managed.GetCreationTimestamp().Time).Seconds()) r.firstObservation.Delete(managed.GetName()) } } +// A NopMetricRecorder does nothing. +type NopMetricRecorder struct{} + +// NewNopMetricRecorder returns a MRMetricRecorder that does nothing. +func NewNopMetricRecorder() *NopMetricRecorder { + return &NopMetricRecorder{} +} + +// Describe does nothing. +func (r *NopMetricRecorder) Describe(_ chan<- *prometheus.Desc) {} + +// Collect does nothing. +func (r *NopMetricRecorder) Collect(_ chan<- prometheus.Metric) {} + +func (r *NopMetricRecorder) recordUnchanged(_ string) {} + +func (r *NopMetricRecorder) recordFirstTimeReconciled(_ resource.Managed) {} + +func (r *NopMetricRecorder) recordDrift(_ resource.Managed) {} + +func (r *NopMetricRecorder) recordDeleted(_ resource.Managed) {} + +func (r *NopMetricRecorder) recordFirstTimeReady(_ resource.Managed) {} + func getMRMetricLabels(managed resource.Managed) prometheus.Labels { l := prometheus.Labels{ "gvk": managed.GetObjectKind().GroupVersionKind().String(), - "name": managed.GetName(), "claim": "", "composite": managed.GetLabels()["crossplane.io/composite"], } diff --git a/pkg/reconciler/managed/reconciler.go b/pkg/reconciler/managed/reconciler.go index ecf699686..0f2b99d4e 100644 --- a/pkg/reconciler/managed/reconciler.go +++ b/pkg/reconciler/managed/reconciler.go @@ -476,8 +476,6 @@ type Reconciler struct { features feature.Flags - metricRecorder metricRecorder - // The below structs embed the set of interfaces used to implement the // managed resource reconciler. We do this primarily for readability, so // that the reconciler logic reads r.external.Connect(), @@ -487,8 +485,10 @@ type Reconciler struct { supportedManagementPolicies []sets.Set[xpv1.ManagementAction] - log logging.Logger - record event.Recorder + log logging.Logger + record event.Recorder + metricRecorder MetricRecorder + stateRecorder StateRecorder } type mrManaged struct { @@ -546,6 +546,20 @@ func WithPollInterval(after time.Duration) ReconcilerOption { } } +// WithMetricRecorder configures the Reconciler to use the supplied MetricRecorder. +func WithMetricRecorder(recorder MetricRecorder) ReconcilerOption { + return func(r *Reconciler) { + r.metricRecorder = recorder + } +} + +// WithStateRecorder configures the Reconciler to use the supplied StateRecorder. +func WithStateRecorder(recorder StateRecorder) ReconcilerOption { + return func(r *Reconciler) { + r.stateRecorder = recorder + } +} + // PollIntervalHook represents the function type passed to the // WithPollIntervalHook option to support dynamic computation of the poll // interval. @@ -699,17 +713,21 @@ func NewReconciler(m manager.Manager, of resource.ManagedKind, o ...ReconcilerOp creationGracePeriod: defaultGracePeriod, timeout: reconcileTimeout, managed: defaultMRManaged(m), - metricRecorder: metricRecorder{cluster: m, gvk: schema.GroupVersionKind(of)}, external: defaultMRExternal(), supportedManagementPolicies: defaultSupportedManagementPolicies(), log: logging.NewNopLogger(), record: event.NewNopRecorder(), + metricRecorder: NewNopMetricRecorder(), + stateRecorder: NewNopStateRecorder(), } for _, ro := range o { ro(r) } + // State recorder is started in the background to record MR states. + go r.stateRecorder.Run(context.Background(), schema.GroupVersionKind(of)) + return r } @@ -734,11 +752,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu // There's no need to requeue if we no longer exist. Otherwise we'll be // requeued implicitly because we return an error. log.Debug("Cannot get managed resource", "error", err) - r.metricRecorder.recordDeleted(managed) return reconcile.Result{}, errors.Wrap(resource.IgnoreNotFound(err), errGetManaged) } - r.metricRecorder.recordDetected(managed) + r.metricRecorder.recordFirstTimeReconciled(managed) record := r.record.WithAnnotations("external-name", meta.GetExternalName(managed)) log = log.WithValues( @@ -765,7 +782,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu record.Event(managed, event.Normal(reasonReconciliationPaused, "Reconciliation is paused either through the `spec.managementPolicies` or the pause annotation", "annotation", meta.AnnotationKeyReconciliationPaused)) managed.SetConditions(xpv1.ReconcilePaused()) - r.metricRecorder.recordSyncedState(managed, 0) // if the pause annotation is removed or the management policies changed, we will have a chance to reconcile // again and resume and if status update fails, we will reconcile again to retry to update the status return reconcile.Result{}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) @@ -786,7 +802,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonManagementPolicyInvalid, err)) managed.SetConditions(xpv1.ReconcileError(err)) - r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -812,8 +827,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotUnpublish, err)) managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileError(err)) - r.metricRecorder.recordNotReady(managed) - r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } if err := r.managed.RemoveFinalizer(ctx, managed); err != nil { @@ -826,8 +839,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu return reconcile.Result{Requeue: true}, nil } managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileError(err)) - r.metricRecorder.recordSyncedState(managed, 0) - r.metricRecorder.recordNotReady(managed) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -850,7 +861,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotInitialize, err)) managed.SetConditions(xpv1.ReconcileError(err)) - r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -862,8 +872,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu log.Debug(errCreateIncomplete) record.Event(managed, event.Warning(reasonCannotInitialize, errors.New(errCreateIncomplete))) managed.SetConditions(xpv1.Creating(), xpv1.ReconcileError(errors.New(errCreateIncomplete))) - r.metricRecorder.recordSyncedState(managed, 0) - r.metricRecorder.recordNotReady(managed) return reconcile.Result{Requeue: false}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -889,7 +897,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotResolveRefs, err)) managed.SetConditions(xpv1.ReconcileError(err)) - r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } } @@ -907,7 +914,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotConnect, err)) managed.SetConditions(xpv1.ReconcileError(errors.Wrap(err, errReconcileConnect))) - r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } defer func() { @@ -931,7 +937,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotObserve, err)) managed.SetConditions(xpv1.ReconcileError(errors.Wrap(err, errReconcileObserve))) - r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -940,7 +945,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu if !observation.ResourceExists && policy.ShouldOnlyObserve() { record.Event(managed, event.Warning(reasonCannotObserve, errors.New(errExternalResourceNotExist))) managed.SetConditions(xpv1.ReconcileError(errors.Wrap(errors.New(errExternalResourceNotExist), errReconcileObserve))) - r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -969,8 +973,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu log.Debug("Cannot delete external resource", "error", err) record.Event(managed, event.Warning(reasonCannotDelete, err)) managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileError(errors.Wrap(err, errReconcileDelete))) - r.metricRecorder.recordSyncedState(managed, 0) - r.metricRecorder.recordNotReady(managed) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -984,8 +986,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu log.Debug("Successfully requested deletion of external resource") record.Event(managed, event.Normal(reasonDeleted, "Successfully requested deletion of external resource")) managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileSuccess()) - r.metricRecorder.recordSyncedState(managed, 1) - r.metricRecorder.recordNotReady(managed) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } if err := r.managed.UnpublishConnection(ctx, managed, observation.ConnectionDetails); err != nil { @@ -999,8 +999,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotUnpublish, err)) managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileError(err)) - r.metricRecorder.recordSyncedState(managed, 0) - r.metricRecorder.recordNotReady(managed) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } if err := r.managed.RemoveFinalizer(ctx, managed); err != nil { @@ -1013,8 +1011,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu return reconcile.Result{Requeue: true}, nil } managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileError(err)) - r.metricRecorder.recordSyncedState(managed, 0) - r.metricRecorder.recordNotReady(managed) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1037,7 +1033,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotPublish, err)) managed.SetConditions(xpv1.ReconcileError(err)) - r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1050,7 +1045,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu return reconcile.Result{Requeue: true}, nil } managed.SetConditions(xpv1.ReconcileError(err)) - r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1070,8 +1064,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotUpdateManaged, errors.Wrap(err, errUpdateManaged))) managed.SetConditions(xpv1.Creating(), xpv1.ReconcileError(errors.Wrap(err, errUpdateManaged))) - r.metricRecorder.recordSyncedState(managed, 0) - r.metricRecorder.recordNotReady(managed) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1106,8 +1098,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } managed.SetConditions(xpv1.Creating(), xpv1.ReconcileError(errors.Wrap(err, errReconcileCreate))) - r.metricRecorder.recordNotReady(managed) - r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1133,8 +1123,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotUpdateManaged, errors.Wrap(err, errUpdateManagedAnnotations))) managed.SetConditions(xpv1.Creating(), xpv1.ReconcileError(errors.Wrap(err, errUpdateManagedAnnotations))) - r.metricRecorder.recordNotReady(managed) - r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1148,8 +1136,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu } record.Event(managed, event.Warning(reasonCannotPublish, err)) managed.SetConditions(xpv1.Creating(), xpv1.ReconcileError(err)) - r.metricRecorder.recordNotReady(managed) - r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1160,8 +1146,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu log.Debug("Successfully requested creation of external resource") record.Event(managed, event.Normal(reasonCreated, "Successfully requested creation of external resource")) managed.SetConditions(xpv1.Creating(), xpv1.ReconcileSuccess()) - r.metricRecorder.recordNotReady(managed) - r.metricRecorder.recordSyncedState(managed, 1) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1177,7 +1161,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu log.Debug(errUpdateManaged, "error", err) record.Event(managed, event.Warning(reasonCannotUpdateManaged, err)) managed.SetConditions(xpv1.ReconcileError(errors.Wrap(err, errUpdateManaged))) - r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } } @@ -1192,7 +1175,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu reconcileAfter := r.pollIntervalHook(managed, r.pollInterval) log.Debug("External resource is up to date", "requeue-after", time.Now().Add(reconcileAfter)) managed.SetConditions(xpv1.ReconcileSuccess()) - r.metricRecorder.recordUpToDate(managed) + r.metricRecorder.recordFirstTimeReady(managed) // record that we intentionally did not update the managed resource // because no drift was detected. We call this so late in the reconcile @@ -1212,7 +1195,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu reconcileAfter := r.pollIntervalHook(managed, r.pollInterval) log.Debug("Skipping update due to managementPolicies. Reconciliation succeeded", "requeue-after", time.Now().Add(reconcileAfter)) managed.SetConditions(xpv1.ReconcileSuccess()) - r.metricRecorder.recordSyncedState(managed, 1) return reconcile.Result{RequeueAfter: reconcileAfter}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1226,12 +1208,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu log.Debug("Cannot update external resource") record.Event(managed, event.Warning(reasonCannotUpdate, err)) managed.SetConditions(xpv1.ReconcileError(errors.Wrap(err, errReconcileUpdate))) - r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } // record the drift after the successful update. - r.metricRecorder.recordUpdate(managed.GetName()) + r.metricRecorder.recordDrift(managed) if _, err := r.managed.PublishConnection(ctx, managed, update.ConnectionDetails); err != nil { // If this is the first time we encounter this issue we'll be requeued @@ -1240,7 +1221,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu log.Debug("Cannot publish connection details", "error", err) record.Event(managed, event.Warning(reasonCannotPublish, err)) managed.SetConditions(xpv1.ReconcileError(err)) - r.metricRecorder.recordSyncedState(managed, 0) return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } @@ -1253,6 +1233,5 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu log.Debug("Successfully requested update of external resource", "requeue-after", time.Now().Add(reconcileAfter)) record.Event(managed, event.Normal(reasonUpdated, "Successfully requested update of external resource")) managed.SetConditions(xpv1.ReconcileSuccess()) - r.metricRecorder.recordSyncedState(managed, 1) return reconcile.Result{RequeueAfter: reconcileAfter}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus) } diff --git a/pkg/reconciler/managed/state_metrics.go b/pkg/reconciler/managed/state_metrics.go new file mode 100644 index 000000000..0c63b4ece --- /dev/null +++ b/pkg/reconciler/managed/state_metrics.go @@ -0,0 +1,183 @@ +/* +Copyright 2024 The Crossplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package managed + +import ( + "context" + "time" + + "github.com/prometheus/client_golang/prometheus" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/client" + + xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" + "github.com/crossplane/crossplane-runtime/pkg/fieldpath" + "github.com/crossplane/crossplane-runtime/pkg/logging" +) + +// A StateRecorder records the state of given GroupVersionKind. +type StateRecorder interface { + Describe(ch chan<- *prometheus.Desc) + Collect(ch chan<- prometheus.Metric) + + Record(ctx context.Context, gvk schema.GroupVersionKind) + Run(ctx context.Context, gvk schema.GroupVersionKind) +} + +// A MRStateRecorder records the state of managed resources. +type MRStateRecorder struct { + client client.Client + log logging.Logger + frequency time.Duration + + mrExists *prometheus.GaugeVec + mrReady *prometheus.GaugeVec + mrSynced *prometheus.GaugeVec +} + +// NewMRStateRecorder returns a new MRStateRecorder which records the state of managed resources. +func NewMRStateRecorder(client client.Client, log logging.Logger, o ...StateRecorderOption) *MRStateRecorder { + r := &MRStateRecorder{ + client: client, + log: log, + + mrExists: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: subSystem, + Name: "managed_resource_exists", + Help: "The number of managed resources that exist", + }, []string{"gvk"}), + mrReady: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: subSystem, + Name: "managed_resource_ready", + Help: "The number of managed resources in Ready=True state", + }, []string{"gvk"}), + mrSynced: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: subSystem, + Name: "managed_resource_synced", + Help: "The number of managed resources in Synced=True state", + }, []string{"gvk"}), + } + + for _, ro := range o { + ro(r) + } + + return r +} + +// A StateRecorderOption configures a MRStateRecorder. +type StateRecorderOption func(*MRStateRecorder) + +// WithFrequency configures the frequency at which the MRStateRecorder +// will record. +func WithFrequency(f time.Duration) StateRecorderOption { + return func(r *MRStateRecorder) { + r.frequency = f + } +} + +// Describe sends the super-set of all possible descriptors of metrics +// collected by this Collector to the provided channel and returns once +// the last descriptor has been sent. +func (r *MRStateRecorder) Describe(ch chan<- *prometheus.Desc) { + r.mrExists.Describe(ch) + r.mrReady.Describe(ch) + r.mrSynced.Describe(ch) +} + +// Collect is called by the Prometheus registry when collecting +// metrics. The implementation sends each collected metric via the +// provided channel and returns once the last metric has been sent. +func (r *MRStateRecorder) Collect(ch chan<- prometheus.Metric) { + r.mrExists.Collect(ch) + r.mrReady.Collect(ch) + r.mrSynced.Collect(ch) +} + +// Record records the state of managed resources. +func (r *MRStateRecorder) Record(ctx context.Context, gvk schema.GroupVersionKind) { + l := &unstructured.UnstructuredList{} + l.SetGroupVersionKind(gvk) + err := r.client.List(ctx, l) + if err != nil { + r.log.Info("Failed to list managed resources", "error", err) + return + } + + label := gvk.String() + r.mrExists.WithLabelValues(label).Set(float64(len(l.Items))) + + var numReady, numSynced float64 = 0, 0 + for _, o := range l.Items { + conditioned := xpv1.ConditionedStatus{} + err := fieldpath.Pave(o.Object).GetValueInto("status", &conditioned) + if err != nil { + r.log.Info("Failed to get conditions of managed resource", "error", err) + continue + } + + for _, condition := range conditioned.Conditions { + if condition.Type == xpv1.TypeReady && condition.Status == corev1.ConditionTrue { + numReady++ + } else if condition.Type == xpv1.TypeSynced && condition.Status == corev1.ConditionTrue { + numSynced++ + } + } + } + + r.mrReady.WithLabelValues(label).Set(numReady) + r.mrSynced.WithLabelValues(label).Set(numSynced) +} + +// Run records state of managed resources with given frequency. +func (r *MRStateRecorder) Run(ctx context.Context, gvk schema.GroupVersionKind) { + ticker := time.NewTicker(r.frequency) + quit := make(chan struct{}) + go func() { + for { + select { + case <-ticker.C: + r.Record(ctx, gvk) + case <-quit: + ticker.Stop() + return + } + } + }() +} + +// A NopStateRecorder does nothing. +type NopStateRecorder struct{} + +// NewNopStateRecorder returns a NopStateRecorder that does nothing. +func NewNopStateRecorder() *NopStateRecorder { + return &NopStateRecorder{} +} + +// Describe does nothing. +func (r *NopStateRecorder) Describe(_ chan<- *prometheus.Desc) {} + +// Collect does nothing. +func (r *NopStateRecorder) Collect(_ chan<- prometheus.Metric) {} + +// Record does nothing. +func (r *NopStateRecorder) Record(_ context.Context, _ schema.GroupVersionKind) {} + +// Run does nothing. +func (r *NopStateRecorder) Run(_ context.Context, _ schema.GroupVersionKind) {} From 4b6c2de666514360ecf7938796e8ad2b870931d8 Mon Sep 17 00:00:00 2001 From: ezgidemirel Date: Tue, 9 Apr 2024 19:36:08 +0300 Subject: [PATCH 4/6] add XR state metrics Signed-off-by: ezgidemirel --- go.mod | 1 + go.sum | 2 + pkg/controller/options.go | 3 +- pkg/reconciler/managed/metrics.go | 34 ++-- pkg/reconciler/managed/reconciler.go | 7 +- .../mr_state_metrics.go} | 82 ++------ pkg/statemetrics/state_recorder.go | 56 ++++++ pkg/statemetrics/xr_state_metrics.go | 181 ++++++++++++++++++ 8 files changed, 279 insertions(+), 87 deletions(-) rename pkg/{reconciler/managed/state_metrics.go => statemetrics/mr_state_metrics.go} (67%) create mode 100644 pkg/statemetrics/state_recorder.go create mode 100644 pkg/statemetrics/xr_state_metrics.go diff --git a/go.mod b/go.mod index 4d563cbf1..53a3490ee 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( k8s.io/api v0.29.1 k8s.io/apiextensions-apiserver v0.29.1 k8s.io/apimachinery v0.29.1 + k8s.io/apiserver v0.29.1 k8s.io/client-go v0.29.1 k8s.io/component-base v0.29.1 k8s.io/klog/v2 v2.110.1 diff --git a/go.sum b/go.sum index 46cd0c925..4f88534dc 100644 --- a/go.sum +++ b/go.sum @@ -352,6 +352,8 @@ k8s.io/apiextensions-apiserver v0.29.1 h1:S9xOtyk9M3Sk1tIpQMu9wXHm5O2MX6Y1kIpPMi k8s.io/apiextensions-apiserver v0.29.1/go.mod h1:zZECpujY5yTW58co8V2EQR4BD6A9pktVgHhvc0uLfeU= k8s.io/apimachinery v0.29.1 h1:KY4/E6km/wLBguvCZv8cKTeOwwOBqFNjwJIdMkMbbRc= k8s.io/apimachinery v0.29.1/go.mod h1:6HVkd1FwxIagpYrHSwJlQqZI3G9LfYWRPAkUvLnXTKU= +k8s.io/apiserver v0.29.1 h1:e2wwHUfEmMsa8+cuft8MT56+16EONIEK8A/gpBSco+g= +k8s.io/apiserver v0.29.1/go.mod h1:V0EpkTRrJymyVT3M49we8uh2RvXf7fWC5XLB0P3SwRw= k8s.io/client-go v0.29.1 h1:19B/+2NGEwnFLzt0uB5kNJnfTsbV8w6TgQRz9l7ti7A= k8s.io/client-go v0.29.1/go.mod h1:TDG/psL9hdet0TI9mGyHJSgRkW3H9JZk2dNEUS7bRks= k8s.io/component-base v0.29.1 h1:MUimqJPCRnnHsskTTjKD+IC1EHBbRCVyi37IoFBrkYw= diff --git a/pkg/controller/options.go b/pkg/controller/options.go index ee55c1ae2..b4afe37f4 100644 --- a/pkg/controller/options.go +++ b/pkg/controller/options.go @@ -27,6 +27,7 @@ import ( "github.com/crossplane/crossplane-runtime/pkg/logging" "github.com/crossplane/crossplane-runtime/pkg/ratelimiter" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" + "github.com/crossplane/crossplane-runtime/pkg/statemetrics" ) // DefaultOptions returns a functional set of options with conservative @@ -65,7 +66,7 @@ type Options struct { MetricRecorder managed.MetricRecorder - StateRecorder managed.StateRecorder + StateRecorder statemetrics.StateRecorder } // ForControllerRuntime extracts options for controller-runtime. diff --git a/pkg/reconciler/managed/metrics.go b/pkg/reconciler/managed/metrics.go index 0255ae34d..0099ddf2b 100644 --- a/pkg/reconciler/managed/metrics.go +++ b/pkg/reconciler/managed/metrics.go @@ -61,25 +61,25 @@ func NewMRMetricRecorder() *MRMetricRecorder { Name: "managed_resource_time_to_first_reconcile_seconds", Help: "The time it took for a managed resource to be detected by the controller", Buckets: kmetrics.ExponentialBuckets(10e-9, 10, 10), - }, []string{"gvk", "claim", "composite"}), + }, []string{"gvk"}), mrFirstTimeReady: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Subsystem: subSystem, Name: "managed_resource_first_time_to_readiness_seconds", Help: "The time it took for a managed resource to become ready first time after creation", Buckets: []float64{1, 5, 10, 15, 30, 60, 120, 300, 600, 1800, 3600}, - }, []string{"gvk", "claim", "composite"}), + }, []string{"gvk"}), mrDeletion: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Subsystem: subSystem, Name: "managed_resource_deletion_seconds", Help: "The time it took for a managed resource to be deleted", Buckets: []float64{1, 5, 10, 15, 30, 60, 120, 300, 600, 1800, 3600}, - }, []string{"gvk", "claim", "composite"}), + }, []string{"gvk"}), mrDrift: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Subsystem: subSystem, Name: "managed_resource_drift_seconds", Help: "ALPHA: How long since the previous successful reconcile when a resource was found to be out of sync; excludes restart of the provider", Buckets: kmetrics.ExponentialBuckets(10e-9, 10, 10), - }, []string{"gvk", "claim", "composite"}), + }, []string{"gvk"}), } } @@ -109,7 +109,8 @@ func (r *MRMetricRecorder) recordUnchanged(name string) { func (r *MRMetricRecorder) recordFirstTimeReconciled(managed resource.Managed) { if managed.GetCondition(xpv1.TypeSynced).Status == corev1.ConditionUnknown { - r.mrDetected.With(getMRMetricLabels(managed)).Observe(time.Since(managed.GetCreationTimestamp().Time).Seconds()) + r.mrDetected.WithLabelValues("gvk", managed.GetObjectKind().GroupVersionKind().String()). + Observe(time.Since(managed.GetCreationTimestamp().Time).Seconds()) r.firstObservation.Store(managed.GetName(), time.Now()) // this is the first time we reconciled on this resource } } @@ -125,13 +126,15 @@ func (r *MRMetricRecorder) recordDrift(managed resource.Managed) { return } - r.mrDrift.With(getMRMetricLabels(managed)).Observe(time.Since(lt).Seconds()) + r.mrDrift.WithLabelValues("gvk", managed.GetObjectKind().GroupVersionKind().String()). + Observe(time.Since(lt).Seconds()) r.lastObservation.Store(name, time.Now()) } func (r *MRMetricRecorder) recordDeleted(managed resource.Managed) { - r.mrDeletion.With(getMRMetricLabels(managed)).Observe(time.Since(managed.GetDeletionTimestamp().Time).Seconds()) + r.mrDeletion.WithLabelValues("gvk", managed.GetObjectKind().GroupVersionKind().String()). + Observe(time.Since(managed.GetDeletionTimestamp().Time).Seconds()) } func (r *MRMetricRecorder) recordFirstTimeReady(managed resource.Managed) { @@ -142,7 +145,8 @@ func (r *MRMetricRecorder) recordFirstTimeReady(managed resource.Managed) { if !ok { return } - r.mrFirstTimeReady.With(getMRMetricLabels(managed)).Observe(time.Since(managed.GetCreationTimestamp().Time).Seconds()) + r.mrFirstTimeReady.WithLabelValues("gvk", managed.GetObjectKind().GroupVersionKind().String()). + Observe(time.Since(managed.GetCreationTimestamp().Time).Seconds()) r.firstObservation.Delete(managed.GetName()) } } @@ -170,17 +174,3 @@ func (r *NopMetricRecorder) recordDrift(_ resource.Managed) {} func (r *NopMetricRecorder) recordDeleted(_ resource.Managed) {} func (r *NopMetricRecorder) recordFirstTimeReady(_ resource.Managed) {} - -func getMRMetricLabels(managed resource.Managed) prometheus.Labels { - l := prometheus.Labels{ - "gvk": managed.GetObjectKind().GroupVersionKind().String(), - "claim": "", - "composite": managed.GetLabels()["crossplane.io/composite"], - } - - if managed.GetLabels()["crossplane.io/claim-namespace"] != "" && managed.GetLabels()["crossplane.io/claim-name"] != "" { - l["claim"] = managed.GetLabels()["crossplane.io/claim-namespace"] + "/" + managed.GetLabels()["crossplane.io/claim-name"] - } - - return l -} diff --git a/pkg/reconciler/managed/reconciler.go b/pkg/reconciler/managed/reconciler.go index 0f2b99d4e..2be38e18e 100644 --- a/pkg/reconciler/managed/reconciler.go +++ b/pkg/reconciler/managed/reconciler.go @@ -36,6 +36,7 @@ import ( "github.com/crossplane/crossplane-runtime/pkg/logging" "github.com/crossplane/crossplane-runtime/pkg/meta" "github.com/crossplane/crossplane-runtime/pkg/resource" + "github.com/crossplane/crossplane-runtime/pkg/statemetrics" ) const ( @@ -488,7 +489,7 @@ type Reconciler struct { log logging.Logger record event.Recorder metricRecorder MetricRecorder - stateRecorder StateRecorder + stateRecorder statemetrics.StateRecorder } type mrManaged struct { @@ -554,7 +555,7 @@ func WithMetricRecorder(recorder MetricRecorder) ReconcilerOption { } // WithStateRecorder configures the Reconciler to use the supplied StateRecorder. -func WithStateRecorder(recorder StateRecorder) ReconcilerOption { +func WithStateRecorder(recorder statemetrics.StateRecorder) ReconcilerOption { return func(r *Reconciler) { r.stateRecorder = recorder } @@ -718,7 +719,7 @@ func NewReconciler(m manager.Manager, of resource.ManagedKind, o ...ReconcilerOp log: logging.NewNopLogger(), record: event.NewNopRecorder(), metricRecorder: NewNopMetricRecorder(), - stateRecorder: NewNopStateRecorder(), + stateRecorder: statemetrics.NewNopStateRecorder(), } for _, ro := range o { diff --git a/pkg/reconciler/managed/state_metrics.go b/pkg/statemetrics/mr_state_metrics.go similarity index 67% rename from pkg/reconciler/managed/state_metrics.go rename to pkg/statemetrics/mr_state_metrics.go index 0c63b4ece..cb4eae966 100644 --- a/pkg/reconciler/managed/state_metrics.go +++ b/pkg/statemetrics/mr_state_metrics.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package managed +package statemetrics import ( "context" @@ -31,20 +31,14 @@ import ( "github.com/crossplane/crossplane-runtime/pkg/logging" ) -// A StateRecorder records the state of given GroupVersionKind. -type StateRecorder interface { - Describe(ch chan<- *prometheus.Desc) - Collect(ch chan<- prometheus.Metric) - - Record(ctx context.Context, gvk schema.GroupVersionKind) - Run(ctx context.Context, gvk schema.GroupVersionKind) -} +// A MRStateRecorderOption configures a MRStateRecorder. +type MRStateRecorderOption func(*MRStateRecorder) // A MRStateRecorder records the state of managed resources. type MRStateRecorder struct { - client client.Client - log logging.Logger - frequency time.Duration + client client.Client + log logging.Logger + interval time.Duration mrExists *prometheus.GaugeVec mrReady *prometheus.GaugeVec @@ -52,10 +46,11 @@ type MRStateRecorder struct { } // NewMRStateRecorder returns a new MRStateRecorder which records the state of managed resources. -func NewMRStateRecorder(client client.Client, log logging.Logger, o ...StateRecorderOption) *MRStateRecorder { - r := &MRStateRecorder{ - client: client, - log: log, +func NewMRStateRecorder(client client.Client, log logging.Logger, interval time.Duration) *MRStateRecorder { + return &MRStateRecorder{ + client: client, + log: log, + interval: interval, mrExists: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: subSystem, @@ -73,23 +68,6 @@ func NewMRStateRecorder(client client.Client, log logging.Logger, o ...StateReco Help: "The number of managed resources in Synced=True state", }, []string{"gvk"}), } - - for _, ro := range o { - ro(r) - } - - return r -} - -// A StateRecorderOption configures a MRStateRecorder. -type StateRecorderOption func(*MRStateRecorder) - -// WithFrequency configures the frequency at which the MRStateRecorder -// will record. -func WithFrequency(f time.Duration) StateRecorderOption { - return func(r *MRStateRecorder) { - r.frequency = f - } } // Describe sends the super-set of all possible descriptors of metrics @@ -133,10 +111,13 @@ func (r *MRStateRecorder) Record(ctx context.Context, gvk schema.GroupVersionKin } for _, condition := range conditioned.Conditions { - if condition.Type == xpv1.TypeReady && condition.Status == corev1.ConditionTrue { - numReady++ - } else if condition.Type == xpv1.TypeSynced && condition.Status == corev1.ConditionTrue { - numSynced++ + if condition.Status == corev1.ConditionTrue { + switch condition.Type { + case xpv1.TypeReady: + numReady++ + case xpv1.TypeSynced: + numSynced++ + } } } } @@ -145,39 +126,18 @@ func (r *MRStateRecorder) Record(ctx context.Context, gvk schema.GroupVersionKin r.mrSynced.WithLabelValues(label).Set(numSynced) } -// Run records state of managed resources with given frequency. +// Run records state of managed resources with given interval. func (r *MRStateRecorder) Run(ctx context.Context, gvk schema.GroupVersionKind) { - ticker := time.NewTicker(r.frequency) - quit := make(chan struct{}) + ticker := time.NewTicker(r.interval) go func() { for { select { case <-ticker.C: r.Record(ctx, gvk) - case <-quit: + case <-ctx.Done(): ticker.Stop() return } } }() } - -// A NopStateRecorder does nothing. -type NopStateRecorder struct{} - -// NewNopStateRecorder returns a NopStateRecorder that does nothing. -func NewNopStateRecorder() *NopStateRecorder { - return &NopStateRecorder{} -} - -// Describe does nothing. -func (r *NopStateRecorder) Describe(_ chan<- *prometheus.Desc) {} - -// Collect does nothing. -func (r *NopStateRecorder) Collect(_ chan<- prometheus.Metric) {} - -// Record does nothing. -func (r *NopStateRecorder) Record(_ context.Context, _ schema.GroupVersionKind) {} - -// Run does nothing. -func (r *NopStateRecorder) Run(_ context.Context, _ schema.GroupVersionKind) {} diff --git a/pkg/statemetrics/state_recorder.go b/pkg/statemetrics/state_recorder.go new file mode 100644 index 000000000..e0629bcf9 --- /dev/null +++ b/pkg/statemetrics/state_recorder.go @@ -0,0 +1,56 @@ +/* +Copyright 2024 The Crossplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package statemetrics contains utilities for recording Crossplane resource state metrics. +package statemetrics + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +const subSystem = "crossplane" + +// A StateRecorder records the state of given GroupVersionKind. +type StateRecorder interface { + Describe(ch chan<- *prometheus.Desc) + Collect(ch chan<- prometheus.Metric) + + Record(ctx context.Context, gvk schema.GroupVersionKind) + Run(ctx context.Context, gvk schema.GroupVersionKind) +} + +// A NopStateRecorder does nothing. +type NopStateRecorder struct{} + +// NewNopStateRecorder returns a NopStateRecorder that does nothing. +func NewNopStateRecorder() *NopStateRecorder { + return &NopStateRecorder{} +} + +// Describe does nothing. +func (r *NopStateRecorder) Describe(_ chan<- *prometheus.Desc) {} + +// Collect does nothing. +func (r *NopStateRecorder) Collect(_ chan<- prometheus.Metric) {} + +// Record does nothing. +func (r *NopStateRecorder) Record(_ context.Context, _ schema.GroupVersionKind) {} + +// Run does nothing. +func (r *NopStateRecorder) Run(_ context.Context, _ schema.GroupVersionKind) {} diff --git a/pkg/statemetrics/xr_state_metrics.go b/pkg/statemetrics/xr_state_metrics.go new file mode 100644 index 000000000..8dcbb5140 --- /dev/null +++ b/pkg/statemetrics/xr_state_metrics.go @@ -0,0 +1,181 @@ +/* +Copyright 2024 The Crossplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statemetrics + +import ( + "context" + "time" + + "github.com/prometheus/client_golang/prometheus" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + v1 "k8s.io/apiserver/pkg/apis/audit/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" + "github.com/crossplane/crossplane-runtime/pkg/fieldpath" + "github.com/crossplane/crossplane-runtime/pkg/logging" +) + +// A XRStateRecorder records the state of composite resources. +type XRStateRecorder struct { + client client.Client + log logging.Logger + interval time.Duration + + compositeExists *prometheus.GaugeVec + compositeReady *prometheus.GaugeVec + compositeSynced *prometheus.GaugeVec + compositeComposedCount *prometheus.GaugeVec +} + +// A APIExtStateRecorderOption configures a MRStateRecorder. +type APIExtStateRecorderOption func(*XRStateRecorder) + +// NewXRStateRecorder returns a new XRStateRecorder which records the state of claim, +// composite and composition metrics. +func NewXRStateRecorder(client client.Client, log logging.Logger, interval time.Duration) *XRStateRecorder { + return &XRStateRecorder{ + client: client, + log: log, + interval: interval, + + compositeExists: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: subSystem, + Name: "composite_resource_exists", + Help: "The number of composite resources that exist", + }, []string{"gvk", "composition"}), + compositeReady: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: subSystem, + Name: "composite_resource_ready", + Help: "The number of composite resources in Ready=True state", + }, []string{"gvk", "composition"}), + compositeSynced: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: subSystem, + Name: "composite_resource_synced", + Help: "The number of composite resources in Synced=True state", + }, []string{"gvk", "composition"}), + compositeComposedCount: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: subSystem, + Name: "composite_resource_composed_count", + Help: "The number of composed resources in total", + }, []string{"gvk", "composition"}), + } +} + +// Describe sends the super-set of all possible descriptors of metrics +// collected by this Collector to the provided channel and returns once +// the last descriptor has been sent. +func (r *XRStateRecorder) Describe(ch chan<- *prometheus.Desc) { + r.compositeExists.Describe(ch) + r.compositeReady.Describe(ch) + r.compositeSynced.Describe(ch) + r.compositeComposedCount.Describe(ch) +} + +// Collect is called by the Prometheus registry when collecting +// metrics. The implementation sends each collected metric via the +// provided channel and returns once the last metric has been sent. +func (r *XRStateRecorder) Collect(ch chan<- prometheus.Metric) { + r.compositeExists.Collect(ch) + r.compositeReady.Collect(ch) + r.compositeSynced.Collect(ch) + r.compositeComposedCount.Collect(ch) +} + +// Record records the state of managed resources. +func (r *XRStateRecorder) Record(ctx context.Context, gvk schema.GroupVersionKind) { + xrs := &unstructured.UnstructuredList{} + xrs.SetGroupVersionKind(gvk) + err := r.client.List(ctx, xrs) + if err != nil { + r.log.Info("Failed to list composite resources", "error", err) + return + } + + composition, err := getCompositionRef(xrs) + if err != nil { + r.log.Info("Failed to get composition reference of composite resource", "error", err) + return + } + + labels := prometheus.Labels{ + "gvk": gvk.String(), + "composition": composition, + } + r.compositeExists.With(labels).Set(float64(len(xrs.Items))) + + var numReady, numSynced, numComposed float64 = 0, 0, 0 + for _, xr := range xrs.Items { + conditioned := xpv1.ConditionedStatus{} + if err := fieldpath.Pave(xr.Object).GetValueInto("status", &conditioned); err != nil { + r.log.Info("Failed to get conditions of managed resource", "error", err) + continue + } + + for _, condition := range conditioned.Conditions { + if condition.Type == xpv1.TypeReady && condition.Status == corev1.ConditionTrue { + numReady++ + } else if condition.Type == xpv1.TypeSynced && condition.Status == corev1.ConditionTrue { + numSynced++ + } + } + + resourceRefs := make([]v1.ObjectReference, 0) + if err := fieldpath.Pave(xr.Object).GetValueInto("spec.resourceRefs", &resourceRefs); err != nil { + r.log.Info("Failed to get resource references of composed resource", "error", err) + continue + } + + numComposed += float64(len(resourceRefs)) + } + + r.compositeReady.With(labels).Set(numReady) + r.compositeSynced.With(labels).Set(numSynced) + r.compositeComposedCount.With(labels).Set(numComposed) +} + +// Run records state of managed resources with given interval. +func (r *XRStateRecorder) Run(ctx context.Context, gvk schema.GroupVersionKind) { + ticker := time.NewTicker(r.interval) + go func() { + for { + select { + case <-ticker.C: + r.Record(ctx, gvk) + case <-ctx.Done(): + ticker.Stop() + return + } + } + }() +} + +func getCompositionRef(l *unstructured.UnstructuredList) (string, error) { + if len(l.Items) == 0 { + return "", nil + } + + xr := l.Items[0].Object + compRef, err := fieldpath.Pave(xr).GetString("spec.compositionRef") + if err != nil { + return "", err + } + + return compRef, nil +} From aa7264baf4e06cccf1aaee0347d226170914e461 Mon Sep 17 00:00:00 2001 From: ezgidemirel Date: Tue, 16 Apr 2024 20:01:05 +0300 Subject: [PATCH 5/6] make state recorders Runnable and remove unstructured listing Signed-off-by: ezgidemirel --- go.mod | 1 - go.sum | 2 - pkg/controller/options.go | 9 +- pkg/reconciler/managed/reconciler.go | 13 -- pkg/resource/interfaces.go | 8 ++ pkg/statemetrics/mr_state_metrics.go | 140 ++++++++++----------- pkg/statemetrics/state_recorder.go | 16 +-- pkg/statemetrics/xr_state_metrics.go | 177 ++++++++++++--------------- 8 files changed, 170 insertions(+), 196 deletions(-) diff --git a/go.mod b/go.mod index 53a3490ee..4d563cbf1 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,6 @@ require ( k8s.io/api v0.29.1 k8s.io/apiextensions-apiserver v0.29.1 k8s.io/apimachinery v0.29.1 - k8s.io/apiserver v0.29.1 k8s.io/client-go v0.29.1 k8s.io/component-base v0.29.1 k8s.io/klog/v2 v2.110.1 diff --git a/go.sum b/go.sum index 4f88534dc..46cd0c925 100644 --- a/go.sum +++ b/go.sum @@ -352,8 +352,6 @@ k8s.io/apiextensions-apiserver v0.29.1 h1:S9xOtyk9M3Sk1tIpQMu9wXHm5O2MX6Y1kIpPMi k8s.io/apiextensions-apiserver v0.29.1/go.mod h1:zZECpujY5yTW58co8V2EQR4BD6A9pktVgHhvc0uLfeU= k8s.io/apimachinery v0.29.1 h1:KY4/E6km/wLBguvCZv8cKTeOwwOBqFNjwJIdMkMbbRc= k8s.io/apimachinery v0.29.1/go.mod h1:6HVkd1FwxIagpYrHSwJlQqZI3G9LfYWRPAkUvLnXTKU= -k8s.io/apiserver v0.29.1 h1:e2wwHUfEmMsa8+cuft8MT56+16EONIEK8A/gpBSco+g= -k8s.io/apiserver v0.29.1/go.mod h1:V0EpkTRrJymyVT3M49we8uh2RvXf7fWC5XLB0P3SwRw= k8s.io/client-go v0.29.1 h1:19B/+2NGEwnFLzt0uB5kNJnfTsbV8w6TgQRz9l7ti7A= k8s.io/client-go v0.29.1/go.mod h1:TDG/psL9hdet0TI9mGyHJSgRkW3H9JZk2dNEUS7bRks= k8s.io/component-base v0.29.1 h1:MUimqJPCRnnHsskTTjKD+IC1EHBbRCVyi37IoFBrkYw= diff --git a/pkg/controller/options.go b/pkg/controller/options.go index b4afe37f4..fd88082df 100644 --- a/pkg/controller/options.go +++ b/pkg/controller/options.go @@ -55,6 +55,9 @@ type Options struct { // determine whether it has work to do. PollInterval time.Duration + // PollStateMetricInterval at which each controller should record state + PollStateMetricInterval time.Duration + // MaxConcurrentReconciles for each controller. MaxConcurrentReconciles int @@ -64,9 +67,11 @@ type Options struct { // ESSOptions for External Secret Stores. ESSOptions *ESSOptions - MetricRecorder managed.MetricRecorder + // MetricsRecorder to use for recording metrics. + MRMetrics managed.MetricRecorder - StateRecorder statemetrics.StateRecorder + // StateMetrics to use for recording state metrics. + StateMetrics *statemetrics.MRStateMetrics } // ForControllerRuntime extracts options for controller-runtime. diff --git a/pkg/reconciler/managed/reconciler.go b/pkg/reconciler/managed/reconciler.go index 2be38e18e..f559e77a4 100644 --- a/pkg/reconciler/managed/reconciler.go +++ b/pkg/reconciler/managed/reconciler.go @@ -36,7 +36,6 @@ import ( "github.com/crossplane/crossplane-runtime/pkg/logging" "github.com/crossplane/crossplane-runtime/pkg/meta" "github.com/crossplane/crossplane-runtime/pkg/resource" - "github.com/crossplane/crossplane-runtime/pkg/statemetrics" ) const ( @@ -489,7 +488,6 @@ type Reconciler struct { log logging.Logger record event.Recorder metricRecorder MetricRecorder - stateRecorder statemetrics.StateRecorder } type mrManaged struct { @@ -554,13 +552,6 @@ func WithMetricRecorder(recorder MetricRecorder) ReconcilerOption { } } -// WithStateRecorder configures the Reconciler to use the supplied StateRecorder. -func WithStateRecorder(recorder statemetrics.StateRecorder) ReconcilerOption { - return func(r *Reconciler) { - r.stateRecorder = recorder - } -} - // PollIntervalHook represents the function type passed to the // WithPollIntervalHook option to support dynamic computation of the poll // interval. @@ -719,16 +710,12 @@ func NewReconciler(m manager.Manager, of resource.ManagedKind, o ...ReconcilerOp log: logging.NewNopLogger(), record: event.NewNopRecorder(), metricRecorder: NewNopMetricRecorder(), - stateRecorder: statemetrics.NewNopStateRecorder(), } for _, ro := range o { ro(r) } - // State recorder is started in the background to record MR states. - go r.stateRecorder.Run(context.Background(), schema.GroupVersionKind(of)) - return r } diff --git a/pkg/resource/interfaces.go b/pkg/resource/interfaces.go index 75c600542..bd32b6ed9 100644 --- a/pkg/resource/interfaces.go +++ b/pkg/resource/interfaces.go @@ -247,6 +247,14 @@ type Composite interface { //nolint:interfacebloat // This interface has to be b ConnectionDetailsPublishedTimer } +// A CompositeList is a list of composite resources. +type CompositeList interface { + client.ObjectList + + // GetItems returns the list of composite resources. + GetItems() []Composite +} + // Composed resources can be a composed into a Composite resource. type Composed interface { Object diff --git a/pkg/statemetrics/mr_state_metrics.go b/pkg/statemetrics/mr_state_metrics.go index cb4eae966..28e52fb41 100644 --- a/pkg/statemetrics/mr_state_metrics.go +++ b/pkg/statemetrics/mr_state_metrics.go @@ -22,47 +22,34 @@ import ( "github.com/prometheus/client_golang/prometheus" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" - "github.com/crossplane/crossplane-runtime/pkg/fieldpath" "github.com/crossplane/crossplane-runtime/pkg/logging" + "github.com/crossplane/crossplane-runtime/pkg/resource" ) -// A MRStateRecorderOption configures a MRStateRecorder. -type MRStateRecorderOption func(*MRStateRecorder) - -// A MRStateRecorder records the state of managed resources. -type MRStateRecorder struct { - client client.Client - log logging.Logger - interval time.Duration - - mrExists *prometheus.GaugeVec - mrReady *prometheus.GaugeVec - mrSynced *prometheus.GaugeVec +// MRStateMetrics holds Prometheus metrics for managed resources. +type MRStateMetrics struct { + Exists *prometheus.GaugeVec + Ready *prometheus.GaugeVec + Synced *prometheus.GaugeVec } -// NewMRStateRecorder returns a new MRStateRecorder which records the state of managed resources. -func NewMRStateRecorder(client client.Client, log logging.Logger, interval time.Duration) *MRStateRecorder { - return &MRStateRecorder{ - client: client, - log: log, - interval: interval, - - mrExists: prometheus.NewGaugeVec(prometheus.GaugeOpts{ +// NewMRStateMetrics returns a new MRStateMetrics. +func NewMRStateMetrics() *MRStateMetrics { + return &MRStateMetrics{ + Exists: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: subSystem, Name: "managed_resource_exists", Help: "The number of managed resources that exist", }, []string{"gvk"}), - mrReady: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Ready: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: subSystem, Name: "managed_resource_ready", Help: "The number of managed resources in Ready=True state", }, []string{"gvk"}), - mrSynced: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Synced: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: subSystem, Name: "managed_resource_synced", Help: "The number of managed resources in Synced=True state", @@ -73,71 +60,86 @@ func NewMRStateRecorder(client client.Client, log logging.Logger, interval time. // Describe sends the super-set of all possible descriptors of metrics // collected by this Collector to the provided channel and returns once // the last descriptor has been sent. -func (r *MRStateRecorder) Describe(ch chan<- *prometheus.Desc) { - r.mrExists.Describe(ch) - r.mrReady.Describe(ch) - r.mrSynced.Describe(ch) +func (r *MRStateMetrics) Describe(ch chan<- *prometheus.Desc) { + r.Exists.Describe(ch) + r.Ready.Describe(ch) + r.Synced.Describe(ch) } // Collect is called by the Prometheus registry when collecting // metrics. The implementation sends each collected metric via the // provided channel and returns once the last metric has been sent. -func (r *MRStateRecorder) Collect(ch chan<- prometheus.Metric) { - r.mrExists.Collect(ch) - r.mrReady.Collect(ch) - r.mrSynced.Collect(ch) +func (r *MRStateMetrics) Collect(ch chan<- prometheus.Metric) { + r.Exists.Collect(ch) + r.Ready.Collect(ch) + r.Synced.Collect(ch) +} + +// A MRStateRecorder records the state of managed resources. +type MRStateRecorder struct { + client client.Client + log logging.Logger + interval time.Duration + managedList resource.ManagedList + + metrics *MRStateMetrics +} + +// NewMRStateRecorder returns a new MRStateRecorder which records the state of managed resources. +func NewMRStateRecorder(client client.Client, log logging.Logger, m *MRStateMetrics, managedList resource.ManagedList, interval time.Duration) *MRStateRecorder { + return &MRStateRecorder{ + client: client, + log: log, + metrics: m, + managedList: managedList, + interval: interval, + } } // Record records the state of managed resources. -func (r *MRStateRecorder) Record(ctx context.Context, gvk schema.GroupVersionKind) { - l := &unstructured.UnstructuredList{} - l.SetGroupVersionKind(gvk) - err := r.client.List(ctx, l) - if err != nil { +func (r *MRStateRecorder) Record(ctx context.Context, mrList resource.ManagedList) error { + if err := r.client.List(ctx, mrList); err != nil { r.log.Info("Failed to list managed resources", "error", err) - return + return err } - label := gvk.String() - r.mrExists.WithLabelValues(label).Set(float64(len(l.Items))) + mrs := mrList.GetItems() + if len(mrs) == 0 { + return nil + } + + label := mrs[0].GetObjectKind().GroupVersionKind().String() + r.metrics.Exists.WithLabelValues(label).Set(float64(len(mrs))) var numReady, numSynced float64 = 0, 0 - for _, o := range l.Items { - conditioned := xpv1.ConditionedStatus{} - err := fieldpath.Pave(o.Object).GetValueInto("status", &conditioned) - if err != nil { - r.log.Info("Failed to get conditions of managed resource", "error", err) - continue + for _, o := range mrs { + if o.GetCondition(xpv1.TypeReady).Status == corev1.ConditionTrue { + numReady++ } - for _, condition := range conditioned.Conditions { - if condition.Status == corev1.ConditionTrue { - switch condition.Type { - case xpv1.TypeReady: - numReady++ - case xpv1.TypeSynced: - numSynced++ - } - } + if o.GetCondition(xpv1.TypeSynced).Status == corev1.ConditionTrue { + numSynced++ } } - r.mrReady.WithLabelValues(label).Set(numReady) - r.mrSynced.WithLabelValues(label).Set(numSynced) + r.metrics.Ready.WithLabelValues(label).Set(numReady) + r.metrics.Synced.WithLabelValues(label).Set(numSynced) + + return nil } -// Run records state of managed resources with given interval. -func (r *MRStateRecorder) Run(ctx context.Context, gvk schema.GroupVersionKind) { +// Start records state of managed resources with given interval. +func (r *MRStateRecorder) Start(ctx context.Context) error { ticker := time.NewTicker(r.interval) - go func() { - for { - select { - case <-ticker.C: - r.Record(ctx, gvk) - case <-ctx.Done(): - ticker.Stop() - return + for { + select { + case <-ticker.C: + if err := r.Record(ctx, r.managedList); err != nil { + return err } + case <-ctx.Done(): + ticker.Stop() + return nil } - }() + } } diff --git a/pkg/statemetrics/state_recorder.go b/pkg/statemetrics/state_recorder.go index e0629bcf9..8604e81ff 100644 --- a/pkg/statemetrics/state_recorder.go +++ b/pkg/statemetrics/state_recorder.go @@ -20,7 +20,6 @@ package statemetrics import ( "context" - "github.com/prometheus/client_golang/prometheus" "k8s.io/apimachinery/pkg/runtime/schema" ) @@ -28,11 +27,8 @@ const subSystem = "crossplane" // A StateRecorder records the state of given GroupVersionKind. type StateRecorder interface { - Describe(ch chan<- *prometheus.Desc) - Collect(ch chan<- prometheus.Metric) - Record(ctx context.Context, gvk schema.GroupVersionKind) - Run(ctx context.Context, gvk schema.GroupVersionKind) + Start(ctx context.Context) error } // A NopStateRecorder does nothing. @@ -43,14 +39,8 @@ func NewNopStateRecorder() *NopStateRecorder { return &NopStateRecorder{} } -// Describe does nothing. -func (r *NopStateRecorder) Describe(_ chan<- *prometheus.Desc) {} - -// Collect does nothing. -func (r *NopStateRecorder) Collect(_ chan<- prometheus.Metric) {} - // Record does nothing. func (r *NopStateRecorder) Record(_ context.Context, _ schema.GroupVersionKind) {} -// Run does nothing. -func (r *NopStateRecorder) Run(_ context.Context, _ schema.GroupVersionKind) {} +// Start does nothing. +func (r *NopStateRecorder) Start(_ context.Context) error { return nil } diff --git a/pkg/statemetrics/xr_state_metrics.go b/pkg/statemetrics/xr_state_metrics.go index 8dcbb5140..f2ebe65c7 100644 --- a/pkg/statemetrics/xr_state_metrics.go +++ b/pkg/statemetrics/xr_state_metrics.go @@ -22,55 +22,40 @@ import ( "github.com/prometheus/client_golang/prometheus" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" - v1 "k8s.io/apiserver/pkg/apis/audit/v1" "sigs.k8s.io/controller-runtime/pkg/client" xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" - "github.com/crossplane/crossplane-runtime/pkg/fieldpath" "github.com/crossplane/crossplane-runtime/pkg/logging" + "github.com/crossplane/crossplane-runtime/pkg/resource" ) -// A XRStateRecorder records the state of composite resources. -type XRStateRecorder struct { - client client.Client - log logging.Logger - interval time.Duration - - compositeExists *prometheus.GaugeVec - compositeReady *prometheus.GaugeVec - compositeSynced *prometheus.GaugeVec - compositeComposedCount *prometheus.GaugeVec +// A XRStateMetrics holds Prometheus metrics for composite resources. +type XRStateMetrics struct { + Exists *prometheus.GaugeVec + Ready *prometheus.GaugeVec + Synced *prometheus.GaugeVec + ComposedCount *prometheus.GaugeVec } -// A APIExtStateRecorderOption configures a MRStateRecorder. -type APIExtStateRecorderOption func(*XRStateRecorder) - -// NewXRStateRecorder returns a new XRStateRecorder which records the state of claim, -// composite and composition metrics. -func NewXRStateRecorder(client client.Client, log logging.Logger, interval time.Duration) *XRStateRecorder { - return &XRStateRecorder{ - client: client, - log: log, - interval: interval, - - compositeExists: prometheus.NewGaugeVec(prometheus.GaugeOpts{ +// NewXRStateMetrics returns a new XRStateMetrics. +func NewXRStateMetrics() *XRStateMetrics { + return &XRStateMetrics{ + Exists: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: subSystem, Name: "composite_resource_exists", Help: "The number of composite resources that exist", }, []string{"gvk", "composition"}), - compositeReady: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Ready: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: subSystem, Name: "composite_resource_ready", Help: "The number of composite resources in Ready=True state", }, []string{"gvk", "composition"}), - compositeSynced: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Synced: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: subSystem, Name: "composite_resource_synced", Help: "The number of composite resources in Synced=True state", }, []string{"gvk", "composition"}), - compositeComposedCount: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + ComposedCount: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: subSystem, Name: "composite_resource_composed_count", Help: "The number of composed resources in total", @@ -81,101 +66,101 @@ func NewXRStateRecorder(client client.Client, log logging.Logger, interval time. // Describe sends the super-set of all possible descriptors of metrics // collected by this Collector to the provided channel and returns once // the last descriptor has been sent. -func (r *XRStateRecorder) Describe(ch chan<- *prometheus.Desc) { - r.compositeExists.Describe(ch) - r.compositeReady.Describe(ch) - r.compositeSynced.Describe(ch) - r.compositeComposedCount.Describe(ch) +func (r *XRStateMetrics) Describe(ch chan<- *prometheus.Desc) { + r.Exists.Describe(ch) + r.Ready.Describe(ch) + r.Synced.Describe(ch) + r.ComposedCount.Describe(ch) } // Collect is called by the Prometheus registry when collecting // metrics. The implementation sends each collected metric via the // provided channel and returns once the last metric has been sent. -func (r *XRStateRecorder) Collect(ch chan<- prometheus.Metric) { - r.compositeExists.Collect(ch) - r.compositeReady.Collect(ch) - r.compositeSynced.Collect(ch) - r.compositeComposedCount.Collect(ch) +func (r *XRStateMetrics) Collect(ch chan<- prometheus.Metric) { + r.Exists.Collect(ch) + r.Ready.Collect(ch) + r.Synced.Collect(ch) + r.ComposedCount.Collect(ch) +} + +// A XRStateRecorder records the state of composite resources. +type XRStateRecorder struct { + client client.Client + log logging.Logger + interval time.Duration + compositeList resource.CompositeList + + metrics *XRStateMetrics +} + +// NewXRStateRecorder returns a new XRStateRecorder which records the state xr resources. +func NewXRStateRecorder(client client.Client, log logging.Logger, metrics *XRStateMetrics, compositeList resource.CompositeList, interval time.Duration) *XRStateRecorder { + return &XRStateRecorder{ + client: client, + log: log, + metrics: metrics, + compositeList: compositeList, + interval: interval, + } } // Record records the state of managed resources. -func (r *XRStateRecorder) Record(ctx context.Context, gvk schema.GroupVersionKind) { - xrs := &unstructured.UnstructuredList{} - xrs.SetGroupVersionKind(gvk) - err := r.client.List(ctx, xrs) - if err != nil { +func (r *XRStateRecorder) Record(ctx context.Context, xrList resource.CompositeList) error { + if err := r.client.List(ctx, xrList); err != nil { r.log.Info("Failed to list composite resources", "error", err) - return + return err } - composition, err := getCompositionRef(xrs) - if err != nil { - r.log.Info("Failed to get composition reference of composite resource", "error", err) - return + xrs := xrList.GetItems() + if len(xrs) == 0 { + return nil } - labels := prometheus.Labels{ - "gvk": gvk.String(), - "composition": composition, - } - r.compositeExists.With(labels).Set(float64(len(xrs.Items))) + labels := getLabels(xrs) + r.metrics.Exists.With(labels).Set(float64(len(xrs))) var numReady, numSynced, numComposed float64 = 0, 0, 0 - for _, xr := range xrs.Items { - conditioned := xpv1.ConditionedStatus{} - if err := fieldpath.Pave(xr.Object).GetValueInto("status", &conditioned); err != nil { - r.log.Info("Failed to get conditions of managed resource", "error", err) - continue - } - - for _, condition := range conditioned.Conditions { - if condition.Type == xpv1.TypeReady && condition.Status == corev1.ConditionTrue { - numReady++ - } else if condition.Type == xpv1.TypeSynced && condition.Status == corev1.ConditionTrue { - numSynced++ - } + for _, xr := range xrs { + if xr.GetCondition(xpv1.TypeReady).Status == corev1.ConditionTrue { + numReady++ } - resourceRefs := make([]v1.ObjectReference, 0) - if err := fieldpath.Pave(xr.Object).GetValueInto("spec.resourceRefs", &resourceRefs); err != nil { - r.log.Info("Failed to get resource references of composed resource", "error", err) - continue + if xr.GetCondition(xpv1.TypeSynced).Status == corev1.ConditionTrue { + numSynced++ } - numComposed += float64(len(resourceRefs)) + numComposed += float64(len(xr.GetResourceReferences())) } - r.compositeReady.With(labels).Set(numReady) - r.compositeSynced.With(labels).Set(numSynced) - r.compositeComposedCount.With(labels).Set(numComposed) + r.metrics.Ready.With(labels).Set(numReady) + r.metrics.Synced.With(labels).Set(numSynced) + r.metrics.ComposedCount.With(labels).Set(numComposed) + + return nil } -// Run records state of managed resources with given interval. -func (r *XRStateRecorder) Run(ctx context.Context, gvk schema.GroupVersionKind) { +// Start records state of managed resources with given interval. +func (r *XRStateRecorder) Start(ctx context.Context) error { ticker := time.NewTicker(r.interval) - go func() { - for { - select { - case <-ticker.C: - r.Record(ctx, gvk) - case <-ctx.Done(): - ticker.Stop() - return + for { + select { + case <-ticker.C: + if err := r.Record(ctx, r.compositeList); err != nil { + return err } + case <-ctx.Done(): + ticker.Stop() + return nil } - }() -} - -func getCompositionRef(l *unstructured.UnstructuredList) (string, error) { - if len(l.Items) == 0 { - return "", nil } +} - xr := l.Items[0].Object - compRef, err := fieldpath.Pave(xr).GetString("spec.compositionRef") - if err != nil { - return "", err +func getLabels(xrs []resource.Composite) prometheus.Labels { + xr := xrs[0] + labels := prometheus.Labels{ + "gvk": xr.GetObjectKind().GroupVersionKind().String(), + "composition": xr.GetCompositionReference().String(), } - return compRef, nil + return labels } From 249f4c09be05572976d52db555395f15d22185e4 Mon Sep 17 00:00:00 2001 From: ezgidemirel Date: Wed, 17 Apr 2024 17:42:36 +0300 Subject: [PATCH 6/6] remove XRStateMetrics Signed-off-by: ezgidemirel --- pkg/controller/options.go | 22 ++-- pkg/reconciler/managed/metrics.go | 20 ++-- pkg/resource/interfaces.go | 8 -- pkg/statemetrics/mr_state_metrics.go | 4 +- pkg/statemetrics/xr_state_metrics.go | 166 --------------------------- 5 files changed, 27 insertions(+), 193 deletions(-) delete mode 100644 pkg/statemetrics/xr_state_metrics.go diff --git a/pkg/controller/options.go b/pkg/controller/options.go index fd88082df..91132def2 100644 --- a/pkg/controller/options.go +++ b/pkg/controller/options.go @@ -55,9 +55,6 @@ type Options struct { // determine whether it has work to do. PollInterval time.Duration - // PollStateMetricInterval at which each controller should record state - PollStateMetricInterval time.Duration - // MaxConcurrentReconciles for each controller. MaxConcurrentReconciles int @@ -67,11 +64,8 @@ type Options struct { // ESSOptions for External Secret Stores. ESSOptions *ESSOptions - // MetricsRecorder to use for recording metrics. - MRMetrics managed.MetricRecorder - - // StateMetrics to use for recording state metrics. - StateMetrics *statemetrics.MRStateMetrics + // MetricOptions for recording metrics. + MetricOptions *MetricOptions } // ForControllerRuntime extracts options for controller-runtime. @@ -90,3 +84,15 @@ type ESSOptions struct { TLSConfig *tls.Config TLSSecretName *string } + +// MetricOptions for recording metrics. +type MetricOptions struct { + // PollStateMetricInterval at which each controller should record state + PollStateMetricInterval time.Duration + + // MetricsRecorder to use for recording metrics. + MRMetrics managed.MetricRecorder + + // MRStateMetrics to use for recording state metrics. + MRStateMetrics *statemetrics.MRStateMetrics +} diff --git a/pkg/reconciler/managed/metrics.go b/pkg/reconciler/managed/metrics.go index 0099ddf2b..fe5f717be 100644 --- a/pkg/reconciler/managed/metrics.go +++ b/pkg/reconciler/managed/metrics.go @@ -58,7 +58,7 @@ func NewMRMetricRecorder() *MRMetricRecorder { return &MRMetricRecorder{ mrDetected: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Subsystem: subSystem, - Name: "managed_resource_time_to_first_reconcile_seconds", + Name: "managed_resource_first_time_to_reconcile_seconds", Help: "The time it took for a managed resource to be detected by the controller", Buckets: kmetrics.ExponentialBuckets(10e-9, 10, 10), }, []string{"gvk"}), @@ -109,8 +109,7 @@ func (r *MRMetricRecorder) recordUnchanged(name string) { func (r *MRMetricRecorder) recordFirstTimeReconciled(managed resource.Managed) { if managed.GetCondition(xpv1.TypeSynced).Status == corev1.ConditionUnknown { - r.mrDetected.WithLabelValues("gvk", managed.GetObjectKind().GroupVersionKind().String()). - Observe(time.Since(managed.GetCreationTimestamp().Time).Seconds()) + r.mrDetected.With(getLabels(managed)).Observe(time.Since(managed.GetCreationTimestamp().Time).Seconds()) r.firstObservation.Store(managed.GetName(), time.Now()) // this is the first time we reconciled on this resource } } @@ -126,15 +125,13 @@ func (r *MRMetricRecorder) recordDrift(managed resource.Managed) { return } - r.mrDrift.WithLabelValues("gvk", managed.GetObjectKind().GroupVersionKind().String()). - Observe(time.Since(lt).Seconds()) + r.mrDrift.With(getLabels(managed)).Observe(time.Since(lt).Seconds()) r.lastObservation.Store(name, time.Now()) } func (r *MRMetricRecorder) recordDeleted(managed resource.Managed) { - r.mrDeletion.WithLabelValues("gvk", managed.GetObjectKind().GroupVersionKind().String()). - Observe(time.Since(managed.GetDeletionTimestamp().Time).Seconds()) + r.mrDeletion.With(getLabels(managed)).Observe(time.Since(managed.GetDeletionTimestamp().Time).Seconds()) } func (r *MRMetricRecorder) recordFirstTimeReady(managed resource.Managed) { @@ -145,8 +142,7 @@ func (r *MRMetricRecorder) recordFirstTimeReady(managed resource.Managed) { if !ok { return } - r.mrFirstTimeReady.WithLabelValues("gvk", managed.GetObjectKind().GroupVersionKind().String()). - Observe(time.Since(managed.GetCreationTimestamp().Time).Seconds()) + r.mrFirstTimeReady.With(getLabels(managed)).Observe(time.Since(managed.GetCreationTimestamp().Time).Seconds()) r.firstObservation.Delete(managed.GetName()) } } @@ -174,3 +170,9 @@ func (r *NopMetricRecorder) recordDrift(_ resource.Managed) {} func (r *NopMetricRecorder) recordDeleted(_ resource.Managed) {} func (r *NopMetricRecorder) recordFirstTimeReady(_ resource.Managed) {} + +func getLabels(r resource.Managed) prometheus.Labels { + return prometheus.Labels{ + "gvk": r.GetObjectKind().GroupVersionKind().String(), + } +} diff --git a/pkg/resource/interfaces.go b/pkg/resource/interfaces.go index bd32b6ed9..75c600542 100644 --- a/pkg/resource/interfaces.go +++ b/pkg/resource/interfaces.go @@ -247,14 +247,6 @@ type Composite interface { //nolint:interfacebloat // This interface has to be b ConnectionDetailsPublishedTimer } -// A CompositeList is a list of composite resources. -type CompositeList interface { - client.ObjectList - - // GetItems returns the list of composite resources. - GetItems() []Composite -} - // Composed resources can be a composed into a Composite resource. type Composed interface { Object diff --git a/pkg/statemetrics/mr_state_metrics.go b/pkg/statemetrics/mr_state_metrics.go index 28e52fb41..eaa444ee5 100644 --- a/pkg/statemetrics/mr_state_metrics.go +++ b/pkg/statemetrics/mr_state_metrics.go @@ -86,11 +86,11 @@ type MRStateRecorder struct { } // NewMRStateRecorder returns a new MRStateRecorder which records the state of managed resources. -func NewMRStateRecorder(client client.Client, log logging.Logger, m *MRStateMetrics, managedList resource.ManagedList, interval time.Duration) *MRStateRecorder { +func NewMRStateRecorder(client client.Client, log logging.Logger, metrics *MRStateMetrics, managedList resource.ManagedList, interval time.Duration) *MRStateRecorder { return &MRStateRecorder{ client: client, log: log, - metrics: m, + metrics: metrics, managedList: managedList, interval: interval, } diff --git a/pkg/statemetrics/xr_state_metrics.go b/pkg/statemetrics/xr_state_metrics.go deleted file mode 100644 index f2ebe65c7..000000000 --- a/pkg/statemetrics/xr_state_metrics.go +++ /dev/null @@ -1,166 +0,0 @@ -/* -Copyright 2024 The Crossplane Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package statemetrics - -import ( - "context" - "time" - - "github.com/prometheus/client_golang/prometheus" - corev1 "k8s.io/api/core/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - - xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" - "github.com/crossplane/crossplane-runtime/pkg/logging" - "github.com/crossplane/crossplane-runtime/pkg/resource" -) - -// A XRStateMetrics holds Prometheus metrics for composite resources. -type XRStateMetrics struct { - Exists *prometheus.GaugeVec - Ready *prometheus.GaugeVec - Synced *prometheus.GaugeVec - ComposedCount *prometheus.GaugeVec -} - -// NewXRStateMetrics returns a new XRStateMetrics. -func NewXRStateMetrics() *XRStateMetrics { - return &XRStateMetrics{ - Exists: prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Subsystem: subSystem, - Name: "composite_resource_exists", - Help: "The number of composite resources that exist", - }, []string{"gvk", "composition"}), - Ready: prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Subsystem: subSystem, - Name: "composite_resource_ready", - Help: "The number of composite resources in Ready=True state", - }, []string{"gvk", "composition"}), - Synced: prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Subsystem: subSystem, - Name: "composite_resource_synced", - Help: "The number of composite resources in Synced=True state", - }, []string{"gvk", "composition"}), - ComposedCount: prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Subsystem: subSystem, - Name: "composite_resource_composed_count", - Help: "The number of composed resources in total", - }, []string{"gvk", "composition"}), - } -} - -// Describe sends the super-set of all possible descriptors of metrics -// collected by this Collector to the provided channel and returns once -// the last descriptor has been sent. -func (r *XRStateMetrics) Describe(ch chan<- *prometheus.Desc) { - r.Exists.Describe(ch) - r.Ready.Describe(ch) - r.Synced.Describe(ch) - r.ComposedCount.Describe(ch) -} - -// Collect is called by the Prometheus registry when collecting -// metrics. The implementation sends each collected metric via the -// provided channel and returns once the last metric has been sent. -func (r *XRStateMetrics) Collect(ch chan<- prometheus.Metric) { - r.Exists.Collect(ch) - r.Ready.Collect(ch) - r.Synced.Collect(ch) - r.ComposedCount.Collect(ch) -} - -// A XRStateRecorder records the state of composite resources. -type XRStateRecorder struct { - client client.Client - log logging.Logger - interval time.Duration - compositeList resource.CompositeList - - metrics *XRStateMetrics -} - -// NewXRStateRecorder returns a new XRStateRecorder which records the state xr resources. -func NewXRStateRecorder(client client.Client, log logging.Logger, metrics *XRStateMetrics, compositeList resource.CompositeList, interval time.Duration) *XRStateRecorder { - return &XRStateRecorder{ - client: client, - log: log, - metrics: metrics, - compositeList: compositeList, - interval: interval, - } -} - -// Record records the state of managed resources. -func (r *XRStateRecorder) Record(ctx context.Context, xrList resource.CompositeList) error { - if err := r.client.List(ctx, xrList); err != nil { - r.log.Info("Failed to list composite resources", "error", err) - return err - } - - xrs := xrList.GetItems() - if len(xrs) == 0 { - return nil - } - - labels := getLabels(xrs) - r.metrics.Exists.With(labels).Set(float64(len(xrs))) - - var numReady, numSynced, numComposed float64 = 0, 0, 0 - for _, xr := range xrs { - if xr.GetCondition(xpv1.TypeReady).Status == corev1.ConditionTrue { - numReady++ - } - - if xr.GetCondition(xpv1.TypeSynced).Status == corev1.ConditionTrue { - numSynced++ - } - - numComposed += float64(len(xr.GetResourceReferences())) - } - - r.metrics.Ready.With(labels).Set(numReady) - r.metrics.Synced.With(labels).Set(numSynced) - r.metrics.ComposedCount.With(labels).Set(numComposed) - - return nil -} - -// Start records state of managed resources with given interval. -func (r *XRStateRecorder) Start(ctx context.Context) error { - ticker := time.NewTicker(r.interval) - for { - select { - case <-ticker.C: - if err := r.Record(ctx, r.compositeList); err != nil { - return err - } - case <-ctx.Done(): - ticker.Stop() - return nil - } - } -} - -func getLabels(xrs []resource.Composite) prometheus.Labels { - xr := xrs[0] - labels := prometheus.Labels{ - "gvk": xr.GetObjectKind().GroupVersionKind().String(), - "composition": xr.GetCompositionReference().String(), - } - - return labels -}