Sherif akoush/upgrade mlserver 1.1 (#294)
* move mlserver to grpc control plane

* move to mlserver 1.1

* tidy up unload all models logic

* add test

* only unload models that are READY or LOADING

* fix test

* k8s yaml files
sakoush authored Jun 16, 2022
1 parent 929ca75 commit 9dde04c
Showing 13 changed files with 70 additions and 29 deletions.
2 changes: 1 addition & 1 deletion k8s/Makefile
@@ -10,7 +10,7 @@ PIPELINEGATEWAY_IMG ?= ${DOCKERHUB_USERNAME}/seldon-pipelinegateway:${CUSTOM_IMA
DATAFLOW_IMG ?= ${DOCKERHUB_USERNAME}/seldon-dataflow-engine:${CUSTOM_IMAGE_TAG}
ENVOY_IMG ?= ${DOCKERHUB_USERNAME}/seldon-envoy:${CUSTOM_IMAGE_TAG}

-MLSERVER_IMG ?= seldonio/mlserver:1.0.1
+MLSERVER_IMG ?= seldonio/mlserver:1.1.0
TRITON_IMG ?= nvcr.io/nvidia/tritonserver:22.05-py3

.PHONY: create
2 changes: 1 addition & 1 deletion k8s/helm-charts/seldon-core-v2-setup/values.yaml
@@ -60,7 +60,7 @@ serverConfig:
pullPolicy: IfNotPresent
registry: docker.io
repository: seldonio/mlserver
-tag: 1.0.1
+tag: 1.1.0
serverCapabilities: "mlserver,alibi-detect,lightgbm,mlflow,python,sklearn,spark-mlib,xgboost"
overcommitPercentage: "10"
modelVolumeStorage: 1Gi
2 changes: 1 addition & 1 deletion k8s/yaml/seldon-v2-components.yaml
@@ -993,7 +993,7 @@ spec:
value: "1"
- name: MLSERVER_LOAD_MODELS_AT_STARTUP
value: "false"
-image: seldonio/mlserver:1.0.1
+image: seldonio/mlserver:1.1.0
imagePullPolicy: IfNotPresent
lifecycle:
preStop:
2 changes: 1 addition & 1 deletion operator/Makefile
@@ -4,7 +4,7 @@ DOCKERHUB_USERNAME ?= seldonio
IMG ?= ${DOCKERHUB_USERNAME}/seldonv2-controller:${CUSTOM_IMAGE_TAG}
AGENT_IMG ?= ${DOCKERHUB_USERNAME}/seldon-agent:${CUSTOM_IMAGE_TAG}
RCLONE_IMG ?= ${DOCKERHUB_USERNAME}/seldon-rclone:${CUSTOM_IMAGE_TAG}
-MLSERVER_IMG ?= seldonio/mlserver:1.0.0.rc1
+MLSERVER_IMG ?= seldonio/mlserver:1.1.0
TRITON_IMG ?= nvcr.io/nvidia/tritonserver:21.12-py3
# ENVTEST_K8S_VERSION refers to the version of kubebuilder assets to be downloaded by envtest binary.
ENVTEST_K8S_VERSION = 1.22
2 changes: 1 addition & 1 deletion operator/config/serverconfigs/kustomization.yaml
@@ -15,7 +15,7 @@ images:
newTag: latest
- name: mlserver
newName: seldonio/mlserver
-newTag: 1.0.1
+newTag: 1.1.0
- name: rclone
newName: seldonio/seldon-rclone
newTag: latest
2 changes: 1 addition & 1 deletion scheduler/Makefile
@@ -9,7 +9,7 @@ DATAFLOW_IMG ?= ${DOCKERHUB_USERNAME}/seldon-dataflow-engine:${CUSTOM_IMAGE_TAG}
ENVOY_IMG ?= ${DOCKERHUB_USERNAME}/seldon-envoy:${CUSTOM_IMAGE_TAG}
# Grafana image only used for Docker compose not k8s
GRAFANA_IMG ?= ${DOCKERHUB_USERNAME}/seldon-grafana:${CUSTOM_IMAGE_TAG}
-MLSERVER_IMG ?= seldonio/mlserver:1.1.0.dev3
+MLSERVER_IMG ?= seldonio/mlserver:1.1.0
TRITON_IMG ?= nvcr.io/nvidia/tritonserver:22.05-py3
KIND_NAME=ansible

8 changes: 1 addition & 7 deletions scheduler/cmd/agent/main.go
@@ -160,13 +160,7 @@ func main() {
	)

	// Create V2 Protocol Handler
-	// TODO: when issue https://github.com/SeldonIO/MLServer/issues/600 is done, let mlserver use grpc as well
-	var v2Client *agent.V2Client
-	if cli.ServerName == "triton" {
-		v2Client = agent.NewV2Client(cli.InferenceHost, cli.InferenceGrpcPort, logger, true)
-	} else {
-		v2Client = agent.NewV2Client(cli.InferenceHost, cli.InferenceHttpPort, logger, false)
-	}
+	v2Client := agent.NewV2Client(cli.InferenceHost, cli.InferenceGrpcPort, logger, true)

	promMetrics, err := metrics.NewPrometheusMetrics(cli.ServerName, cli.ReplicaIdx, cli.Namespace, logger)
	if err != nil {
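With MLServer 1.1 implementing the V2 control plane over gRPC (the MLServer issue referenced in the removed TODO), the agent no longer needs an HTTP fallback and can construct one gRPC-backed client for both MLServer and Triton. A minimal sketch of the resulting construction path — host and port values here are placeholders, not part of the commit:

	logger := log.New()

	// Both server types now speak the V2 control plane over gRPC,
	// so a single constructor call covers MLServer and Triton alike.
	v2Client := agent.NewV2Client("0.0.0.0", 9500, logger, true) // final arg selects gRPC

	// Probe readiness before serving traffic.
	if err := v2Client.Ready(); err != nil {
		logger.WithError(err).Fatal("inference server not ready")
	}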
2 changes: 1 addition & 1 deletion scheduler/env.all
@@ -34,7 +34,7 @@ ENVOY_IMAGE_AND_TAG=seldonio/seldon-envoy:latest
RCLONE_IMAGE_AND_TAG=seldonio/seldon-rclone:latest
MODELGATEWAY_IMAGE_AND_TAG=seldonio/seldon-modelgateway:latest
PIPELINEGATEWAY_IMAGE_AND_TAG=seldonio/seldon-pipelinegateway:latest
-SERVER_MLSERVER_IMAGE_AND_TAG=seldonio/mlserver:1.0.1
+SERVER_MLSERVER_IMAGE_AND_TAG=seldonio/mlserver:1.1.0
SERVER_TRITON_IMAGE_AND_TAG=nvcr.io/nvidia/tritonserver:22.05-py3
SCHEDULER_IMAGE_AND_TAG=seldonio/seldon-scheduler:latest
KAFKA_IMAGE_AND_TAG=quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
18 changes: 12 additions & 6 deletions scheduler/pkg/agent/client.go
@@ -220,14 +220,20 @@ func (c *Client) UnloadAllModels() error {
		return err
	}
	for _, model := range models {
-		logger.Infof("Unloading existing model %s", model)
-		v2Err := c.stateManager.v2Client.UnloadModel(model)
-		if v2Err != nil {
-			return v2Err.err
+		if model.State == MLServerModelState_READY || model.State == MLServerModelState_LOADING {
+			logger.Infof("Unloading existing model %s", model)
+			v2Err := c.stateManager.v2Client.UnloadModel(model.Name)
+			if v2Err != nil {
+				if !v2Err.IsNotFound() {
+					return v2Err.err
+				} else {
+					c.logger.Warnf("Model %s not found on server", model)
+				}
+			}
		}
-		err := c.ModelRepository.RemoveModelVersion(model)
+		err := c.ModelRepository.RemoveModelVersion(model.Name)
		if err != nil {
-			return err
+			c.logger.WithError(err).Errorf("Model %s could not be removed from repository", model)
		}
	}
	return nil
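The guard above encodes the commit's key behavioural change: only models the server reports as READY or LOADING are explicitly unloaded, a not-found response is tolerated with a warning, the local repository entry is removed regardless of state, and a failed removal is now logged rather than aborting the loop. A hypothetical helper (not in the commit) that captures the unload predicate:

	// shouldUnload reports whether a model is in a state worth unloading.
	func shouldUnload(m MLServerModelInfo) bool {
		return m.State == MLServerModelState_READY || m.State == MLServerModelState_LOADING
	}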
13 changes: 13 additions & 0 deletions scheduler/pkg/agent/rproxy_grpc_test.go
@@ -30,6 +30,7 @@
type mockGRPCMLServer struct {
	listener net.Listener
	server   *grpc.Server
+	models   []MLServerModelInfo
	v2.UnimplementedGRPCInferenceServiceServer
}

@@ -80,6 +81,18 @@ func (mlserver *mockGRPCMLServer) RepositoryModelUnload(ctx context.Context, r *
	return &v2.RepositoryModelUnloadResponse{}, nil
}

+func (mlserver *mockGRPCMLServer) RepositoryIndex(ctx context.Context, r *v2.RepositoryIndexRequest) (*v2.RepositoryIndexResponse, error) {
+	ret := make([]*v2.RepositoryIndexResponse_ModelIndex, len(mlserver.models))
+	for idx, model := range mlserver.models {
+		ret[idx] = &v2.RepositoryIndexResponse_ModelIndex{Name: model.Name, State: string(model.State)}
+	}
+	return &v2.RepositoryIndexResponse{Models: ret}, nil
+}
+
+func (mlserver *mockGRPCMLServer) setModels(models []MLServerModelInfo) {
+	mlserver.models = models
+}

func setupReverseGRPCService(numModels int, modelPrefix string, backEndGRPCPort, rpPort, backEndServerPort int) *reverseGRPCProxy {
	logger := log.New()
	log.SetLevel(log.DebugLevel)
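The mock now serves RepositoryIndex from an in-memory slice, so tests can stage arbitrary server-side model states via setModels. A sketch of wiring it into a test gRPC server — this assumes the generated v2.RegisterGRPCInferenceServiceServer helper and uses a placeholder model name:

	lis, _ := net.Listen("tcp", ":0") // free port for the test
	srv := grpc.NewServer()
	mock := &mockGRPCMLServer{}
	mock.setModels([]MLServerModelInfo{{Name: "iris", State: MLServerModelState_READY}})
	v2.RegisterGRPCInferenceServiceServer(srv, mock)
	go func() { _ = srv.Serve(lis) }()
	defer srv.Stop()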
10 changes: 6 additions & 4 deletions scheduler/pkg/agent/rproxy_test.go
@@ -359,6 +359,12 @@ func TestLazyLoadRoundTripper(t *testing.T) {
		_ = mlserver.ListenAndServe()
	}()

+	time.Sleep(100 * time.Millisecond)
+
+	defer func() {
+		_ = mlserver.Shutdown(context.Background())
+	}()

	basePath := "http://localhost:" + strconv.Itoa(serverPort)

	loader := func(model string) *V2Err {
@@ -382,10 +388,6 @@
			resp, err := httpClient.Do(req)
			g.Expect(err).To(BeNil())
			g.Expect(resp.StatusCode).To(Equal(http.StatusOK))
-
-			defer func() {
-				_ = mlserver.Shutdown(context.Background())
-			}()
		})
	}
}
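The reordering matters because a failed Gomega assertion aborts the subtest: in the old code the Shutdown defer was only registered after the assertions, so any failure before that point leaked the HTTP server into later tests. Registering cleanup immediately after startup, plus a short sleep so the listener is bound before requests fire, makes teardown unconditional. A condensed view of the pattern as applied above:

	go func() { _ = mlserver.ListenAndServe() }()
	time.Sleep(100 * time.Millisecond) // crude wait for the listener to come up
	defer func() { _ = mlserver.Shutdown(context.Background()) }() // runs even if an assertion fails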
31 changes: 26 additions & 5 deletions scheduler/pkg/agent/v2.go
@@ -19,6 +19,16 @@ import (
"google.golang.org/grpc/status"
)

type MLServerModelState string

const (
MLServerModelState_UNKNOWN MLServerModelState = "UNKNOWN"
MLServerModelState_READY MLServerModelState = "READY"
MLServerModelState_UNAVAILABLE MLServerModelState = "UNAVAILABLE"
MLServerModelState_LOADING MLServerModelState = "LOADING"
MLServerModelState_UNLOADING MLServerModelState = "UNLOADING"
)

const (
	// we define all communication error into one bucket
	// TODO: separate out the different comm issues (e.g. DNS vs Connection refused etc.)
@@ -27,6 +37,11 @@
	V2RequestErrCode = -200
)

+type MLServerModelInfo struct {
+	Name  string
+	State MLServerModelState
+}
+
type V2Client struct {
	host     string
	httpPort int
@@ -285,17 +300,17 @@ func (v *V2Client) readyGrpc() error {
	return err
}

-func (v *V2Client) GetModels() ([]string, error) {
+func (v *V2Client) GetModels() ([]MLServerModelInfo, error) {
	if v.isGrpc {
		return v.getModelsGrpc()
	} else {
		v.logger.Warnf("Http GetModels not available returning empty list")
-		return []string{}, nil
+		return []MLServerModelInfo{}, nil
	}
}

-func (v *V2Client) getModelsGrpc() ([]string, error) {
-	var models []string
+func (v *V2Client) getModelsGrpc() ([]MLServerModelInfo, error) {
+	var models []MLServerModelInfo
	ctx := context.Background()
	req := &v2.RepositoryIndexRequest{}

@@ -304,7 +319,13 @@ func (v *V2Client) getModelsGrpc() ([]string, error) {
		return nil, err
	}
	for _, modelRes := range res.Models {
-		models = append(models, modelRes.Name)
+		if modelRes.Name == "" {
+			// nothing to do for empty model
+			// TODO: why mlserver returns back empty string model?
+			continue
+		}
+		models = append(
+			models, MLServerModelInfo{Name: modelRes.Name, State: MLServerModelState(modelRes.State)})
	}
	return models, nil
}
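Since GetModels now returns per-model state rather than bare names, callers can decide per model what to do — this is what lets UnloadAllModels skip anything that is not READY or LOADING. A consumption sketch (placeholder v2Client value, same-package access to v2Err.err assumed, not part of the commit):

	models, err := v2Client.GetModels()
	if err != nil {
		return err
	}
	for _, m := range models {
		// Mirror the agent's rule: only actively-served or loading models
		// need an explicit unload on the server.
		if m.State == MLServerModelState_READY || m.State == MLServerModelState_LOADING {
			if v2Err := v2Client.UnloadModel(m.Name); v2Err != nil && !v2Err.IsNotFound() {
				return v2Err.err
			}
		}
	}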
5 changes: 5 additions & 0 deletions scheduler/pkg/agent/v2_test.go
@@ -196,6 +196,11 @@ func TestGrpcV2(t *testing.T) {
	v2Err = v2Client.UnloadModel(modelNameMissing)
	g.Expect(v2Err.IsNotFound()).To(BeTrue())

+	mockMLServer.setModels([]MLServerModelInfo{{dummModel, MLServerModelState_READY}, {"", MLServerModelState_UNAVAILABLE}})
+	models, err := v2Client.GetModels()
+	g.Expect(err).To(BeNil())
+	g.Expect(models).To(Equal([]MLServerModelInfo{{dummModel, MLServerModelState_READY}})) // empty string models should be discarded
+
	err = v2Client.Ready()
	g.Expect(err).To(BeNil())

