From a3b8fc69abcd1ca3dd8b6cbaad2039b82f480bad Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Thu, 14 Jul 2022 20:12:33 -0700 Subject: [PATCH] [Serve] Unify logger and add user facing events (#378) --- .../controllers/ray/rayservice_controller.go | 76 ++++++++++--------- 1 file changed, 39 insertions(+), 37 deletions(-) diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index 27f0b6411f..f810d7e2fe 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -26,14 +26,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - logf "sigs.k8s.io/controller-runtime/pkg/log" rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1" ) var ( - rayServiceLog = logf.Log.WithName("rayservice-controller") - ServeDeploymentUnhealthySecondThreshold = 60.0 // Move to var for unit testing. + // This variable is mutable for unit testing purpose. + ServeDeploymentUnhealthySecondThreshold = 60.0 ) const ( @@ -85,26 +84,27 @@ func NewRayServiceReconciler(mgr manager.Manager) *RayServiceReconciler { // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. -// TODO(user): Modify the Reconcile function to compare the state specified by -// the RayService object against the actual cluster state, and then -// perform operations to make the cluster state reflect the state specified by -// the user. +// +// This the top level reconciliation flow for RayService. // // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.2/pkg/reconcile func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { - _ = r.Log.WithValues("rayservice", request.NamespacedName) - rayServiceLog.Info("reconciling RayService", "service NamespacedName", request.NamespacedName) + var logger = r.Log.WithValues("ServiceName", request.NamespacedName) + var isHealthy bool = false - // Get serving cluster instance - var err error var rayServiceInstance *rayv1alpha1.RayService + var err error + var ctrlResult ctrl.Result + // Resolve the CR from request. if rayServiceInstance, err = r.getRayServiceInstance(ctx, request); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } + r.cleanUpServeConfigCache(rayServiceInstance) - // Reconcile RayCluster, check if we need to create a new RayCluster. + logger.Info("Reconciling the cluster component.") + // Find active and pending ray cluster objects given current service name. var activeRayClusterInstance *rayv1alpha1.RayCluster var pendingRayClusterInstance *rayv1alpha1.RayCluster if activeRayClusterInstance, pendingRayClusterInstance, err = r.reconcileRayCluster(ctx, rayServiceInstance); err != nil { @@ -112,26 +112,18 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque return ctrl.Result{}, client.IgnoreNotFound(err) } - // Reconcile Serve Configs. - r.cleanUpServeConfigCache(rayServiceInstance) - - rayServiceLog.Info("Done reconcileRayCluster") - // Check if we need to create pending RayCluster. if rayServiceInstance.Status.PendingServiceStatus.RayClusterName != "" && pendingRayClusterInstance == nil { // Update RayService Status since reconcileRayCluster may mark RayCluster restart. if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil { - rayServiceLog.Error(errStatus, "Fail to update status of RayService", "rayServiceInstance", rayServiceInstance) + logger.Error(errStatus, "Fail to update status of RayService after RayCluster changes", "rayServiceInstance", rayServiceInstance) return ctrl.Result{}, err } - rayServiceLog.Info("Done reconcileRayCluster update status") - rayServiceLog.Info("Enter next loop to create new ray cluster.") + logger.Info("Done reconcileRayCluster update status, enter next loop to create new ray cluster.") return ctrl.Result{}, nil } - isHealthy := false - var ctrlResult ctrl.Result - + logger.Info("Reconciling the Serve component.") /* Update ray cluster for 4 possible situations. If a ray cluster does not exist, clear its status. @@ -140,20 +132,20 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque */ if activeRayClusterInstance != nil && pendingRayClusterInstance == nil { rayServiceInstance.Status.PendingServiceStatus = rayv1alpha1.RayServiceStatus{} - if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, activeRayClusterInstance, true); err != nil { + if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, activeRayClusterInstance, true, logger); err != nil { return ctrlResult, err } } else if activeRayClusterInstance != nil && pendingRayClusterInstance != nil { - if err = r.updateStatusForActiveCluster(ctx, rayServiceInstance, activeRayClusterInstance); err != nil { - rayServiceLog.Error(err, "Fail when check the status for active ray cluster while we have pending cluster.") + if err = r.updateStatusForActiveCluster(ctx, rayServiceInstance, activeRayClusterInstance, logger); err != nil { + logger.Error(err, "The updating of the status for active ray cluster while we have pending cluster failed") } - if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, pendingRayClusterInstance, false); err != nil { + if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, pendingRayClusterInstance, false, logger); err != nil { return ctrlResult, err } } else if activeRayClusterInstance == nil && pendingRayClusterInstance != nil { rayServiceInstance.Status.ActiveServiceStatus = rayv1alpha1.RayServiceStatus{} - if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, pendingRayClusterInstance, false); err != nil { + if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, pendingRayClusterInstance, false, logger); err != nil { return ctrlResult, err } } else { @@ -162,9 +154,12 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque } if !isHealthy { + logger.Info(fmt.Sprintf("Cluster is not healthy, checking again in %s", ServiceRestartRequeueDuration)) + r.Recorder.Eventf(rayServiceInstance, "Normal", "ServiceUnhealthy", "The service is in an unhealthy state. Controller will perform a round of actions in %s.", ServiceRestartRequeueDuration) return ctrl.Result{RequeueAfter: ServiceRestartRequeueDuration}, nil } + logger.Info("Reconciling the ingress and service resources.") // Get the healthy ray cluster instance for service and ingress update. rayClusterInstance := activeRayClusterInstance if pendingRayClusterInstance != nil { @@ -190,8 +185,9 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque } } + // Final status update for any CR modification. if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil { - rayServiceLog.Error(errStatus, "Fail to update status of RayService", "rayServiceInstance", rayServiceInstance) + logger.Error(errStatus, "Fail to update status of RayService", "rayServiceInstance", rayServiceInstance) return ctrl.Result{}, err } @@ -226,6 +222,7 @@ func (r *RayServiceReconciler) updateState(ctx context.Context, rayServiceInstan if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil { return fmtErrors.Errorf("combined error: %v %v", err, errStatus) } + r.Recorder.Event(rayServiceInstance, "Normal", string(status), err.Error()) return err } @@ -717,7 +714,7 @@ func (r *RayServiceReconciler) reconcileServices(ctx context.Context, rayService return nil } -func (r *RayServiceReconciler) updateStatusForActiveCluster(ctx context.Context, rayServiceInstance *rayv1alpha1.RayService, rayClusterInstance *rayv1alpha1.RayCluster) error { +func (r *RayServiceReconciler) updateStatusForActiveCluster(ctx context.Context, rayServiceInstance *rayv1alpha1.RayService, rayClusterInstance *rayv1alpha1.RayCluster, logger logr.Logger) error { rayServiceInstance.Status.ActiveServiceStatus.RayClusterStatus = rayClusterInstance.Status var err error @@ -740,12 +737,12 @@ func (r *RayServiceReconciler) updateStatusForActiveCluster(ctx context.Context, r.updateAndCheckDashboardStatus(rayServiceStatus, true) - rayServiceLog.Info("Check serve health", "isHealthy", isHealthy) + logger.Info("Check serve health", "isHealthy", isHealthy) return err } -func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceInstance *rayv1alpha1.RayService, rayClusterInstance *rayv1alpha1.RayCluster, isActive bool) (ctrl.Result, bool, error) { +func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceInstance *rayv1alpha1.RayService, rayClusterInstance *rayv1alpha1.RayCluster, isActive bool, logger logr.Logger) (ctrl.Result, bool, error) { rayServiceInstance.Status.ActiveServiceStatus.RayClusterStatus = rayClusterInstance.Status var err error var clientURL string @@ -760,7 +757,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns if clientURL, err = r.fetchDashboardAgentURL(ctx, rayClusterInstance); err != nil || clientURL == "" { if !r.updateAndCheckDashboardStatus(rayServiceStatus, false) { - rayServiceLog.Info("Dashboard is unhealthy, restart the cluster.") + logger.Info("Dashboard is unhealthy, restart the cluster.") r.markRestart(rayServiceInstance) } err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.WaitForDashboard, err) @@ -775,18 +772,21 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns if shouldUpdate { if err = r.updateServeDeployment(rayServiceInstance, rayDashboardClient, rayClusterInstance.Name); err != nil { if !r.updateAndCheckDashboardStatus(rayServiceStatus, false) { - rayServiceLog.Info("Dashboard is unhealthy, restart the cluster.") + logger.Info("Dashboard is unhealthy, restart the cluster.") r.markRestart(rayServiceInstance) } err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedServeDeploy, err) return ctrl.Result{}, false, err } + + r.Recorder.Eventf(rayServiceInstance, "Normal", "SubmittedServeDeployment", + "Controller sent API request to update Serve deployments on cluster %s", rayClusterInstance.Name) } var isHealthy bool if isHealthy, err = r.getAndCheckServeStatus(rayDashboardClient, rayServiceStatus); err != nil { if !r.updateAndCheckDashboardStatus(rayServiceStatus, false) { - rayServiceLog.Info("Dashboard is unhealthy, restart the cluster.") + logger.Info("Dashboard is unhealthy, restart the cluster.") r.markRestart(rayServiceInstance) } err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedToGetServeDeploymentStatus, err) @@ -795,13 +795,14 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns r.updateAndCheckDashboardStatus(rayServiceStatus, true) - rayServiceLog.Info("Check serve health", "isHealthy", isHealthy, "isActive", isActive) + logger.Info("Check serve health", "isHealthy", isHealthy, "isActive", isActive) if isHealthy { rayServiceInstance.Status.ServiceStatus = rayv1alpha1.Running if r.allServeDeploymentsHealthy(rayServiceInstance, rayServiceStatus) { // Preparing RayCluster is ready. r.updateRayClusterInfo(rayServiceInstance, rayClusterInstance.Name) + r.Recorder.Event(rayServiceInstance, "Normal", "Running", "The Serve applicaton is now running and healthy.") } } else { r.markRestart(rayServiceInstance) @@ -810,7 +811,8 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns return ctrl.Result{}, false, err } - rayServiceLog.V(1).Info("Mark cluster as unhealthy", "rayCluster", rayClusterInstance) + logger.Info("Mark cluster as unhealthy", "rayCluster", rayClusterInstance) + r.Recorder.Eventf(rayServiceInstance, "Normal", "Restarting", "The cluster will restart after %s", ServiceRestartRequeueDuration) // Wait a while for the cluster delete return ctrl.Result{RequeueAfter: ServiceRestartRequeueDuration}, false, nil }