Skip to content

Commit

Permalink
Handle scheduler errors in controller and decide if retryable (#484)
Browse files Browse the repository at this point in the history
* Handle errors from scheduler load and unload pipeline

* Handle scheduler errors and decide if retryable

* Stability fixes for when the scheduler is restarted

* review fixes

* Decrease the set of non-retryable errors for controller gRPC calls to the scheduler
  • Loading branch information
ukclivecox authored Oct 11, 2022
1 parent 4e50c80 commit 85b8c83
Show file tree
Hide file tree
Showing 15 changed files with 200 additions and 71 deletions.
34 changes: 28 additions & 6 deletions operator/controllers/mlops/experiment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package mlops
import (
"context"

"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/seldonio/seldon-core/operatorv2/pkg/constants"
Expand All @@ -44,7 +45,7 @@ type ExperimentReconciler struct {
Recorder record.EventRecorder
}

func (r *ExperimentReconciler) handleFinalizer(ctx context.Context, experiment *mlopsv1alpha1.Experiment) (bool, error) {
func (r *ExperimentReconciler) handleFinalizer(ctx context.Context, logger logr.Logger, experiment *mlopsv1alpha1.Experiment) (bool, error) {

// Check if we are being deleted or not
if experiment.ObjectMeta.DeletionTimestamp.IsZero() { // Not being deleted
Expand All @@ -59,8 +60,16 @@ func (r *ExperimentReconciler) handleFinalizer(ctx context.Context, experiment *
} else { // experiment is being deleted
if utils.ContainsStr(experiment.ObjectMeta.Finalizers, constants.ExperimentFinalizerName) {
// Handle unload in scheduler
if err := r.Scheduler.StopExperiment(ctx, experiment); err != nil {
return true, err
if err, retry := r.Scheduler.StopExperiment(ctx, experiment); err != nil {
if retry {
return true, err
} else {
experiment.ObjectMeta.Finalizers = utils.RemoveStr(experiment.ObjectMeta.Finalizers, constants.ExperimentFinalizerName)
if errUpdate := r.Update(ctx, experiment); errUpdate != nil {
logger.Error(err, "Failed to remove finalizer", "experiment", experiment.Name)
return true, err
}
}
}
}
// Stop reconciliation as the item is being deleted
Expand Down Expand Up @@ -97,19 +106,32 @@ func (r *ExperimentReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return reconcile.Result{}, err
}

stop, err := r.handleFinalizer(ctx, experiment)
stop, err := r.handleFinalizer(ctx, logger, experiment)
if stop {
return reconcile.Result{}, err
}

err = r.Scheduler.StartExperiment(ctx, experiment)
err, retry := r.Scheduler.StartExperiment(ctx, experiment)
if err != nil {
return reconcile.Result{}, err
r.updateStatusFromError(ctx, logger, experiment, err)
if retry {
return ctrl.Result{}, err
} else {
return ctrl.Result{}, nil
}

}

return ctrl.Result{}, nil
}

// updateStatusFromError records a scheduler error on the experiment's status
// and attempts to persist it through the status client.
//
// The condition is set to false with the error text passed as the reason
// argument. A failure to persist the status is logged and otherwise ignored,
// so this helper never returns an error to the caller.
func (r *ExperimentReconciler) updateStatusFromError(ctx context.Context, logger logr.Logger, experiment *mlopsv1alpha1.Experiment, err error) {
	// NOTE(review): ModelReady is used here for an Experiment resource; this
	// looks copied from the model controller. Confirm whether an
	// experiment-specific condition type should be used instead.
	experiment.Status.CreateAndSetCondition(mlopsv1alpha1.ModelReady, false, err.Error())
	if errSet := r.Status().Update(ctx, experiment); errSet != nil {
		// Label the resource name with the "experiment" key so log lines are
		// not mistaken for model-controller failures.
		logger.Error(errSet, "Failed to set status for experiment on error", "experiment", experiment.Name, "error", err.Error())
	}
}

// SetupWithManager sets up the controller with the Manager.
func (r *ExperimentReconciler) SetupWithManager(mgr ctrl.Manager) error {
pred := predicate.GenerationChangedPredicate{}
Expand Down
35 changes: 28 additions & 7 deletions operator/controllers/mlops/model_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package mlops
import (
"context"

"github.com/go-logr/logr"

"github.com/seldonio/seldon-core/operatorv2/pkg/constants"

"k8s.io/client-go/tools/record"
Expand All @@ -45,7 +47,7 @@ type ModelReconciler struct {
Recorder record.EventRecorder
}

func (r *ModelReconciler) handleFinalizer(ctx context.Context, model *mlopsv1alpha1.Model) (bool, error) {
func (r *ModelReconciler) handleFinalizer(ctx context.Context, logger logr.Logger, model *mlopsv1alpha1.Model) (bool, error) {

// Check if we are being deleted or not
if model.ObjectMeta.DeletionTimestamp.IsZero() { // Not being deleted
Expand All @@ -60,8 +62,16 @@ func (r *ModelReconciler) handleFinalizer(ctx context.Context, model *mlopsv1alp
} else { // model is being deleted
if utils.ContainsStr(model.ObjectMeta.Finalizers, constants.ModelFinalizerName) {
// Handle unload in scheduler
if err := r.Scheduler.UnloadModel(ctx, model); err != nil {
return true, err
if err, retry := r.Scheduler.UnloadModel(ctx, model); err != nil {
if retry {
return true, err
} else {
model.ObjectMeta.Finalizers = utils.RemoveStr(model.ObjectMeta.Finalizers, constants.ModelFinalizerName)
if errUpdate := r.Update(ctx, model); errUpdate != nil {
logger.Error(err, "Failed to remove finalizer", "model", model.Name)
return true, err
}
}
}
}
// Stop reconciliation as the item is being deleted
Expand Down Expand Up @@ -90,20 +100,31 @@ func (r *ModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
return reconcile.Result{}, err
}

stop, err := r.handleFinalizer(ctx, model)
stop, err := r.handleFinalizer(ctx, logger, model)
if stop {
return reconcile.Result{}, err
}

err = r.Scheduler.LoadModel(ctx, model)
err, retry := r.Scheduler.LoadModel(ctx, model)
if err != nil {
logger.Error(err, "Failed in call to load model")
return ctrl.Result{}, err
r.updateStatusFromError(ctx, logger, model, err)
if retry {
return ctrl.Result{}, err
} else {
return ctrl.Result{}, nil
}
}

return ctrl.Result{}, nil
}

// updateStatusFromError marks the model's ModelReady condition as false,
// passing the error text as the reason, and then tries to persist the status
// via the status client. If that update fails, the failure is logged together
// with the original error text; nothing is returned to the caller.
func (r *ModelReconciler) updateStatusFromError(ctx context.Context, logger logr.Logger, model *mlopsv1alpha1.Model, err error) {
	reason := err.Error()
	model.Status.CreateAndSetCondition(mlopsv1alpha1.ModelReady, false, reason)

	errSet := r.Status().Update(ctx, model)
	if errSet == nil {
		return
	}
	logger.Error(errSet, "Failed to set status for model on error", "model", model.Name, "error", reason)
}

// SetupWithManager sets up the controller with the Manager.
// Uses https://github.com/kubernetes-sigs/kubebuilder/issues/618#issuecomment-698018831
// This ensures we don't reconcile when just the status is updated by checking if generation changed
Expand Down
36 changes: 29 additions & 7 deletions operator/controllers/mlops/pipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package mlops
import (
"context"

"github.com/go-logr/logr"

"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/seldonio/seldon-core/operatorv2/pkg/constants"
Expand All @@ -44,7 +46,7 @@ type PipelineReconciler struct {
Recorder record.EventRecorder
}

func (r *PipelineReconciler) handleFinalizer(ctx context.Context, pipeline *mlopsv1alpha1.Pipeline) (bool, error) {
func (r *PipelineReconciler) handleFinalizer(ctx context.Context, logger logr.Logger, pipeline *mlopsv1alpha1.Pipeline) (bool, error) {

// Check if we are being deleted or not
if pipeline.ObjectMeta.DeletionTimestamp.IsZero() { // Not being deleted
Expand All @@ -59,8 +61,17 @@ func (r *PipelineReconciler) handleFinalizer(ctx context.Context, pipeline *mlop
} else { // pipeline is being deleted
if utils.ContainsStr(pipeline.ObjectMeta.Finalizers, constants.PipelineFinalizerName) {
// Handle unload in scheduler
if err := r.Scheduler.UnloadPipeline(ctx, pipeline); err != nil {
return true, err
if err, retry := r.Scheduler.UnloadPipeline(ctx, pipeline); err != nil {
if retry {
return true, err
} else {
// Remove pipeline anyway on error as we assume errors from scheduler are fatal here
pipeline.ObjectMeta.Finalizers = utils.RemoveStr(pipeline.ObjectMeta.Finalizers, constants.PipelineFinalizerName)
if errUpdate := r.Update(ctx, pipeline); errUpdate != nil {
logger.Error(err, "Failed to remove finalizer", "pipeline", pipeline.Name)
return true, err
}
}
}
}
// Stop reconciliation as the item is being deleted
Expand Down Expand Up @@ -98,19 +109,30 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return reconcile.Result{}, err
}

stop, err := r.handleFinalizer(ctx, pipeline)
stop, err := r.handleFinalizer(ctx, logger, pipeline)
if stop {
return reconcile.Result{}, err
}

err = r.Scheduler.LoadPipeline(ctx, pipeline)
err, retry := r.Scheduler.LoadPipeline(ctx, pipeline)
if err != nil {
return reconcile.Result{}, err
r.updateStatusFromError(ctx, logger, pipeline, err)
if retry {
return ctrl.Result{}, err
} else {
return ctrl.Result{}, nil
}
}

return ctrl.Result{}, nil
}

// updateStatusFromError sets the pipeline's PipelineReady condition to false,
// using the error text as the reason, and writes the status back through the
// status client. A failed write is only logged (with the original error text
// attached); the caller receives no error.
func (r *PipelineReconciler) updateStatusFromError(ctx context.Context, logger logr.Logger, pipeline *mlopsv1alpha1.Pipeline, err error) {
	msg := err.Error()
	pipeline.Status.CreateAndSetCondition(mlopsv1alpha1.PipelineReady, false, msg)

	updateErr := r.Status().Update(ctx, pipeline)
	if updateErr != nil {
		logger.Error(updateErr, "Failed to set status on pipeline on error", "pipeline", pipeline.Name, "error", msg)
	}
}

// SetupWithManager sets up the controller with the Manager.
func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error {
pred := predicate.GenerationChangedPredicate{}
Expand Down
26 changes: 26 additions & 0 deletions operator/scheduler/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"math"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"google.golang.org/grpc/credentials/insecure"

"github.com/seldonio/seldon-core-v2/components/tls/pkg/tls"
Expand Down Expand Up @@ -76,3 +79,26 @@ func (s *SchedulerClient) ConnectToScheduler(host string, plainTxtPort int, tlsP
s.logger.Info("Connected to scheduler", "host", host, "port", port)
return nil
}

// checkErrorRetryable reports whether a failed scheduler call for the given
// resource should be retried.
//
//   - A nil error is not retryable (there is nothing to retry).
//   - An error from which no gRPC status can be extracted is retryable.
//   - A gRPC status of FailedPrecondition or Unimplemented is not retryable.
//   - Any other gRPC status code is retryable.
//
// resource and resourceName are used only to annotate the log lines.
func (s *SchedulerClient) checkErrorRetryable(resource string, resourceName string, err error) bool {
	if err == nil {
		return false
	}

	st, isGrpc := status.FromError(err)
	if !isGrpc {
		s.logger.Info("Got non grpc error", "error", err.Error(), "resource", resource, "resourceName", resourceName)
		return true
	}

	code := st.Code()
	s.logger.Info("Got grpc status code", "err", err.Error(), "code", code, "resource", resource, "resourceName", resourceName)

	// Only these codes are treated as permanent failures; everything else
	// is retried.
	if code == codes.FailedPrecondition || code == codes.Unimplemented {
		s.logger.Info("Non retryable error", "code", code, "resource", resource, "resourceName", resourceName)
		return false
	}

	s.logger.Info("retryable error", "code", code, "resource", resource, "resourceName", resourceName)
	return true
}
8 changes: 4 additions & 4 deletions operator/scheduler/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,26 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (s *SchedulerClient) StartExperiment(ctx context.Context, experiment *v1alpha1.Experiment) error {
func (s *SchedulerClient) StartExperiment(ctx context.Context, experiment *v1alpha1.Experiment) (error, bool) {
logger := s.logger.WithName("StartExperiment")
grcpClient := scheduler.NewSchedulerClient(s.conn)
req := &scheduler.StartExperimentRequest{
Experiment: experiment.AsSchedulerExperimentRequest(),
}
logger.Info("Start", "experiment name", experiment.Name)
_, err := grcpClient.StartExperiment(ctx, req, grpc_retry.WithMax(2))
return err
return err, s.checkErrorRetryable(experiment.Kind, experiment.Name, err)
}

func (s *SchedulerClient) StopExperiment(ctx context.Context, experiment *v1alpha1.Experiment) error {
func (s *SchedulerClient) StopExperiment(ctx context.Context, experiment *v1alpha1.Experiment) (error, bool) {
logger := s.logger.WithName("StopExperiment")
grcpClient := scheduler.NewSchedulerClient(s.conn)
req := &scheduler.StopExperimentRequest{
Name: experiment.Name,
}
logger.Info("Stop", "experiment name", experiment.Name)
_, err := grcpClient.StopExperiment(ctx, req, grpc_retry.WithMax(2))
return err
return err, s.checkErrorRetryable(experiment.Kind, experiment.Name, err)
}

func (s *SchedulerClient) SubscribeExperimentEvents(ctx context.Context) error {
Expand Down
14 changes: 7 additions & 7 deletions operator/scheduler/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (s *SchedulerClient) LoadModel(ctx context.Context, model *v1alpha1.Model) error {
func (s *SchedulerClient) LoadModel(ctx context.Context, model *v1alpha1.Model) (error, bool) {
logger := s.logger.WithName("LoadModel")
grcpClient := scheduler.NewSchedulerClient(s.conn)

md, err := model.AsSchedulerModel()
if err != nil {
return err
return err, false
}
loadModelRequest := scheduler.LoadModelRequest{
Model: md,
Expand All @@ -34,12 +34,12 @@ func (s *SchedulerClient) LoadModel(ctx context.Context, model *v1alpha1.Model)
logger.Info("Load", "model name", model.Name)
_, err = grcpClient.LoadModel(ctx, &loadModelRequest, grpc_retry.WithMax(2))
if err != nil {
return err
return err, s.checkErrorRetryable(model.Kind, model.Name, err)
}
return nil
return nil, false
}

func (s *SchedulerClient) UnloadModel(ctx context.Context, model *v1alpha1.Model) error {
func (s *SchedulerClient) UnloadModel(ctx context.Context, model *v1alpha1.Model) (error, bool) {
logger := s.logger.WithName("UnloadModel")
grcpClient := scheduler.NewSchedulerClient(s.conn)

Expand All @@ -55,9 +55,9 @@ func (s *SchedulerClient) UnloadModel(ctx context.Context, model *v1alpha1.Model
logger.Info("Unload", "model name", model.Name)
_, err := grcpClient.UnloadModel(ctx, modelRef, grpc_retry.WithMax(2))
if err != nil {
return err
return err, s.checkErrorRetryable(model.Kind, model.Name, err)
}
return nil
return nil, false
}

func (s *SchedulerClient) SubscribeModelEvents(ctx context.Context) error {
Expand Down
12 changes: 6 additions & 6 deletions operator/scheduler/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (s *SchedulerClient) LoadPipeline(ctx context.Context, pipeline *v1alpha1.Pipeline) error {
func (s *SchedulerClient) LoadPipeline(ctx context.Context, pipeline *v1alpha1.Pipeline) (error, bool) {
logger := s.logger.WithName("LoadPipeline")
grcpClient := scheduler.NewSchedulerClient(s.conn)
req := scheduler.LoadPipelineRequest{
Pipeline: pipeline.AsSchedulerPipeline(),
}
logger.Info("Load", "pipeline name", pipeline.Name)
_, err := grcpClient.LoadPipeline(ctx, &req, grpc_retry.WithMax(2))
return err
return err, s.checkErrorRetryable(pipeline.Kind, pipeline.Name, err)
}

func (s *SchedulerClient) UnloadPipeline(ctx context.Context, pipeline *v1alpha1.Pipeline) error {
func (s *SchedulerClient) UnloadPipeline(ctx context.Context, pipeline *v1alpha1.Pipeline) (error, bool) {
logger := s.logger.WithName("UnloadPipeline")
grcpClient := scheduler.NewSchedulerClient(s.conn)
req := scheduler.UnloadPipelineRequest{
Expand All @@ -34,11 +34,11 @@ func (s *SchedulerClient) UnloadPipeline(ctx context.Context, pipeline *v1alpha1
logger.Info("Unload", "pipeline name", pipeline.Name)
_, err := grcpClient.UnloadPipeline(ctx, &req, grpc_retry.WithMax(2))
if err != nil {
return err
return err, s.checkErrorRetryable(pipeline.Kind, pipeline.Name, err)
}
pipeline.Status.CreateAndSetCondition(v1alpha1.PipelineReady, false, "Pipeline terminating")
err = s.updatePipelineStatusImpl(pipeline)
return err
_ = s.updatePipelineStatusImpl(pipeline)
return nil, false
}

func (s *SchedulerClient) SubscribePipelineEvents(ctx context.Context) error {
Expand Down
Loading

0 comments on commit 85b8c83

Please sign in to comment.