Override ModelDeploymentSpec of registered models #526

Merged: 2 commits, Jan 18, 2024
examples/tutorials/04-serving-multiple-models/serve.yaml (0 additions, 4 deletions)

@@ -7,17 +7,13 @@ images:
 models:
   TinyLlama/TinyLlama-1.1B-Chat-v1.0:
     runtime_env: custom-gpu
-    # TOFIX (spillai): Currently, pre-defined models cannot
-    # be deployed with overridden resource specification.
    deployment:
      resources:
        device: auto
        device_memory: 3Gi

   distil-whisper/distil-small.en:
     runtime_env: custom-gpu
-    # TOFIX (spillai): Currently, pre-defined models cannot
-    # be deployed with overridden resource specification.
    deployment:
      resources:
        device: auto
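With the TOFIX comments removed, the per-model `deployment` block in serve.yaml is now honored instead of ignored. As a rough sketch of what this enables (the actual YAML-to-dataclass plumbing lives in `hub.register_from_yaml` and is not shown in this diff, and the `ModelResources` keyword names are inferred from the YAML keys, so treat them as assumptions):

```python
# Hypothetical mapping of a serve.yaml `deployment` block onto the dataclasses
# touched in this PR; the real parsing happens inside hub.register_from_yaml.
from nos.common import ModelDeploymentSpec, ModelResources

deployment_block = {"resources": {"device": "auto", "device_memory": "3Gi"}}

resources_cfg = deployment_block.get("resources")
deployment = ModelDeploymentSpec(
    num_replicas=deployment_block.get("num_replicas", 1),
    # device/device_memory kwargs are assumed from the YAML keys above.
    resources=ModelResources(**resources_cfg) if resources_cfg else None,
)
```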
nos/common/__init__.py (1 addition, 0 deletions)

@@ -12,6 +12,7 @@
     FunctionSignature,
     ModelDeploymentSpec,
     ModelResources,
+    ModelServiceSpec,
     ModelSpec,
     ModelSpecMetadata,
     ModelSpecMetadataCatalog,
nos/common/spec.py (13 additions, 1 deletion)

@@ -738,5 +738,17 @@ class ModelDeploymentSpec:

     num_replicas: int = 1
     """Number of replicas."""
-    resources: ModelResources = field(default_factory=ModelResources)
+    resources: ModelResources = None
     """Model resources."""
+
+
+@dataclass
+class ModelServiceSpec:
+    """Model service that captures the model, deployment and service specifications."""
+
+    model: ModelSpec
+    """Model specification."""
+    deployment: ModelDeploymentSpec
+    """Model deployment specification."""
+    service: Any = None
+    """Model service."""
nos/hub/__init__.py (1 addition, 11 deletions)

@@ -13,6 +13,7 @@
     FunctionSignature,
     ModelDeploymentSpec,
     ModelResources,
+    ModelServiceSpec,
     ModelSpec,
     ModelSpecMetadata,
     ModelSpecMetadataCatalog,

@@ -220,17 +221,6 @@ def register_from_yaml(cls, filename: str) -> List[Any]:
            List[ModelSpec]: List of model specifications.
        """

-    @dataclass
-    class ModelServiceSpec:
-        """Model service that captures spec, deployment and service."""
-
-        model: ModelSpec
-        """Model specification."""
-        deployment: ModelDeploymentSpec
-        """Model deployment specification."""
-        service: Any = None
-        """Model service."""
-
     @dataclass
     class _ModelImportConfig:
         """Model import configuration."""
nos/managers/model.py (56 additions, 50 deletions)

@@ -5,7 +5,6 @@
 from collections import OrderedDict
 from dataclasses import asdict, dataclass, field
 from enum import Enum
-from functools import cached_property
 from pathlib import Path
 from typing import Any, Dict, Iterable, List, Union

@@ -15,7 +14,7 @@
 from ray.runtime_env import RuntimeEnv
 from ray.util.queue import Queue

-from nos.common import ModelResources, ModelSpec, ModelSpecMetadataCatalog
+from nos.common import ModelDeploymentSpec, ModelResources, ModelSpec, ModelSpecMetadataCatalog
 from nos.logging import logger
 from nos.managers.pool import ActorPool
@@ -125,16 +124,19 @@ class ModelHandle:

     spec: ModelSpec
     """Model specification."""
-    num_replicas: Union[int, str] = field(default=1)
+    deployment: ModelDeploymentSpec = field(default_factory=ModelDeploymentSpec)
     """Number of replicas."""
     _actors: List[Union[ray.remote, ray.actor.ActorHandle]] = field(init=False, default=None)
     """Ray actor handle."""
     _actor_pool: ActorPool = field(init=False, default=None)
     """Ray actor pool."""
+    _actor_options: Dict[str, Any] = field(init=False, default=None)
+    """Ray actor options."""

     def __post_init__(self):
         """Initialize the actor handles."""
-        self._actors = [self._get_actor() for _ in range(self.num_replicas)]
+        self._actor_options = self._get_actor_options(self.spec, self.deployment)
+        self._actors = [self._get_actor() for _ in range(self.deployment.num_replicas)]
         self._actor_pool = ActorPool(self._actors)

         # Patch the model handle with methods from the model spec signature
@@ -153,30 +155,35 @@ def __post_init__(self):

     def __repr__(self) -> str:
         assert len(self._actors) == self.num_replicas
-        opts = self.actor_options
-        opts_str = ", ".join([f"{k}={v}" for k, v in opts.items()])
+        opts_str = ", ".join([f"{k}={v}" for k, v in self._actor_options.items()])
         return f"ModelHandle(name={self.spec.name}, replicas={len(self._actors)}, opts=({opts_str}))"

-    @cached_property
-    def actor_options(self):
-        """Get actor options from model specification."""
-        return self._get_actor_options(self.spec)
+    @property
+    def num_replicas(self) -> int:
+        """Get the number of replicas."""
+        return self.deployment.num_replicas

     @classmethod
-    def _get_actor_options(cls, spec: ModelSpec) -> Dict[str, Any]:
+    def _get_actor_options(cls, spec: ModelSpec, deployment: ModelDeploymentSpec) -> Dict[str, Any]:
         """Get actor options from model specification."""
         # TOFIX (spillai): When considering CPU-only models with num_cpus specified,
         # OMP_NUM_THREADS will be set to the number of CPUs requested. Otherwise,
         # if num_cpus is not specified, OMP_NUM_THREADS will default to 1.
         # Instead, for now, we manually set the environment variable in `InferenceServiceRuntime`
         # to the number of CPU threads available.
-        # Get the model resources from the catalog
-        try:
-            catalog = ModelSpecMetadataCatalog.get()
-            resources: ModelResources = catalog._resources_catalog[f"{spec.id}/{spec.default_method}"]
-        except Exception:
-            resources = ModelResources()
-            logger.debug(f"Failed to get model resources [model={spec.id}, method={spec.default_method}]")
+        # If deployment resources are not specified, get the model resources from the catalog
+        if deployment.resources is None:
+            try:
+                catalog = ModelSpecMetadataCatalog.get()
+                resources: ModelResources = catalog._resources_catalog[f"{spec.id}/{spec.default_method}"]
+            except Exception:
+                resources = ModelResources()
+                logger.debug(f"Failed to get model resources [model={spec.id}, method={spec.default_method}]")
+
+        # Otherwise, use the deployment resources provided
+        else:
+            resources = deployment.resources

         # For GPU models, we need to set the number of fractional GPUs to use
         if (resources.device == "auto" or resources.device == "gpu") and torch.cuda.is_available():
@@ -250,7 +257,7 @@ def _get_actor(self) -> Union[ray.remote, ray.actor.ActorHandle]:
         model_cls = self.spec.default_signature.func_or_cls

         # Get the actor options from the model spec
-        actor_options = self.actor_options
+        actor_options = self._actor_options
         actor_cls = ray.remote(**actor_options)(model_cls)

         # Check if the model class has the required method
@@ -307,50 +314,50 @@ def __call__(self, *args: Any, **kwargs: Any) -> Any:
         )
         return _StreamingModelHandleResponse(response_refs)

-    def scale(self, replicas: Union[int, str] = 1) -> "ModelHandle":
+    def scale(self, num_replicas: Union[int, str] = 1) -> "ModelHandle":
         """Scale the model handle to a new number of replicas.

         Args:
-            replicas (int or str): Number of replicas, or set to "auto" to
+            num_replicas (int or str): Number of replicas, or set to "auto" to
                 automatically scale the model to the number of GPUs available.
         """
-        if isinstance(replicas, str) and replicas == "auto":
+        if isinstance(num_replicas, str) and num_replicas == "auto":
             raise NotImplementedError("Automatic scaling not implemented.")
-        if not isinstance(replicas, int):
-            raise ValueError(f"Invalid replicas: {replicas}")
+        if not isinstance(num_replicas, int):
+            raise ValueError(f"Invalid replicas: {num_replicas}")

         # Check if there are any pending futures
         if self._actor_pool.has_next():
             logger.warning(f"Pending futures detected, this may result in dropped queue items [name={self.spec.name}]")
             logger.debug(f"Waiting for pending futures to complete before scaling [name={self.spec.name}].")
         logger.debug(f"Scaling model [name={self.spec.name}].")

-        if replicas == len(self._actors):
-            logger.debug(f"Model already scaled appropriately [name={self.spec.name}, replicas={replicas}].")
+        if num_replicas == len(self._actors):
+            logger.debug(f"Model already scaled appropriately [name={self.spec.name}, replicas={num_replicas}].")
             return self
-        elif replicas > len(self._actors):
-            self._actors += [self._get_actor() for _ in range(replicas - len(self._actors))]
-            logger.debug(f"Scaling up model [name={self.spec.name}, replicas={replicas}].")
+        elif num_replicas > len(self._actors):
+            self._actors += [self._get_actor() for _ in range(num_replicas - len(self._actors))]
+            logger.debug(f"Scaling up model [name={self.spec.name}, replicas={num_replicas}].")
         else:
-            actors_to_remove = self._actors[replicas:]
+            actors_to_remove = self._actors[num_replicas:]
             for actor in actors_to_remove:
                 ray.kill(actor)
-            self._actors = self._actors[:replicas]
+            self._actors = self._actors[:num_replicas]

-            logger.debug(f"Scaling down model [name={self.spec.name}, replicas={replicas}].")
+            logger.debug(f"Scaling down model [name={self.spec.name}, replicas={num_replicas}].")

         # Update replicas and queue size
-        self.num_replicas = replicas
+        self.num_replicas = num_replicas

         # Re-create the actor pool
         logger.debug(f"Removing actor pool [replicas={len(self._actors)}].")
         del self._actor_pool
         self._actor_pool = None

         # Re-create the actor pool
-        logger.debug(f"Re-creating actor pool [name={self.spec.name}, replicas={replicas}].")
+        logger.debug(f"Re-creating actor pool [name={self.spec.name}, replicas={num_replicas}].")
         self._actor_pool = ActorPool(self._actors)
-        assert len(self._actors) == replicas, "Model scaling failed."
+        assert len(self._actors) == num_replicas, "Model scaling failed."
         gc.collect()
         return self

@@ -466,49 +473,48 @@ def __contains__(self, spec: ModelSpec) -> bool:
         """
         return spec.id in self.handlers

-    def load(self, model_spec: ModelSpec, num_replicas: int = None) -> ModelHandle:
+    def load(self, spec: ModelSpec, deployment: ModelDeploymentSpec = ModelDeploymentSpec()) -> ModelHandle:
         """Load a model handle from the manager using the model specification.

         Create a new model handle if it does not exist,
         else return an existing handle.

         Args:
-            model_spec (ModelSpec): Model specification.
-            num_replicas (int): Number of replicas.
+            spec (ModelSpec): Model specification.
+            deployment (ModelDeploymentSpec): Model deployment specification.
         Returns:
             ModelHandle: Model handle.
         """
-        model_id: str = model_spec.id
+        model_id: str = spec.id
         if model_id not in self.handlers:
-            num_replicas = num_replicas or 1
-            return self.add(model_spec, num_replicas=num_replicas)
+            return self.add(spec, deployment)
         else:
             # Only scale the model if the number of replicas is specified,
             # otherwise treat it as a get without modifying the number of replicas.
-            if num_replicas is not None and num_replicas != self.handlers[model_id].num_replicas:
-                self.handlers[model_id].scale(num_replicas)
+            if deployment.num_replicas is not None and deployment.num_replicas != self.handlers[model_id].num_replicas:
+                self.handlers[model_id].scale(deployment.num_replicas)
             return self.handlers[model_id]

-    def get(self, model_spec: ModelSpec) -> ModelHandle:
+    def get(self, spec: ModelSpec) -> ModelHandle:
         """Get a model handle from the manager using the model identifier.

         Args:
-            model_spec (ModelSpec): Model specification.
+            spec (ModelSpec): Model specification.
         Returns:
             ModelHandle: Model handle.
         """
-        model_id: str = model_spec.id
+        model_id: str = spec.id
         if model_id not in self.handlers:
-            return self.add(model_spec, num_replicas=1)
+            return self.add(spec, ModelDeploymentSpec(num_replicas=1))
         else:
             return self.handlers[model_id]

-    def add(self, spec: ModelSpec, num_replicas: int = 1) -> ModelHandle:
+    def add(self, spec: ModelSpec, deployment: ModelDeploymentSpec = ModelDeploymentSpec()) -> ModelHandle:
         """Add a model to the manager.

         Args:
             spec (ModelSpec): Model specification.
-            num_replicas (int): Number of replicas.
+            deployment (ModelDeploymentSpec): Model deployment specification.
         Raises:
             ValueError: If the model already exists.
         Returns:

@@ -525,7 +531,7 @@ def add(self, spec: ModelSpec, num_replicas: int = 1) -> ModelHandle:

         # Create the serve deployment from the model handle
         # Note: Currently one model per (model-name, task) is supported.
-        self.handlers[model_id] = ModelHandle(spec, num_replicas=num_replicas)
+        self.handlers[model_id] = ModelHandle(spec, deployment)
         logger.debug(f"Added model [{self.handlers[model_id]}]")
         logger.debug(self)
         return self.handlers[model_id]
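Taken together, `ModelManager.load`, `get`, and `add` now accept a `ModelDeploymentSpec` instead of a bare `num_replicas`. A hedged usage sketch follows, assuming `hub.load_spec` resolves a registered model id (a bare `load_spec` is used this way in `nos/server/_service.py`) and that a `ModelManager` can be constructed directly, as the tests do via a fixture:

```python
from nos import hub
from nos.common import ModelDeploymentSpec, ModelResources
from nos.managers import ModelManager

manager = ModelManager()
spec = hub.load_spec("distil-whisper/distil-small.en")  # model id from the tutorial YAML

# Default deployment: resources=None, so actor options come from the catalog.
handle = manager.load(spec)

# Overridden deployment: scales to 2 replicas and bypasses the catalog lookup.
handle = manager.load(
    spec,
    ModelDeploymentSpec(num_replicas=2, resources=ModelResources(device="auto")),
)
```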
nos/server/_service.py (16 additions, 8 deletions)

@@ -12,7 +12,15 @@
 from google.protobuf import empty_pb2, wrappers_pb2

 from nos import hub
-from nos.common import FunctionSignature, ModelSpec, ModelSpecMetadataCatalog, dumps, loads
+from nos.common import (
+    FunctionSignature,
+    ModelDeploymentSpec,
+    ModelServiceSpec,
+    ModelSpec,
+    ModelSpecMetadataCatalog,
+    dumps,
+    loads,
+)
 from nos.common.shm import NOS_SHM_ENABLED, SharedMemoryDataDict, SharedMemoryTransportManager
 from nos.constants import (  # noqa F401
     DEFAULT_GRPC_ADDRESS,

@@ -81,18 +89,18 @@ def __init__(self):
         else:
             self.shm_manager = None

-    def load_model_spec(self, model_spec: str, num_replicas: int = 1) -> ModelHandle:
+    def load_model_spec(self, spec: ModelSpec, deployment: ModelDeploymentSpec) -> ModelHandle:
         """Load the model by spec."""
-        return self.model_manager.load(model_spec, num_replicas=num_replicas)
+        return self.model_manager.load(spec, deployment)

     def load_model(self, model_name: str, num_replicas: int = 1) -> ModelHandle:
-        """Load the model."""
+        """Load the model by model name."""
         # Load the model spec (with caching to avoid repeated loading)
         try:
-            model_spec: ModelSpec = load_spec(model_name)
+            spec: ModelSpec = load_spec(model_name)
         except Exception as e:
             raise ModelNotFoundError(f"Failed to load model spec [model_name={model_name}, e={e}]")
-        return self.load_model_spec(model_spec, num_replicas=num_replicas)
+        return self.load_model_spec(spec, ModelDeploymentSpec(num_replicas=num_replicas))

@@ -198,10 +206,10 @@ def __init__(self, catalog_filename: str = None):
             raise ValueError(f"Model catalog not found [catalog={catalog_filename}]")

         # Register models from the catalog
-        services: List[Any] = hub.register_from_yaml(catalog_filename)
+        services: List[ModelServiceSpec] = hub.register_from_yaml(catalog_filename)
         for svc in services:
             logger.debug(f"Servicing model [svc={svc}, replicas={svc.deployment.num_replicas}]")
-            self.load_model_spec(svc.model, num_replicas=svc.deployment.num_replicas)
+            self.load_model_spec(svc.model, svc.deployment)
             logger.debug(f"Deployed model [svc={svc}]. \n{self.model_manager}")

     def Ping(self, _: empty_pb2.Empty, context: grpc.ServicerContext) -> nos_service_pb2.PingResponse:
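On the server side, `register_from_yaml` now yields typed `ModelServiceSpec` entries whose `deployment` is forwarded whole to the manager, rather than just its `num_replicas`. A small sketch of inspecting that flow (the YAML path here is illustrative, not from this PR):

```python
from typing import List

from nos import hub
from nos.common import ModelServiceSpec

# Register models from a serve.yaml-style catalog; each entry now carries any
# per-model resource overrides from its `deployment` block.
services: List[ModelServiceSpec] = hub.register_from_yaml("serve.yaml")
for svc in services:
    print(svc.model.id, svc.deployment.num_replicas, svc.deployment.resources)
```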
tests/managers/test_model_manager.py (2 additions, 2 deletions)

@@ -21,7 +21,7 @@
 from loguru import logger

 from nos import hub
-from nos.common import ModelSpec, RuntimeEnv, TaskType
+from nos.common import ModelDeploymentSpec, ModelSpec, RuntimeEnv, TaskType
 from nos.managers import ModelHandle, ModelManager
 from nos.managers.model import NOS_MAX_CONCURRENT_MODELS
 from nos.test.conftest import model_manager as manager  # noqa: F401, F811

@@ -115,7 +115,7 @@ def test_model_manager_errors(manager):  # noqa: F811
     manager.add(spec)

     # Creating a model with num_replicas > 1
-    ModelHandle(spec, num_replicas=2)
+    ModelHandle(spec, ModelDeploymentSpec(num_replicas=2))

     # Creating a model with an invalid eviction policy should raise a `NotImplementedError`.
     with pytest.raises(NotImplementedError):