Skip to content

Commit

Permalink
Add more metrics to Flinkk8soperator (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
anandswaminathan authored Jul 9, 2019
1 parent 9a95850 commit 7fda241
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 25 deletions.
17 changes: 10 additions & 7 deletions pkg/controller/flinkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,19 @@ type ReconcileFlinkApplication struct {
}

type reconcilerMetrics struct {
scope promutils.Scope
cacheHit labeled.Counter
cacheMiss labeled.Counter
scope promutils.Scope
cacheHit labeled.Counter
cacheMiss labeled.Counter
reconcileError labeled.Counter
}

func newReconcilerMetrics(scope promutils.Scope) *reconcilerMetrics {
reconcilerScope := scope.NewSubScope("reconciler")
return &reconcilerMetrics{
scope: reconcilerScope,
cacheHit: labeled.NewCounter("cache_hit", "Flink application resource fetched from cache", reconcilerScope),
cacheMiss: labeled.NewCounter("cache_miss", "Flink application resource missing from cache", reconcilerScope),
scope: reconcilerScope,
cacheHit: labeled.NewCounter("cache_hit", "Flink application resource fetched from cache", reconcilerScope),
cacheMiss: labeled.NewCounter("cache_miss", "Flink application resource missing from cache", reconcilerScope),
reconcileError: labeled.NewCounter("reconcile_error", "Reconcile for application failed", reconcilerScope),
}
}

Expand Down Expand Up @@ -108,6 +110,7 @@ func (r *ReconcileFlinkApplication) Reconcile(request reconcile.Request) (reconc
ctx = contextutils.WithPhase(ctx, string(instance.Status.Phase))
err = r.flinkStateMachine.Handle(ctx, instance)
if err != nil {
r.metrics.reconcileError.Inc(ctx)
logger.Warnf(ctx, "Failed to reconcile resource %v: %v", request.NamespacedName, err)
}
return r.getReconcileResultForError(err), err
Expand All @@ -116,7 +119,7 @@ func (r *ReconcileFlinkApplication) Reconcile(request reconcile.Request) (reconc
// Add creates a new FlinkApplication Controller and adds it to the Manager. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(ctx context.Context, mgr manager.Manager, cfg config.RuntimeConfig) error {
k8sCluster := k8.NewK8Cluster(mgr)
k8sCluster := k8.NewK8Cluster(mgr, cfg)
eventRecorder := mgr.GetEventRecorderFor(config.AppName)
flinkStateMachine := NewFlinkStateMachine(k8sCluster, eventRecorder, cfg)

Expand Down
96 changes: 78 additions & 18 deletions pkg/controller/k8/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package k8
import (
"context"

"github.com/lyft/flinkk8soperator/pkg/controller/config"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/promutils/labeled"
v1 "k8s.io/api/apps/v1"
coreV1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -36,16 +39,48 @@ type ClusterInterface interface {
DeleteK8Object(ctx context.Context, object runtime.Object) error
}

func NewK8Cluster(mgr manager.Manager) ClusterInterface {
func NewK8Cluster(mgr manager.Manager, cfg config.RuntimeConfig) ClusterInterface {
metrics := newK8ClusterMetrics(cfg.MetricsScope)
return &Cluster{
cache: mgr.GetCache(),
client: mgr.GetClient(),
cache: mgr.GetCache(),
client: mgr.GetClient(),
metrics: metrics,
}
}

func newK8ClusterMetrics(scope promutils.Scope) *k8ClusterMetrics {
k8ClusterScope := scope.NewSubScope("k8_cluster")
return &k8ClusterMetrics{
scope: k8ClusterScope,
createSuccess: labeled.NewCounter("create_success", "K8 object created successfully", k8ClusterScope),
createFailure: labeled.NewCounter("create_failure", "K8 object creation failed", k8ClusterScope),
updateSuccess: labeled.NewCounter("update_success", "K8 object updated successfully", k8ClusterScope),
updateFailure: labeled.NewCounter("update_failure", "K8 object update failed", k8ClusterScope),
deleteSuccess: labeled.NewCounter("delete_success", "K8 object deleted successfully", k8ClusterScope),
deleteFailure: labeled.NewCounter("delete_failure", "K8 object deletion failed", k8ClusterScope),
getDeploymentCacheHit: labeled.NewCounter("get_deployment_cache_hit", "Deployment fetched from cache", k8ClusterScope),
getDeploymentCacheMiss: labeled.NewCounter("get_deployment_cache_miss", "Deployment not present in the cache", k8ClusterScope),
getDeploymentFailure: labeled.NewCounter("get_deployment_failure", "Get Deployment failed", k8ClusterScope),
}
}

type Cluster struct {
cache cache.Cache
client client.Client
cache cache.Cache
client client.Client
metrics *k8ClusterMetrics
}

type k8ClusterMetrics struct {
scope promutils.Scope
createSuccess labeled.Counter
createFailure labeled.Counter
updateSuccess labeled.Counter
updateFailure labeled.Counter
deleteSuccess labeled.Counter
deleteFailure labeled.Counter
getDeploymentCacheHit labeled.Counter
getDeploymentCacheMiss labeled.Counter
getDeploymentFailure labeled.Counter
}

func (k *Cluster) GetService(ctx context.Context, namespace string, name string) (*coreV1.Service, error) {
Expand Down Expand Up @@ -87,18 +122,22 @@ func (k *Cluster) GetDeploymentsWithLabel(ctx context.Context, namespace string,
}
listOptionsFunc := client.UseListOptions(options)
err := k.cache.List(ctx, deploymentList, listOptionsFunc)
if err != nil {
if IsK8sObjectDoesNotExist(err) {
err := k.client.List(ctx, deploymentList, listOptionsFunc)
if err != nil {
logger.Warnf(ctx, "Failed to list deployments %v", err)
return nil, err
}
if err == nil {
k.metrics.getDeploymentCacheHit.Inc(ctx)
return deploymentList, nil
}
if IsK8sObjectDoesNotExist(err) {
k.metrics.getDeploymentCacheMiss.Inc(ctx)
err := k.client.List(ctx, deploymentList, listOptionsFunc)
if err != nil {
k.metrics.getDeploymentFailure.Inc(ctx)
logger.Warnf(ctx, "Failed to list deployments %v", err)
return nil, err
}
logger.Warnf(ctx, "Failed to list deployments from cache %v", err)
return nil, err
return deploymentList, nil
}
return deploymentList, nil
logger.Warnf(ctx, "Failed to list deployments from cache %v", err)
return nil, err
}

func (k *Cluster) GetServicesWithLabel(ctx context.Context, namespace string, labelMap map[string]string) (*coreV1.ServiceList, error) {
Expand Down Expand Up @@ -131,15 +170,36 @@ func (k *Cluster) GetServicesWithLabel(ctx context.Context, namespace string, la

func (k *Cluster) CreateK8Object(ctx context.Context, object runtime.Object) error {
objCreate := object.DeepCopyObject()
return k.client.Create(ctx, objCreate)
err := k.client.Create(ctx, objCreate)
if err != nil {
logger.Errorf(ctx, "K8 object creation failed %v", err)
k.metrics.createFailure.Inc(ctx)
return err
}
k.metrics.createSuccess.Inc(ctx)
return nil
}

func (k *Cluster) UpdateK8Object(ctx context.Context, object runtime.Object) error {
objUpdate := object.DeepCopyObject()
return k.client.Update(ctx, objUpdate)
err := k.client.Update(ctx, objUpdate)
if err != nil {
logger.Errorf(ctx, "K8 object update failed %v", err)
k.metrics.updateFailure.Inc(ctx)
return err
}
k.metrics.updateSuccess.Inc(ctx)
return nil
}

func (k *Cluster) DeleteK8Object(ctx context.Context, object runtime.Object) error {
objDelete := object.DeepCopyObject()
return k.client.Delete(ctx, objDelete)
err := k.client.Delete(ctx, objDelete)
if err != nil {
logger.Errorf(ctx, "K8 object delete failed %v", err)
k.metrics.deleteFailure.Inc(ctx)
return err
}
k.metrics.deleteSuccess.Inc(ctx)
return nil
}

0 comments on commit 7fda241

Please sign in to comment.