Skip to content

Commit

Permalink
fix(scheduler): Controller to update the list of servers to scheduler…
Browse files Browse the repository at this point in the history
… on reconnect (#5893)

* Make servernotify message a list

* update copyright

* Convert operator to use the adjusted ServerNotify API

* Adjust scheduler with ServerNotify changes

* remove dead code from grpc proxy

* Remove unused parameter

* add test for server notify

* make helper func deal with a list of servers

* remove extra condition for 0 servers

* add utility to send servers on reconnect to scheduler

* add handling of servers on reconnects

* add test for handler

* lint fixes

* tidy up note

* add mock server

* add test scaffolding for server status

* add server subscribe test

* fix typo
  • Loading branch information
sakoush authored Sep 12, 2024
1 parent ffe28f2 commit 2d8a398
Show file tree
Hide file tree
Showing 19 changed files with 819 additions and 283 deletions.
542 changes: 304 additions & 238 deletions apis/go/mlops/scheduler/scheduler.pb.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion apis/mlops/scheduler/scheduler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,17 @@ message ModelStatusRequest {
}

// ServerNotifyRequest carries a list of server notifications in a single
// request, so several servers can be reported with one call.
message ServerNotifyRequest {
// One ServerNotify entry per server included in this request.
repeated ServerNotify servers = 1;
}

// ServerNotify describes a single server inside a ServerNotifyRequest.
message ServerNotify {
// Name of the server.
string name = 1;
// Number of replicas expected for this server. The operator client sends 0
// for a server that is being deleted, the configured replica count when one
// is set, and 1 otherwise.
int32 expectedReplicas = 2;
// NOTE(review): the meaning of `shared` is not visible here and the operator
// client does not set it — confirm semantics against the scheduler.
bool shared = 3;
// Optional Kubernetes metadata for the server; the operator client fills in
// the namespace and generation.
optional KubernetesMeta kubernetesMeta = 4;
}

// ServerNotifyResponse is an empty message (it declares no fields).
// Presumably the reply type of the ServerNotify RPC — the rpc definition is
// outside this view, so confirm there.
message ServerNotifyResponse {

}

message ServerSubscriptionRequest {
Expand Down
3 changes: 2 additions & 1 deletion operator/controllers/mlops/server_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1"
mlopsv1alpha1 "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1"
"github.com/seldonio/seldon-core/operator/v2/controllers/reconcilers/common"
serverreconcile "github.com/seldonio/seldon-core/operator/v2/controllers/reconcilers/server"
Expand Down Expand Up @@ -89,7 +90,7 @@ func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return reconcile.Result{}, nil
}

err := r.Scheduler.ServerNotify(ctx, server)
err := r.Scheduler.ServerNotify(ctx, nil, []v1alpha1.Server{*server})
if err != nil {
r.updateStatusFromError(ctx, logger, server, err)
return reconcile.Result{}, err
Expand Down
4 changes: 2 additions & 2 deletions operator/scheduler/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func TestSubscribePipelineEvents(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// note that responses_pipelines is nill -> scheduler state is not existing
// note that if responses_pipelines is nil -> scheduler state is not existing
var grpcClient mockSchedulerGrpcClient
if !test.noSchedulerState {
grpcClient = mockSchedulerGrpcClient{
Expand All @@ -388,7 +388,7 @@ func TestSubscribePipelineEvents(t *testing.T) {
}
}

// check that we have reloaded the correct resources if the stata of the scheduler is not correct
// check that we have reloaded the correct resources if the state of the scheduler is not correct
if test.noSchedulerState {
activeResources := 0
for idx, req := range test.existing_resources {
Expand Down
62 changes: 40 additions & 22 deletions operator/scheduler/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,40 +23,54 @@ import (
"github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1"
)

func (s *SchedulerClient) ServerNotify(ctx context.Context, server *v1alpha1.Server) error {
func (s *SchedulerClient) ServerNotify(ctx context.Context, grpcClient scheduler.SchedulerClient, servers []v1alpha1.Server) error {
logger := s.logger.WithName("NotifyServer")
conn, err := s.getConnection(server.Namespace)
if err != nil {
return err
if len(servers) == 0 {
return nil
}
grpcClient := scheduler.NewSchedulerClient(conn)

var replicas int32
if !server.ObjectMeta.DeletionTimestamp.IsZero() {
replicas = 0
} else if server.Spec.Replicas != nil {
replicas = *server.Spec.Replicas
} else {
replicas = 1

if grpcClient == nil {
// we assume that all servers are in the same namespace
namespace := servers[0].Namespace
conn, err := s.getConnection(namespace)
if err != nil {
return err
}
grpcClient = scheduler.NewSchedulerClient(conn)
}

var requests []*scheduler.ServerNotify
for _, server := range servers {
var replicas int32
if !server.ObjectMeta.DeletionTimestamp.IsZero() {
replicas = 0
} else if server.Spec.Replicas != nil {
replicas = *server.Spec.Replicas
} else {
replicas = 1
}

logger.Info("Notify server", "name", server.GetName(), "namespace", server.GetNamespace(), "replicas", replicas)
requests = append(requests, &scheduler.ServerNotify{
Name: server.GetName(),
ExpectedReplicas: replicas,
KubernetesMeta: &scheduler.KubernetesMeta{
Namespace: server.GetNamespace(),
Generation: server.GetGeneration(),
},
})
}
request := &scheduler.ServerNotifyRequest{
Name: server.GetName(),
ExpectedReplicas: replicas,
KubernetesMeta: &scheduler.KubernetesMeta{
Namespace: server.GetNamespace(),
Generation: server.GetGeneration(),
},
Servers: requests,
}
logger.Info("Notify server", "name", server.GetName(), "namespace", server.GetNamespace(), "replicas", replicas)
_, err = grpcClient.ServerNotify(
_, err := grpcClient.ServerNotify(
ctx,
request,
grpc_retry.WithMax(SchedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
)
if err != nil {
logger.Error(err, "Failed to send notify server to scheduler", "name", server.GetName(), "namespace", server.GetNamespace())
logger.Error(err, "Failed to send notify server to scheduler")
return err
}
return nil
Expand All @@ -75,6 +89,10 @@ func (s *SchedulerClient) SubscribeServerEvents(ctx context.Context, grpcClient
if err != nil {
return err
}

// on new reconnects we send a list of servers to the scheduler
go handleRegisteredServers(ctx, namespace, s, grpcClient)

for {
event, err := stream.Recv()
if err != nil {
Expand Down
Loading

0 comments on commit 2d8a398

Please sign in to comment.