[Serve] Unify logger and add user facing events (ray-project#378)
Showing 1 changed file with 39 additions and 37 deletions.
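
In short, the change removes the package-level rayServiceLog logger (built with logf.Log.WithName) in favor of a request-scoped logger created from r.Log.WithValues at the top of Reconcile and passed down to updateStatusForActiveCluster and reconcileServe, and it adds user-facing Kubernetes Events via r.Recorder at the key transitions (service unhealthy, Serve deployment submitted, running, restarting). The sketch below illustrates the two patterns on a stripped-down reconciler; the ExampleReconciler type, its constructor wiring, and the ConfigMap used as a stand-in resource are illustrative assumptions rather than KubeRay code.

package example

import (
	"context"

	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// ExampleReconciler is a hypothetical reconciler with the same Log and
// Recorder fields the RayService controller relies on.
type ExampleReconciler struct {
	client.Client
	Log      logr.Logger
	Recorder record.EventRecorder
}

// NewExampleReconciler wires the event recorder from the manager, which is
// the usual controller-runtime way to obtain one.
func NewExampleReconciler(mgr manager.Manager) *ExampleReconciler {
	return &ExampleReconciler{
		Client:   mgr.GetClient(),
		Log:      ctrl.Log.WithName("example-controller"),
		Recorder: mgr.GetEventRecorderFor("example-controller"),
	}
}

func (r *ExampleReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
	// One request-scoped logger, reused for every message in this pass
	// instead of a package-level logger.
	logger := r.Log.WithValues("ServiceName", request.NamespacedName)

	// A ConfigMap stands in for the reconciled custom resource.
	obj := &corev1.ConfigMap{}
	if err := r.Get(ctx, request.NamespacedName, obj); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	logger.Info("Reconciling the resource")

	// Events attached to the object are what "user facing events" means:
	// they show up under Events in `kubectl describe` for that resource.
	r.Recorder.Eventf(obj, "Normal", "Reconciling",
		"Controller observed generation %d", obj.GetGeneration())

	return ctrl.Result{}, nil
}

With events recorded on the custom resource, `kubectl describe` on the RayService object should list the ServiceUnhealthy, SubmittedServeDeployment, Running, and Restarting reasons introduced in this diff, while the structured logs keep the per-request ServiceName key for filtering.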
@@ -26,14 +26,13 @@ import (
     "k8s.io/apimachinery/pkg/runtime"
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
-    logf "sigs.k8s.io/controller-runtime/pkg/log"

     rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
 )

 var (
-    rayServiceLog = logf.Log.WithName("rayservice-controller")
-    ServeDeploymentUnhealthySecondThreshold = 60.0 // Move to var for unit testing.
+    // This variable is mutable for unit testing purpose.
+    ServeDeploymentUnhealthySecondThreshold = 60.0
 )

 const (
@@ -85,53 +84,46 @@ func NewRayServiceReconciler(mgr manager.Manager) *RayServiceReconciler {

 // Reconcile is part of the main kubernetes reconciliation loop which aims to
 // move the current state of the cluster closer to the desired state.
-// TODO(user): Modify the Reconcile function to compare the state specified by
-// the RayService object against the actual cluster state, and then
-// perform operations to make the cluster state reflect the state specified by
-// the user.
-//
+// This the top level reconciliation flow for RayService.
+//
 // For more details, check Reconcile and its Result here:
 // - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
 func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
-    _ = r.Log.WithValues("rayservice", request.NamespacedName)
-    rayServiceLog.Info("reconciling RayService", "service NamespacedName", request.NamespacedName)
+    var logger = r.Log.WithValues("ServiceName", request.NamespacedName)
+    var isHealthy bool = false

-    // Get serving cluster instance
-    var err error
     var rayServiceInstance *rayv1alpha1.RayService
+    var err error
+    var ctrlResult ctrl.Result

+    // Resolve the CR from request.
     if rayServiceInstance, err = r.getRayServiceInstance(ctx, request); err != nil {
         return ctrl.Result{}, client.IgnoreNotFound(err)
     }
+    r.cleanUpServeConfigCache(rayServiceInstance)

+    // Reconcile RayCluster, check if we need to create a new RayCluster.
+    logger.Info("Reconciling the cluster component.")
     // Find active and pending ray cluster objects given current service name.
     var activeRayClusterInstance *rayv1alpha1.RayCluster
     var pendingRayClusterInstance *rayv1alpha1.RayCluster
     if activeRayClusterInstance, pendingRayClusterInstance, err = r.reconcileRayCluster(ctx, rayServiceInstance); err != nil {
         err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedToGetOrCreateRayCluster, err)
         return ctrl.Result{}, client.IgnoreNotFound(err)
     }

-    // Reconcile Serve Configs.
-    r.cleanUpServeConfigCache(rayServiceInstance)
-
-    rayServiceLog.Info("Done reconcileRayCluster")
-
     // Check if we need to create pending RayCluster.
     if rayServiceInstance.Status.PendingServiceStatus.RayClusterName != "" && pendingRayClusterInstance == nil {
         // Update RayService Status since reconcileRayCluster may mark RayCluster restart.
         if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil {
-            rayServiceLog.Error(errStatus, "Fail to update status of RayService", "rayServiceInstance", rayServiceInstance)
+            logger.Error(errStatus, "Fail to update status of RayService after RayCluster changes", "rayServiceInstance", rayServiceInstance)
             return ctrl.Result{}, err
         }
-        rayServiceLog.Info("Done reconcileRayCluster update status")
-        rayServiceLog.Info("Enter next loop to create new ray cluster.")
+        logger.Info("Done reconcileRayCluster update status, enter next loop to create new ray cluster.")
         return ctrl.Result{}, nil
     }

-    isHealthy := false
-    var ctrlResult ctrl.Result
-
+    logger.Info("Reconciling the Serve component.")
     /*
         Update ray cluster for 4 possible situations.
         If a ray cluster does not exist, clear its status.
@@ -140,20 +132,20 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
     */
     if activeRayClusterInstance != nil && pendingRayClusterInstance == nil {
         rayServiceInstance.Status.PendingServiceStatus = rayv1alpha1.RayServiceStatus{}
-        if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, activeRayClusterInstance, true); err != nil {
+        if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, activeRayClusterInstance, true, logger); err != nil {
             return ctrlResult, err
         }
     } else if activeRayClusterInstance != nil && pendingRayClusterInstance != nil {
-        if err = r.updateStatusForActiveCluster(ctx, rayServiceInstance, activeRayClusterInstance); err != nil {
-            rayServiceLog.Error(err, "Fail when check the status for active ray cluster while we have pending cluster.")
+        if err = r.updateStatusForActiveCluster(ctx, rayServiceInstance, activeRayClusterInstance, logger); err != nil {
+            logger.Error(err, "The updating of the status for active ray cluster while we have pending cluster failed")
         }

-        if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, pendingRayClusterInstance, false); err != nil {
+        if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, pendingRayClusterInstance, false, logger); err != nil {
             return ctrlResult, err
         }
     } else if activeRayClusterInstance == nil && pendingRayClusterInstance != nil {
         rayServiceInstance.Status.ActiveServiceStatus = rayv1alpha1.RayServiceStatus{}
-        if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, pendingRayClusterInstance, false); err != nil {
+        if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, pendingRayClusterInstance, false, logger); err != nil {
             return ctrlResult, err
         }
     } else {
@@ -162,9 +154,12 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
     }

     if !isHealthy {
+        logger.Info(fmt.Sprintf("Cluster is not healthy, checking again in %s", ServiceRestartRequeueDuration))
+        r.Recorder.Eventf(rayServiceInstance, "Normal", "ServiceUnhealthy", "The service is in an unhealthy state. Controller will perform a round of actions in %s.", ServiceRestartRequeueDuration)
         return ctrl.Result{RequeueAfter: ServiceRestartRequeueDuration}, nil
     }

+    logger.Info("Reconciling the ingress and service resources.")
     // Get the healthy ray cluster instance for service and ingress update.
     rayClusterInstance := activeRayClusterInstance
     if pendingRayClusterInstance != nil {
@@ -190,8 +185,9 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
         }
     }

+    // Final status update for any CR modification.
     if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil {
-        rayServiceLog.Error(errStatus, "Fail to update status of RayService", "rayServiceInstance", rayServiceInstance)
+        logger.Error(errStatus, "Fail to update status of RayService", "rayServiceInstance", rayServiceInstance)
         return ctrl.Result{}, err
     }

@@ -226,6 +222,7 @@ func (r *RayServiceReconciler) updateState(ctx context.Context, rayServiceInstan
     if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil {
         return fmtErrors.Errorf("combined error: %v %v", err, errStatus)
     }
+    r.Recorder.Event(rayServiceInstance, "Normal", string(status), err.Error())
     return err
 }

@@ -717,7 +714,7 @@ func (r *RayServiceReconciler) reconcileServices(ctx context.Context, rayService
     return nil
 }

-func (r *RayServiceReconciler) updateStatusForActiveCluster(ctx context.Context, rayServiceInstance *rayv1alpha1.RayService, rayClusterInstance *rayv1alpha1.RayCluster) error {
+func (r *RayServiceReconciler) updateStatusForActiveCluster(ctx context.Context, rayServiceInstance *rayv1alpha1.RayService, rayClusterInstance *rayv1alpha1.RayCluster, logger logr.Logger) error {
     rayServiceInstance.Status.ActiveServiceStatus.RayClusterStatus = rayClusterInstance.Status

     var err error
@@ -740,12 +737,12 @@ func (r *RayServiceReconciler) updateStatusForActiveCluster(ctx context.Context,

     r.updateAndCheckDashboardStatus(rayServiceStatus, true)

-    rayServiceLog.Info("Check serve health", "isHealthy", isHealthy)
+    logger.Info("Check serve health", "isHealthy", isHealthy)

     return err
 }

-func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceInstance *rayv1alpha1.RayService, rayClusterInstance *rayv1alpha1.RayCluster, isActive bool) (ctrl.Result, bool, error) {
+func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceInstance *rayv1alpha1.RayService, rayClusterInstance *rayv1alpha1.RayCluster, isActive bool, logger logr.Logger) (ctrl.Result, bool, error) {
     rayServiceInstance.Status.ActiveServiceStatus.RayClusterStatus = rayClusterInstance.Status
     var err error
     var clientURL string
@@ -760,7 +757,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns

     if clientURL, err = r.fetchDashboardAgentURL(ctx, rayClusterInstance); err != nil || clientURL == "" {
         if !r.updateAndCheckDashboardStatus(rayServiceStatus, false) {
-            rayServiceLog.Info("Dashboard is unhealthy, restart the cluster.")
+            logger.Info("Dashboard is unhealthy, restart the cluster.")
             r.markRestart(rayServiceInstance)
         }
         err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.WaitForDashboard, err)
@@ -775,18 +772,21 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
     if shouldUpdate {
         if err = r.updateServeDeployment(rayServiceInstance, rayDashboardClient, rayClusterInstance.Name); err != nil {
             if !r.updateAndCheckDashboardStatus(rayServiceStatus, false) {
-                rayServiceLog.Info("Dashboard is unhealthy, restart the cluster.")
+                logger.Info("Dashboard is unhealthy, restart the cluster.")
                 r.markRestart(rayServiceInstance)
             }
             err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedServeDeploy, err)
             return ctrl.Result{}, false, err
         }
+
+        r.Recorder.Eventf(rayServiceInstance, "Normal", "SubmittedServeDeployment",
+            "Controller sent API request to update Serve deployments on cluster %s", rayClusterInstance.Name)
     }

     var isHealthy bool
     if isHealthy, err = r.getAndCheckServeStatus(rayDashboardClient, rayServiceStatus); err != nil {
         if !r.updateAndCheckDashboardStatus(rayServiceStatus, false) {
-            rayServiceLog.Info("Dashboard is unhealthy, restart the cluster.")
+            logger.Info("Dashboard is unhealthy, restart the cluster.")
             r.markRestart(rayServiceInstance)
         }
         err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedToGetServeDeploymentStatus, err)
@@ -795,13 +795,14 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns

     r.updateAndCheckDashboardStatus(rayServiceStatus, true)

-    rayServiceLog.Info("Check serve health", "isHealthy", isHealthy, "isActive", isActive)
+    logger.Info("Check serve health", "isHealthy", isHealthy, "isActive", isActive)

     if isHealthy {
         rayServiceInstance.Status.ServiceStatus = rayv1alpha1.Running
         if r.allServeDeploymentsHealthy(rayServiceInstance, rayServiceStatus) {
             // Preparing RayCluster is ready.
             r.updateRayClusterInfo(rayServiceInstance, rayClusterInstance.Name)
+            r.Recorder.Event(rayServiceInstance, "Normal", "Running", "The Serve applicaton is now running and healthy.")
         }
     } else {
         r.markRestart(rayServiceInstance)
@@ -810,7 +811,8 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
         return ctrl.Result{}, false, err
     }

-    rayServiceLog.V(1).Info("Mark cluster as unhealthy", "rayCluster", rayClusterInstance)
+    logger.Info("Mark cluster as unhealthy", "rayCluster", rayClusterInstance)
+    r.Recorder.Eventf(rayServiceInstance, "Normal", "Restarting", "The cluster will restart after %s", ServiceRestartRequeueDuration)
     // Wait a while for the cluster delete
     return ctrl.Result{RequeueAfter: ServiceRestartRequeueDuration}, false, nil
 }