Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ray AWS Cluster Config Generator #47

Merged
merged 9 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflow_scripts/test_cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ set -ex

source $(dirname "$0")/env_setup.sh

# install_cloud_test
install_cloud_test

# python3 -m pytest -n 2 --junitxml=results.xml tests/unittests/cluster/
python3 -m pytest --junitxml=results.xml tests/unittests/cluster/
3 changes: 3 additions & 0 deletions src/autogluon/cloud/cluster/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .cluster_config_generator import ClusterConfigGenerator
from .ray_aws_cluster_config_generator import RayAWSClusterConfigGenerator
from .ray_cluster_config_generator import RayClusterConfigGenerator
60 changes: 53 additions & 7 deletions src/autogluon/cloud/cluster/cluster_config_generator.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,65 @@
import os
from abc import ABC, abstractmethod
from typing import Any, Dict, Union
from typing import Any, Dict, Optional, Union

import yaml

DEFAULT_CONFIG_LOCATION = os.path.join(os.path.dirname(__file__), "..", "default_cluster_configs")


class ClusterConfigGenerator(ABC):
@abstractmethod
@staticmethod
def get_default_config() -> Dict[str, Any]:
default_config_file = os.path.join(DEFAULT_CONFIG_LOCATION, "DEFAULT_CONFIG")

def __init__(self, config: Optional[Union[str, Dict[str, Any]]] = None, **kwargs) -> None:
"""
Parameter
---------
config, Optional[Union[str, Dict[str, Any]]]
Config to be used to launch up the cluster. Default: None
If not set, will use the default config pre-defined.
If str, must be a path pointing to a yaml file containing the config.
"""
if config is None:
config = self.default_config_file
if not isinstance(config, dict):
with open(config, "r") as yaml_file:
config = yaml.safe_load(yaml_file)
self.config = config
self._default_config = self.get_default_config()

@classmethod
def get_default_config(cls) -> Dict[str, Any]:
"""
Get default config of the cluster
"""
raise NotImplementedError
with open(cls.default_config_file, "r") as yaml_file:
config = yaml.safe_load(yaml_file)
return config

@abstractmethod
def update_config(new_config: Union[Dict[str, Any], str]) -> Dict[str, Any]:
def update_config(self, new_config: Union[Dict[str, Any], str] = None, **kwargs) -> Dict[str, Any]:
"""
Update current config with given one. Settings in new_config will overwrite the old one.

Parameters
----------
new_config, Optional[Union[Dict[str, Any], str]]
The new config to overwrite the current config.
If None, specific keys that needs to be updated must be provided.
If both new_config and specific arguments to update the config are provided,
will apply new_config first then override it with each specific aruguments
"""
if new_config is not None:
if isinstance(new_config, str):
with open(new_config, "r") as yaml_file:
new_config = yaml.safe_load(yaml_file)
self.config.update(new_config)
self._update_config(**kwargs)

return self.config

@abstractmethod
def _update_config(self, **kwargs) -> None:
"""
Specific implementations of different cluster config solution
"""
raise NotImplementedError
12 changes: 12 additions & 0 deletions src/autogluon/cloud/cluster/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
AVAILABLE_NODE_TYPES = "available_node_types"
NODE_CONFIG = "node_config"
INSTANCE_TYPE = "InstanceType"
MAX_WORKERS = "max_workers"
MIN_WORKERS = "min_workers"
BLOCK_DEVICE_MAPPINGS = "BlockDeviceMappings"
EBS = "Ebs"
VOLUME_SIZE = "VolumeSize"
DOCKER = "docker"
IMAGE = "image"
PROVIDER = "provider"
REGION = "region"
146 changes: 146 additions & 0 deletions src/autogluon/cloud/cluster/ray_aws_cluster_config_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import os
from typing import Any, Dict, Optional, Union

from .cluster_config_generator import DEFAULT_CONFIG_LOCATION
from .constants import (
AVAILABLE_NODE_TYPES,
BLOCK_DEVICE_MAPPINGS,
DOCKER,
EBS,
IMAGE,
INSTANCE_TYPE,
MAX_WORKERS,
MIN_WORKERS,
NODE_CONFIG,
PROVIDER,
REGION,
VOLUME_SIZE,
)
from .ray_cluster_config_generator import RayClusterConfigGenerator


class RayAWSClusterConfigGenerator(RayClusterConfigGenerator):
default_config_file = os.path.join(DEFAULT_CONFIG_LOCATION, "ray_aws_default_cluster_config.yaml")

def __init__(
self, config: Optional[Union[str, Dict[str, Any]]] = None, region: Optional[str] = "us-east-1", **kwargs
) -> None:
"""
Parameter
---------
config, Optional[Union[str, Dict[str, Any]]]
Config to be used to launch up the cluster. Default: None
If not set, will use the default config pre-defined.
If str, must be a path pointing to a yaml file containing the config.
region, Optional[str]
Region to launch the cluster. Default us-east-1
"""
super().__init__(config=config)
self._update_region(region)

def _update_config(
self,
instance_type: Optional[str] = None,
instance_count: Optional[int] = None,
worker_node_name: Optional[str] = "worker",
volumes_size: Optional[int] = None,
custom_image_uri: Optional[str] = None,
**kwargs,
) -> Dict[str, Any]:
"""
Update current config with given parameters.

Parameters
----------
instance_type: str, default = None
Instance type the cluster will launch.
If provided, will overwrite `available_node_types.node_config.InstanceType`
To learn more,
https://docs.ray.io/en/latest/cluster/vms/references/ray-cluster-configuration.html#node-config
instance_count: int, default = None
Number of instance the cluster will launch.
If provided, will overwrite `available_node_types.min_workers` and `max_workers`
min_workers and max_workers will both equal to `instance_count` - 1 because there will be a head node.
This setting doesn't work when there's more than one definition of worker nodes.
To learn more,
https://docs.ray.io/en/latest/cluster/vms/references/ray-cluster-configuration.html#available-node-types-node-type-name-node-type-min-workers
https://docs.ray.io/en/latest/cluster/vms/references/ray-cluster-configuration.html#max-workers
https://docs.ray.io/en/latest/cluster/vms/references/ray-cluster-configuration.html#cluster-configuration-max-workers
worker_node_name: str, default = worker
Name of the worker node inside the yaml file. This is used to correctly configure the instance count if specified.
volumes_size: int, default = None
Size in GB of the EBS volume to use for each of the node.
If provided, will overwrite `available_node_types.node_config.BlockDeviceMappings.Ebs.VolmueSize`
To learn more,
https://docs.ray.io/en/latest/cluster/vms/references/ray-cluster-configuration.html#available-node-types
custom_image_uri: str, default = None
Custom image to be used by the cluster container. The image MUST have Ray and AG installed.
If provided, will overwrite `docker.image`
To learn more,
https://docs.ray.io/en/latest/cluster/vms/references/ray-cluster-configuration.html#docker-image
"""
self._update_instance_type(instance_type=instance_type)
self._update_instance_count(instance_count=instance_count, worker_node_name=worker_node_name)
self._update_volume_size(volumes_size=volumes_size)
self._update_custom_image(custom_image_uri=custom_image_uri)

def _set_available_node_types(self):
"""Set available node types to be default ones if user didn't provide any"""
default_config = self._default_config
available_node_types: Dict[str, Any] = self.config.get(AVAILABLE_NODE_TYPES, None)
if available_node_types is None:
available_node_types = default_config[AVAILABLE_NODE_TYPES]
self.config.update(available_node_types)

def _set_provider(self):
"""Set provider to be default ones if user didn't provide any"""
default_config = self._default_config
provider: Dict[str, Any] = self.config.get(PROVIDER, None)
if provider is None:
provider = default_config[PROVIDER]
self.config.update({PROVIDER: provider})
Comment on lines +95 to +101

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we templatize repeating methods: I see lots of repeating code which can be reuse between the _update_xxx methods.

Copy link
Author

@yinweisu yinweisu Mar 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is hard to templatize these methods because they are essentially trying to update keys and values lying in different layers of the yaml file. The method looks similar, but because the structure of the yaml file differs for each key, it's actually specific


def _update_region(self, region):
self._set_provider()
self.config[PROVIDER].update({REGION: region})

def _update_instance_type(self, instance_type):
if instance_type is not None:
self._set_available_node_types()
for node in self.config[AVAILABLE_NODE_TYPES]:
node_config: Dict[str, Any] = self.config[node].get(NODE_CONFIG, None)
assert (
node_config is not None
), f"Detected node definition for {node} but there's no node_config specified. Please provide one."
self.config[AVAILABLE_NODE_TYPES][node][NODE_CONFIG].update({INSTANCE_TYPE: instance_type})

def _update_instance_count(self, instance_count, worker_node_name):
if instance_count is not None:
worker_instance_count = instance_count - 1
assert worker_instance_count >= 0
self.config[MAX_WORKERS] = worker_instance_count
self._set_available_node_types()
assert (
worker_node_name in self.config[AVAILABLE_NODE_TYPES]
), f"Didn't find node definition for {worker_node_name}. Please make sure you provided the correct `worker_node_name`"
self.config[AVAILABLE_NODE_TYPES][worker_node_name].update({MIN_WORKERS: worker_instance_count})

def _update_volume_size(self, volumes_size):
if volumes_size is not None:
self._set_available_node_types()
for node in self.config[AVAILABLE_NODE_TYPES]:
node_config: Dict[str, Any] = self.config[node].get(NODE_CONFIG, None)
assert (
node_config is not None
), f"Detected node definition for {node} but there's no node_config specified. Please provide one."
block_mappings = self.config[AVAILABLE_NODE_TYPES][node][NODE_CONFIG][BLOCK_DEVICE_MAPPINGS]
if BLOCK_DEVICE_MAPPINGS not in node_config:
block_mappings = [{"DeviceName": "/dev/sda1", EBS: {VOLUME_SIZE: volumes_size}}]
else:
block_mappings[0][EBS].update({VOLUME_SIZE: volumes_size})

def _update_custom_image(self, custom_image_uri):
if custom_image_uri is not None:
if DOCKER not in self.config:
self.config[DOCKER] = {}
self.config[DOCKER].update({IMAGE: custom_image_uri})
20 changes: 3 additions & 17 deletions src/autogluon/cloud/cluster/ray_cluster_config_generator.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,7 @@
from typing import Any, Dict, Union
import os

from cluster_config_generator import ClusterConfigGenerator
from .cluster_config_generator import DEFAULT_CONFIG_LOCATION, ClusterConfigGenerator


class RayClusterConfigGenerator(ClusterConfigGenerator):
def __init__(self) -> None:
pass

@staticmethod
def get_default_config() -> Dict[str, Any]:
"""
Get default config of the cluster
"""
raise NotImplementedError

def update_config(new_config: Union[Dict[str, Any], str]) -> Dict[str, Any]:
"""
Update current config with given one. Settings in new_config will overwrite the old one.
"""
raise NotImplementedError
default_config_file = os.path.join(DEFAULT_CONFIG_LOCATION, "RAY_DUMMY")
4 changes: 2 additions & 2 deletions src/autogluon/cloud/cluster/ray_cluster_manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Dict

from cluster_manager import ClusterManager
from ray_cluster_config_generator import RayClusterConfigGenerator
from .cluster_manager import ClusterManager
from .ray_cluster_config_generator import RayClusterConfigGenerator


class RayClusterManager(ClusterManager):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# An unique identifier for the head node and workers of this cluster.
cluster_name: ag_ray_aws_default

# The maximum number of workers nodes to launch in addition to the head
max_workers: 2

# Cloud-provider specific configuration.
provider:
type: aws
region: us-east-1

# How Ray will authenticate with newly launched nodes.
auth:
ssh_user: ubuntu

available_node_types:
# Configurations for the head node.
head:
node_config:
InstanceType: m5.2xlarge
BlockDeviceMappings:
- DeviceName: /dev/sda1
Ebs:
VolumeSize: 100

# Configurations for the worker nodes.
worker:
min_workers: 2
node_config:
InstanceType: m5.2xlarge
BlockDeviceMappings:
- DeviceName: /dev/sda1
Ebs:
VolumeSize: 100

head_node_type: head
58 changes: 58 additions & 0 deletions tests/unittests/cluster/test_cluster_config_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import os
import tempfile

import pytest
import yaml

from autogluon.cloud.cluster import ClusterConfigGenerator, RayAWSClusterConfigGenerator
from autogluon.cloud.cluster.constants import (
AVAILABLE_NODE_TYPES,
BLOCK_DEVICE_MAPPINGS,
DOCKER,
EBS,
IMAGE,
INSTANCE_TYPE,
MAX_WORKERS,
MIN_WORKERS,
NODE_CONFIG,
PROVIDER,
REGION,
VOLUME_SIZE,
)


def _create_config_file(config):
if isinstance(config, str):
with open(config, "w") as yaml_file:
yaml.safe_dump({"foo": "bar"}, yaml_file)


@pytest.mark.parametrize("config_generator", [RayAWSClusterConfigGenerator])
@pytest.mark.parametrize("config", [None, {"foo": "bar"}, "dummy.yaml"])
def test_generate_config(config_generator, config):
with tempfile.TemporaryDirectory() as temp_dir:
os.chdir(temp_dir)
_create_config_file(config)
region = "us-west-2"
config_generator: ClusterConfigGenerator = config_generator(config, region=region)
assert isinstance(config_generator.config, dict)
assert config_generator.config[PROVIDER][REGION] == region


@pytest.mark.parametrize("config", [None, {"foo": "bar"}, "dummy.yaml"])
def test_update_ray_aws_cluster_config(config):
with tempfile.TemporaryDirectory() as temp_dir:
os.chdir(temp_dir)
_create_config_file(config)
config_generator = RayAWSClusterConfigGenerator(config)
dummy_config = {"cluster_name": "foo"}
config_generator.update_config(dummy_config)
assert config_generator.config["cluster_name"] == "foo"
config_generator.update_config(config_generator.default_config_file)
config_generator.update_config(instance_type="foo", instance_count=2, volumes_size=2, custom_image_uri="bar")
node = config_generator.config[AVAILABLE_NODE_TYPES]["worker"]
node_config = node[NODE_CONFIG]
assert node_config[INSTANCE_TYPE] == "foo"
assert config_generator.config[MAX_WORKERS] == 1 and node[MIN_WORKERS] == 1
assert node_config[BLOCK_DEVICE_MAPPINGS][0][EBS][VOLUME_SIZE] == 2
assert config_generator.config[DOCKER][IMAGE] == "bar"