From ef08910142723fe50bd97cc4d7081491ed3181bf Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Fri, 26 May 2023 08:11:10 -1000 Subject: [PATCH] [Core] Upgrade ray to 2.4.0 (#1734) * update the patches * upgrade node providers * fix azure config.py * print sky queue * add back azure disk size * fix job manager * fix hash * longer timeout * fix test smoke * Remove the patch for job_manager * longer timeout for azure_region test * address comments * format * fix templates * pip install --exists-action * Upgrade to 2.3 instead * upgrade to ray 2.3 * update patches for 2.4 * adopt changes for azure providers: a777a028b8dbd7bbae9a7393c98f6cd65f98a5f5 * fix license * fix patch for log monitor * sleep longer for the multi-echo * longer waiting time * longer wait time * fix click dependencies * update setup.py * Fix https://github.com/skypilot-org/skypilot/pull/1618#discussion_r1106102344 * fix https://github.com/skypilot-org/skypilot/pull/1618#discussion_r1106100129 * revert test_smoke * fix comment * revert to w instead of wipe * rewording * minor fix --- docs/source/reference/local/setup.rst | 4 +- examples/local/cluster-config.yaml | 2 +- sky/backends/cloud_vm_ray_backend.py | 8 +- .../monkey_patches/monkey_patch_ray_up.py | 4 +- sky/design_docs/onprem-design.md | 2 +- sky/setup_files/setup.py | 33 ++-- sky/skylet/LICENSE | 14 +- sky/skylet/constants.py | 2 +- sky/skylet/job_lib.py | 4 +- .../aws/cloudwatch/cloudwatch_helper.py | 161 ++++++++++-------- sky/skylet/providers/aws/config.py | 9 +- sky/skylet/providers/aws/node_provider.py | 5 + .../azure/azure-config-template.json | 56 ++++-- .../providers/azure/azure-vm-template.json | 35 +++- sky/skylet/providers/azure/config.py | 65 +++++-- sky/skylet/providers/azure/node_provider.py | 29 +++- sky/skylet/ray_patches/__init__.py | 5 +- sky/skylet/ray_patches/autoscaler.py.patch | 4 +- sky/skylet/ray_patches/cli.py.patch | 6 +- .../ray_patches/command_runner.py.patch | 5 +- sky/skylet/ray_patches/job_manager.py.patch | 19 --- sky/skylet/ray_patches/log_monitor.py.patch | 9 +- .../resource_demand_scheduler.py.patch | 6 +- sky/skylet/ray_patches/updater.py.patch | 4 +- sky/skylet/ray_patches/worker.py.patch | 15 +- sky/templates/aws-ray.yml.j2 | 6 +- sky/templates/azure-ray.yml.j2 | 6 +- sky/templates/gcp-ray.yml.j2 | 6 +- sky/templates/ibm-ray.yml.j2 | 2 +- sky/templates/lambda-ray.yml.j2 | 6 +- tests/test_smoke.py | 17 +- 31 files changed, 330 insertions(+), 219 deletions(-) delete mode 100644 sky/skylet/ray_patches/job_manager.py.patch diff --git a/docs/source/reference/local/setup.rst b/docs/source/reference/local/setup.rst index cec990e015a..33192fe855d 100644 --- a/docs/source/reference/local/setup.rst +++ b/docs/source/reference/local/setup.rst @@ -14,13 +14,13 @@ For further reference, `here = 3.7. $ pip3 install skypilot diff --git a/examples/local/cluster-config.yaml b/examples/local/cluster-config.yaml index e30992a603e..de09026a411 100644 --- a/examples/local/cluster-config.yaml +++ b/examples/local/cluster-config.yaml @@ -4,7 +4,7 @@ # The system administrator must have `sudo` access to the local nodes. # Requirements: # 1) Python (> 3.6) on all nodes. -# 2) Ray CLI (= 2.0.1) on all nodes. +# 2) Ray CLI (= 2.4.0) on all nodes. 
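As an illustrative aside (not part of the diff), a minimal Python check on a node that the installed Ray matches the version the requirement above now pins:

    import ray

    # SkyPilot now expects ray[default]==2.4.0 on cluster nodes.
    if ray.__version__ != "2.4.0":
        print(f"Warning: found ray {ray.__version__}, expected 2.4.0")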
# # Example usage: # >> sky admin deploy cluster-config.yaml diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 38920c718e9..e427baecab1 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -203,8 +203,8 @@ def add_prologue(self, self.job_id = job_id # Should use 'auto' or 'ray://:10001' rather than # 'ray://localhost:10001', or 'ray://127.0.0.1:10001', for public cloud. - # Otherwise, it will a bug of ray job failed to get the placement group - # in ray <= 2.0.1. + # Otherwise, ray will fail to get the placement group because of a bug + # in ray job. # TODO(mluo): Check why 'auto' not working with on-prem cluster and # whether the placement group issue also occurs in on-prem cluster. ray_address = 'ray://localhost:10001' if is_local else 'auto' @@ -1486,7 +1486,7 @@ def ray_up(): # Downside is existing tasks on the cluster will keep running # (which may be ok with the semantics of 'sky launch' twice). # Tracked in https://github.com/ray-project/ray/issues/20402. - # Ref: https://github.com/ray-project/ray/blob/releases/2.2.0/python/ray/autoscaler/sdk/sdk.py#L16-L49 # pylint: disable=line-too-long + # Ref: https://github.com/ray-project/ray/blob/releases/2.4.0/python/ray/autoscaler/sdk/sdk.py#L16-L49 # pylint: disable=line-too-long script_path = write_ray_up_script_with_patched_launch_hash_fn( cluster_config_file, ray_up_kwargs={'no_restart': True}) @@ -1721,7 +1721,7 @@ def _ensure_cluster_ray_started(self, handle: 'CloudVmRayResourceHandle', if isinstance(launched_resources.cloud, clouds.Local): raise RuntimeError( 'The command `ray status` errored out on the head node ' - 'of the local cluster. Check if ray[default]==2.0.1 ' + 'of the local cluster. Check if ray[default]==2.4.0 ' 'is installed or running correctly.') backend.run_on_head(handle, 'ray stop', use_cached_head_ip=False) diff --git a/sky/backends/monkey_patches/monkey_patch_ray_up.py b/sky/backends/monkey_patches/monkey_patch_ray_up.py index 34cb2f04e51..5819a5eb4e9 100644 --- a/sky/backends/monkey_patches/monkey_patch_ray_up.py +++ b/sky/backends/monkey_patches/monkey_patch_ray_up.py @@ -33,7 +33,7 @@ from ray.autoscaler import sdk -# Ref: https://github.com/ray-project/ray/blob/releases/2.2.0/python/ray/autoscaler/_private/util.py#L392-L404 +# Ref: https://github.com/ray-project/ray/blob/releases/2.4.0/python/ray/autoscaler/_private/util.py#L396-L408 def monkey_patch_hash_launch_conf(node_conf, auth): hasher = hashlib.sha1() # For hashing, we replace the path to the key with the key @@ -50,7 +50,7 @@ def monkey_patch_hash_launch_conf(node_conf, auth): return hasher.hexdigest() -# Ref: https://github.com/ray-project/ray/blob/840215bc09e942b50cad0ab2db96a8fdc79217c1/python/ray/autoscaler/_private/commands.py#L854-L912 +# Ref: https://github.com/ray-project/ray/blob/releases/2.4.0/python/ray/autoscaler/_private/commands.py#L854-L912 def monkey_patch_should_create_new_head( head_node_id, new_launch_hash, diff --git a/sky/design_docs/onprem-design.md b/sky/design_docs/onprem-design.md index b981656c0ab..0732eedd286 100644 --- a/sky/design_docs/onprem-design.md +++ b/sky/design_docs/onprem-design.md @@ -9,7 +9,7 @@ - Does not support different types of accelerators within the same node (intranode). ## Installing Ray and SkyPilot -- Admin installs Ray==2.0.1 and SkyPilot globally on all machines. It is assumed that the admin regularly keeps SkyPilot updated on the cluster. +- Admin installs Ray==2.4.0 and SkyPilot globally on all machines. 
It is assumed that the admin regularly keeps SkyPilot updated on the cluster. - Python >= 3.7 for all users. - When a regular user runs `sky launch`, a local version of SkyPilot will be installed on the machine for each user. The local installation of Ray is specified in `sky/templates/local-ray.yml.j2`. diff --git a/sky/setup_files/setup.py b/sky/setup_files/setup.py index ad2faa0d66d..38f08d6150e 100644 --- a/sky/setup_files/setup.py +++ b/sky/setup_files/setup.py @@ -65,9 +65,8 @@ def parse_readme(readme: str) -> str: install_requires = [ 'wheel', - # NOTE: ray 2.0.1 requires click<=8.0.4,>=7.0; We disable the - # shell completion for click<8.0 for backward compatibility. - 'click<=8.0.4,>=7.0', + # NOTE: ray requires click>=7.0 + 'click>=7.0', # NOTE: required by awscli. To avoid ray automatically installing # the latest version. 'colorama<0.4.5', @@ -84,22 +83,26 @@ def parse_readme(readme: str) -> str: # PrettyTable with version >=2.0.0 is required for the support of # `add_rows` method. 'PrettyTable>=2.0.0', - # Lower local ray version is not fully supported, due to the - # autoscaler issues (also tracked in #537). - 'ray[default]>=1.9.0,<=2.3.0', + # Lower version of ray will cause dependency conflict for + # click/grpcio/protobuf. + 'ray[default]>=2.2.0,<=2.4.0', 'rich', 'tabulate', - 'typing-extensions', + # Light weight requirement, can be replaced with "typing" once + # we deprecate Python 3.7 (this will take a while). + "typing_extensions; python_version < '3.8'", 'filelock>=3.6.0', - # This is used by ray. The latest 1.44.0 will generate an error - # `Fork support is only compatible with the epoll1 and poll - # polling strategies` - 'grpcio>=1.32.0,<=1.43.0', + # Adopted from ray's setup.py: + # Tracking issue: https://github.com/ray-project/ray/issues/30984 + "grpcio >= 1.32.0, <= 1.49.1; python_version < '3.10' and sys_platform == 'darwin'", # noqa:E501 + "grpcio >= 1.42.0, <= 1.49.1; python_version >= '3.10' and sys_platform == 'darwin'", # noqa:E501 + # Original issue: https://github.com/ray-project/ray/issues/33833 + "grpcio >= 1.32.0, <= 1.51.3; python_version < '3.10' and sys_platform != 'darwin'", # noqa:E501 + "grpcio >= 1.42.0, <= 1.51.3; python_version >= '3.10' and sys_platform != 'darwin'", # noqa:E501 'packaging', - # The latest 4.21.1 will break ray. Enforce < 4.0.0 until Ray releases the - # fix. 
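As an illustrative aside (not part of the diff), the grpcio pins above rely on PEP 508 environment markers; a minimal sketch of how such a marker is evaluated, using the packaging library already listed in install_requires:

    from packaging.markers import Marker

    # One of the markers from the new grpcio pins, evaluated for the current
    # interpreter and platform.
    marker = Marker("python_version < '3.10' and sys_platform == 'darwin'")
    print(marker.evaluate())  # True only on macOS with Python < 3.10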
- # https://github.com/ray-project/ray/pull/25211 - 'protobuf<4.0.0', + # Adopted from ray's setup.py: + # https://github.com/ray-project/ray/blob/86fab1764e618215d8131e8e5068f0d493c77023/python/setup.py#L326 + 'protobuf >= 3.15.3, != 3.19.5', 'psutil', 'pulp', ] diff --git a/sky/skylet/LICENSE b/sky/skylet/LICENSE index c9867ce194d..55e20728a6c 100644 --- a/sky/skylet/LICENSE +++ b/sky/skylet/LICENSE @@ -203,19 +203,19 @@ -------------------------------------------------------------------------------- Code in providers/azure from -https://github.com/ray-project/ray/tree/ray-2.0.1/python/ray/autoscaler/_private/_azure -Git commit of the release 2.0.1: 03b6bc7b5a305877501110ec04710a9c57011479 +https://github.com/ray-project/ray/tree/ray-2.4.0/python/ray/autoscaler/_private/_azure +Git commit of the release 2.4.0: a777a028b8dbd7bbae9a7393c98f6cd65f98a5f5 Code in providers/gcp from -https://github.com/ray-project/ray/tree/ray-2.0.1/python/ray/autoscaler/_private/gcp -Git commit of the release 2.0.1: 03b6bc7b5a305877501110ec04710a9c57011479 +https://github.com/ray-project/ray/tree/ray-2.4.0/python/ray/autoscaler/_private/gcp +Git commit of the release 2.4.0: 45ffe6eb99d96488fdec187bb47a4a78d9b5ee92 Code in providers/aws from -https://github.com/ray-project/ray/tree/ray-2.0.1/python/ray/autoscaler/_private/aws -Git commit of the release 2.0.1: 03b6bc7b5a305877501110ec04710a9c57011479 +https://github.com/ray-project/ray/tree/ray-2.4.0/python/ray/autoscaler/_private/aws +Git commit of the release 2.4.0: c27859fa49f6470b98743bdce8288c7242d89699 -Copyright 2016-2022 Ray developers +Copyright 2016-2023 Ray developers Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/sky/skylet/constants.py b/sky/skylet/constants.py index dad9465dc7b..75f6e42a3d0 100644 --- a/sky/skylet/constants.py +++ b/sky/skylet/constants.py @@ -2,7 +2,7 @@ SKY_LOGS_DIRECTORY = '~/sky_logs' SKY_REMOTE_WORKDIR = '~/sky_workdir' -SKY_REMOTE_RAY_VERSION = '2.0.1' +SKY_REMOTE_RAY_VERSION = '2.4.0' # TODO(mluo): Make explicit `sky launch -c ''` optional. UNINITIALIZED_ONPREM_CLUSTER_MESSAGE = ( diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index b3fbbd00765..316d9f5a3bc 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -392,7 +392,7 @@ def update_job_status(job_owner: str, during job cancelling, we still need this to handle the staleness problem, caused by instance restarting and other corner cases (if any). - This function should only be run on the remote instance with ray==2.0.1. + This function should only be run on the remote instance with ray==2.4.0. """ if len(job_ids) == 0: return [] @@ -402,7 +402,7 @@ def update_job_status(job_owner: str, job_client = _create_ray_job_submission_client() - # In ray 2.0.1, job_client.list_jobs returns a list of JobDetails, + # In ray 2.4.0, job_client.list_jobs returns a list of JobDetails, # which contains the job status (str) and submission_id (str). 
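For context on the comment above, a minimal sketch (dashboard address is illustrative) of the Ray 2.4 job submission API that update_job_status() relies on:

    from ray.job_submission import JobSubmissionClient

    client = JobSubmissionClient("http://127.0.0.1:8265")  # illustrative address
    for job in client.list_jobs():  # returns a list of JobDetails in ray >= 2.2
        print(job.submission_id, job.status)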
job_detail_lists: List['ray_pydantic.JobDetails'] = job_client.list_jobs() diff --git a/sky/skylet/providers/aws/cloudwatch/cloudwatch_helper.py b/sky/skylet/providers/aws/cloudwatch/cloudwatch_helper.py index 526cd19d1ae..b41be547f06 100644 --- a/sky/skylet/providers/aws/cloudwatch/cloudwatch_helper.py +++ b/sky/skylet/providers/aws/cloudwatch/cloudwatch_helper.py @@ -4,7 +4,8 @@ import logging import os import time -from typing import Any, Dict, List, Tuple, Union +from enum import Enum +from typing import Any, Callable, Dict, List, Union import botocore @@ -21,6 +22,12 @@ CLOUDWATCH_CONFIG_HASH_TAG_BASE = "cloudwatch-config-hash" +class CloudwatchConfigType(str, Enum): + AGENT = "agent" + DASHBOARD = "dashboard" + ALARM = "alarm" + + class CloudwatchHelper: def __init__( self, provider_config: Dict[str, Any], node_id: str, cluster_name: str @@ -34,6 +41,22 @@ def __init__( self.ssm_client = client_cache("ssm", region) cloudwatch_resource = resource_cache("cloudwatch", region) self.cloudwatch_client = cloudwatch_resource.meta.client + self.CLOUDWATCH_CONFIG_TYPE_TO_CONFIG_VARIABLE_REPLACE_FUNC: Dict[ + str, Callable + ] = { + CloudwatchConfigType.AGENT.value: self._replace_cwa_config_vars, + CloudwatchConfigType.DASHBOARD.value: self._replace_dashboard_config_vars, + CloudwatchConfigType.ALARM.value: self._load_config_file, + } + self.CLOUDWATCH_CONFIG_TYPE_TO_UPDATE_FUNC_HEAD_NODE: Dict[str, Callable] = { + CloudwatchConfigType.AGENT.value: self._restart_cloudwatch_agent, + CloudwatchConfigType.DASHBOARD.value: self._put_cloudwatch_dashboard, + CloudwatchConfigType.ALARM.value: self._put_cloudwatch_alarm, + } + self.CLOUDWATCH_CONFIG_TYPE_TO_UPDATE_FUNC_WORKER_NODE: Dict[str, Callable] = { + CloudwatchConfigType.AGENT.value: self._restart_cloudwatch_agent, + CloudwatchConfigType.ALARM.value: self._put_cloudwatch_alarm, + } def update_from_config(self, is_head_node: bool) -> None: """Discovers and applies CloudWatch config updates as required. @@ -41,12 +64,11 @@ def update_from_config(self, is_head_node: bool) -> None: Args: is_head_node: whether this node is the head node. """ - if CloudwatchHelper.cloudwatch_config_exists(self.provider_config, "agent"): - self._update_cloudwatch_config(is_head_node, "agent") - if CloudwatchHelper.cloudwatch_config_exists(self.provider_config, "dashboard"): - self._update_cloudwatch_config(is_head_node, "dashboard") - if CloudwatchHelper.cloudwatch_config_exists(self.provider_config, "alarm"): - self._update_cloudwatch_config(is_head_node, "alarm") + for config_type in CloudwatchConfigType: + if CloudwatchHelper.cloudwatch_config_exists( + self.provider_config, config_type.value + ): + self._update_cloudwatch_config(config_type.value, is_head_node) def _ec2_health_check_waiter(self, node_id: str) -> None: # wait for all EC2 instance checks to complete @@ -66,14 +88,10 @@ def _ec2_health_check_waiter(self, node_id: str) -> None: ) raise e - def _update_cloudwatch_config(self, is_head_node: bool, config_type: str) -> None: - """Update remote CloudWatch configs at Parameter Store, - update hash tag value on node and perform associated operations - at CloudWatch console if local CloudWatch configs change. - - Args: - is_head_node: whether this node is the head node. - config_type: CloudWatch config file type. 
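A condensed sketch (names simplified, not the vendored code itself) of the enum-to-callable dispatch that replaces the if/elif chains in CloudwatchHelper:

    from enum import Enum
    from typing import Callable, Dict

    class ConfigType(str, Enum):
        AGENT = "agent"
        DASHBOARD = "dashboard"
        ALARM = "alarm"

    def restart_agent() -> None:
        print("restarting Unified CloudWatch Agent")

    # Worker nodes only react to config types present in the mapping.
    WORKER_UPDATE_FUNCS: Dict[str, Callable[[], None]] = {
        ConfigType.AGENT.value: restart_agent,
    }

    update_func = WORKER_UPDATE_FUNCS.get(ConfigType.AGENT.value)
    if update_func:
        update_func()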
+ def _update_cloudwatch_config(self, config_type: str, is_head_node: bool) -> None: + """ + check whether update operations are needed in + cloudwatch related configs """ cwa_installed = self._setup_cwa() param_name = self._get_ssm_param_name(config_type) @@ -84,19 +102,16 @@ def _update_cloudwatch_config(self, is_head_node: bool, config_type: str) -> Non ) cur_cw_config_hash = self._sha1_hash_file(config_type) ssm_cw_config_hash = self._sha1_hash_json(cw_config_ssm) - # check if user updated Unified Cloudwatch Agent config file. + # check if user updated cloudwatch related config files. # if so, perform corresponding actions. if cur_cw_config_hash != ssm_cw_config_hash: logger.info( "Cloudwatch {} config file has changed.".format(config_type) ) self._upload_config_to_ssm_and_set_hash_tag(config_type) - if config_type == "agent": - self._restart_cloudwatch_agent() - elif config_type == "dashboard": - self._put_cloudwatch_dashboard() - elif config_type == "alarm": - self._put_cloudwatch_alarm() + self.CLOUDWATCH_CONFIG_TYPE_TO_UPDATE_FUNC_HEAD_NODE.get( + config_type + )() else: head_node_hash = self._get_head_node_config_hash(config_type) cur_node_hash = self._get_cur_node_config_hash(config_type) @@ -104,10 +119,13 @@ def _update_cloudwatch_config(self, is_head_node: bool, config_type: str) -> Non logger.info( "Cloudwatch {} config file has changed.".format(config_type) ) - if config_type == "agent": - self._restart_cloudwatch_agent() - if config_type == "alarm": - self._put_cloudwatch_alarm() + update_func = ( + self.CLOUDWATCH_CONFIG_TYPE_TO_UPDATE_FUNC_WORKER_NODE.get( + config_type + ) + ) + if update_func: + update_func() self._update_cloudwatch_hash_tag_value( self.node_id, head_node_hash, config_type ) @@ -120,7 +138,9 @@ def _put_cloudwatch_dashboard(self) -> Dict[str, Any]: dashboard_name_cluster = dashboard_config.get("name", self.cluster_name) dashboard_name = self.cluster_name + "-" + dashboard_name_cluster - widgets = self._replace_dashboard_config_variables() + widgets = self._replace_dashboard_config_vars( + CloudwatchConfigType.DASHBOARD.value + ) response = self.cloudwatch_client.put_dashboard( DashboardName=dashboard_name, DashboardBody=json.dumps({"widgets": widgets}) @@ -144,7 +164,7 @@ def _put_cloudwatch_dashboard(self) -> Dict[str, Any]: def _put_cloudwatch_alarm(self) -> None: """put CloudWatch metric alarms read from config""" - param_name = self._get_ssm_param_name("alarm") + param_name = self._get_ssm_param_name(CloudwatchConfigType.ALARM.value) data = json.loads(self._get_ssm_param(param_name)) for item in data: item_out = copy.deepcopy(item) @@ -158,7 +178,7 @@ def _put_cloudwatch_alarm(self) -> None: logger.info("Successfully put alarms to CloudWatch console") def _send_command_to_node( - self, document_name: str, parameters: List[str], node_id: str + self, document_name: str, parameters: Dict[str, List[str]], node_id: str ) -> Dict[str, Any]: """send SSM command to the given nodes""" logger.debug( @@ -177,10 +197,10 @@ def _send_command_to_node( def _ssm_command_waiter( self, document_name: str, - parameters: List[str], + parameters: Dict[str, List[str]], node_id: str, retry_failed: bool = True, - ) -> bool: + ) -> Dict[str, Any]: """wait for SSM command to complete on all cluster nodes""" # This waiter differs from the built-in SSM.Waiter by @@ -192,7 +212,9 @@ def _ssm_command_waiter( command_id = response["Command"]["CommandId"] cloudwatch_config = self.provider_config["cloudwatch"] - agent_retryer_config = cloudwatch_config.get("agent").get("retryer", 
{}) + agent_retryer_config = cloudwatch_config.get( + CloudwatchConfigType.AGENT.value + ).get("retryer", {}) max_attempts = agent_retryer_config.get("max_attempts", 120) delay_seconds = agent_retryer_config.get("delay_seconds", 30) num_attempts = 0 @@ -283,19 +305,19 @@ def _replace_config_variables( def _replace_all_config_variables( self, - collection: Union[dict, list], + collection: Union[Dict[str, Any], str], node_id: str, cluster_name: str, region: str, - ) -> Tuple[(Union[dict, list], int)]: + ) -> Union[str, Dict[str, Any]]: """ Replace known config variable occurrences in the input collection. - The input collection must be either a dict or list. Returns a tuple consisting of the output collection and the number of modified strings in the collection (which is not necessarily equal to the number of variables replaced). """ + for key in collection: if type(collection) is dict: value = collection.get(key) @@ -303,6 +325,12 @@ def _replace_all_config_variables( elif type(collection) is list: value = key index_key = collection.index(key) + else: + raise ValueError( + f"Can't replace CloudWatch config variables " + f"in unsupported collection type: {type(collection)}." + f"Please check your CloudWatch JSON config files." + ) if type(value) is str: collection[index_key] = self._replace_config_variables( value, node_id, cluster_name, region @@ -344,8 +372,8 @@ def _set_cloudwatch_ssm_config_param( return self._get_default_empty_config_file_hash() else: logger.info( - "Failed to fetch CloudWatch {} config from SSM " - "parameter store.".format(config_type) + "Failed to fetch Unified CloudWatch Agent config from SSM " + "parameter store." ) logger.error(e) raise e @@ -368,31 +396,25 @@ def _get_ssm_param(self, parameter_name: str) -> str: def _sha1_hash_json(self, value: str) -> str: """calculate the json string sha1 hash""" - hash = hashlib.new("sha1") + sha1_hash = hashlib.new("sha1") binary_value = value.encode("ascii") - hash.update(binary_value) - sha1_res = hash.hexdigest() + sha1_hash.update(binary_value) + sha1_res = sha1_hash.hexdigest() return sha1_res def _sha1_hash_file(self, config_type: str) -> str: """calculate the config file sha1 hash""" - if config_type == "agent": - config = self._replace_cwa_config_variables() - if config_type == "dashboard": - config = self._replace_dashboard_config_variables() - if config_type == "alarm": - config = self._load_config_file("alarm") + config = self.CLOUDWATCH_CONFIG_TYPE_TO_CONFIG_VARIABLE_REPLACE_FUNC.get( + config_type + )(config_type) value = json.dumps(config) sha1_res = self._sha1_hash_json(value) return sha1_res def _upload_config_to_ssm_and_set_hash_tag(self, config_type: str): - if config_type == "agent": - data = self._replace_cwa_config_variables() - if config_type == "dashboard": - data = self._replace_dashboard_config_variables() - if config_type == "alarm": - data = self._load_config_file("alarm") + data = self.CLOUDWATCH_CONFIG_TYPE_TO_CONFIG_VARIABLE_REPLACE_FUNC.get( + config_type + )(config_type) sha1_hash_value = self._sha1_hash_file(config_type) self._upload_config_to_ssm(data, config_type) self._update_cloudwatch_hash_tag_value( @@ -405,7 +427,7 @@ def _add_cwa_installed_tag(self, node_id: str) -> None: Tags=[{"Key": CLOUDWATCH_AGENT_INSTALLED_TAG, "Value": "True"}], ) logger.info( - "Successfully add Unified Cloudwatch Agent installed " + "Successfully add Unified CloudWatch Agent installed " "tag on {}".format(node_id) ) @@ -444,12 +466,12 @@ def _upload_config_to_ssm(self, param: Dict[str, Any], config_type: str): 
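A small sketch of the config hashing used by _sha1_hash_json/_sha1_hash_file above: the JSON-serialized config is hashed, and the digest is compared against the hash recorded on the node to detect changes (the sample config is illustrative):

    import hashlib
    import json

    def sha1_of_config(config: dict) -> str:
        # Mirrors _sha1_hash_json: hash the ascii-encoded JSON string.
        return hashlib.new("sha1", json.dumps(config).encode("ascii")).hexdigest()

    print(sha1_of_config({"agent": {"metrics_collected": {}}}))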
param_name = self._get_ssm_param_name(config_type) self._put_ssm_param(param, param_name) - def _replace_cwa_config_variables(self) -> Dict[str, Any]: + def _replace_cwa_config_vars(self, config_type: str) -> Dict[str, Any]: """ replace {instance_id}, {region}, {cluster_name} variable occurrences in Unified Cloudwatch Agent config file """ - cwa_config = self._load_config_file("agent") + cwa_config = self._load_config_file(config_type) self._replace_all_config_variables( cwa_config, self.node_id, @@ -458,11 +480,11 @@ def _replace_cwa_config_variables(self) -> Dict[str, Any]: ) return cwa_config - def _replace_dashboard_config_variables(self) -> List[Dict[str, Any]]: + def _replace_dashboard_config_vars(self, config_type: str) -> List[str]: """ replace known variable occurrences in CloudWatch Dashboard config file """ - data = self._load_config_file("dashboard") + data = self._load_config_file(config_type) widgets = [] for item in data: item_out = self._replace_all_config_variables( @@ -471,16 +493,15 @@ def _replace_dashboard_config_variables(self) -> List[Dict[str, Any]]: self.cluster_name, self.provider_config["region"], ) - item_out = copy.deepcopy(item) widgets.append(item_out) return widgets - def _replace_alarm_config_variables(self) -> List[Dict[str, Any]]: + def _replace_alarm_config_vars(self, config_type: str) -> List[str]: """ replace {instance_id}, {region}, {cluster_name} variable occurrences in cloudwatch alarm config file """ - data = self._load_config_file("alarm") + data = self._load_config_file(config_type) param_data = [] for item in data: item_out = copy.deepcopy(item) @@ -494,11 +515,11 @@ def _replace_alarm_config_variables(self) -> List[Dict[str, Any]]: return param_data def _restart_cloudwatch_agent(self) -> None: - """restart Unified Cloudwatch Agent""" - cwa_param_name = self._get_ssm_param_name("agent") + """restart Unified CloudWatch Agent""" + cwa_param_name = self._get_ssm_param_name(CloudwatchConfigType.AGENT.value) logger.info( - "Restarting Unified Cloudwatch Agent package on {} node(s).".format( - (self.node_id) + "Restarting Unified CloudWatch Agent package on node {}.".format( + self.node_id ) ) self._stop_cloudwatch_agent() @@ -691,7 +712,9 @@ def resolve_instance_profile_name( default ray instance profile name if cloudwatch config file doesn't exist. """ - cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists(config, "agent") + cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists( + config, CloudwatchConfigType.AGENT.value + ) return ( CLOUDWATCH_RAY_INSTANCE_PROFILE if cwa_cfg_exists @@ -712,7 +735,9 @@ def resolve_iam_role_name( default cloudwatch iam role name if cloudwatch config file exists. default ray iam role name if cloudwatch config file doesn't exist. """ - cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists(config, "agent") + cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists( + config, CloudwatchConfigType.AGENT.value + ) return CLOUDWATCH_RAY_IAM_ROLE if cwa_cfg_exists else default_iam_role_name @staticmethod @@ -731,7 +756,9 @@ def resolve_policy_arns( related operations if cloudwatch agent config is specifed in cluster config file. 
""" - cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists(config, "agent") + cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists( + config, CloudwatchConfigType.AGENT.value + ) if cwa_cfg_exists: cloudwatch_managed_policy = { "Version": "2012-10-17", diff --git a/sky/skylet/providers/aws/config.py b/sky/skylet/providers/aws/config.py index 16c769eec33..79ab8ebdc23 100644 --- a/sky/skylet/providers/aws/config.py +++ b/sky/skylet/providers/aws/config.py @@ -40,6 +40,8 @@ DEFAULT_AMI_NAME = "AWS Deep Learning AMI (Ubuntu 18.04) V61.0" # Obtained from https://aws.amazon.com/marketplace/pp/B07Y43P7X5 on 6/10/2022. +# TODO(alex) : write a unit test to make sure we update AMI version used in +# ray/autoscaler/aws/example-full.yaml whenever we update this dict. # NOTE(skypilot): these are not used; skypilot instead uses the default AMIs in aws.py. DEFAULT_AMI = { "us-east-1": "ami-0dd6adfad4ad37eec", # US East (N. Virginia) @@ -52,6 +54,11 @@ "eu-west-2": "ami-094ba2b4651f761ca", # EU (London) "eu-west-3": "ami-031da10fbf225bf5f", # EU (Paris) "sa-east-1": "ami-0be7c1f1dd96d7337", # SA (Sao Paulo) + "ap-northeast-1": "ami-0d69b2fd9641af433", # Asia Pacific (Tokyo) + "ap-northeast-2": "ami-0d6d00bd58046ff91", # Asia Pacific (Seoul) + "ap-northeast-3": "ami-068feab7122f7558d", # Asia Pacific (Osaka) + "ap-southeast-1": "ami-05006b266c1be4e8f", # Asia Pacific (Singapore) + "ap-southeast-2": "ami-066aa744514f9f95c", # Asia Pacific (Sydney) } # todo: cli_logger should handle this assert properly @@ -428,7 +435,7 @@ def _configure_key_pair(config): os.makedirs(os.path.expanduser("~/.ssh"), exist_ok=True) # Try a few times to get or create a good key pair. - MAX_NUM_KEYS = 30 + MAX_NUM_KEYS = 60 for i in range(MAX_NUM_KEYS): key_name = config["provider"].get("key_pair", {}).get("key_name") diff --git a/sky/skylet/providers/aws/node_provider.py b/sky/skylet/providers/aws/node_provider.py index d6499fdbb46..86297b748f7 100644 --- a/sky/skylet/providers/aws/node_provider.py +++ b/sky/skylet/providers/aws/node_provider.py @@ -499,6 +499,11 @@ def _create_node(self, node_config, tags, count): break except botocore.exceptions.ClientError as exc: if attempt == max_tries: + # SkyPilot: do not adopt the changes from upstream in + # https://github.com/ray-project/ray/commit/c2abfdb2f7eee7f3e4320cb0d9e8e3bd639d5680#diff-eeb7bc1d8342583cf12c40536240dbcc67f089466a18a37bd60f187265a2dc94 + # which replaces the exception to NodeLaunchException. As we directly + # handle the exception output in + # cloud_vm_ray_backend._update_blocklist_on_aws_error cli_logger.abort( "Failed to launch instances. Max attempts exceeded.", exc=exc, diff --git a/sky/skylet/providers/azure/azure-config-template.json b/sky/skylet/providers/azure/azure-config-template.json index ae6efe8f7d5..1a13a67a121 100644 --- a/sky/skylet/providers/azure/azure-config-template.json +++ b/sky/skylet/providers/azure/azure-config-template.json @@ -2,42 +2,54 @@ "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#", "contentVersion": "1.0.0.0", "parameters": { + "clusterId": { + "type": "string", + "metadata": { + "description": "Unique string appended to resource names to isolate resources from different ray clusters." + } + }, "subnet": { "type": "string", "metadata": { - "description": "The subnet to be used" + "description": "Subnet parameters." 
} } }, "variables": { - "Contributor": "[subscriptionResourceId('Microsoft.Authorization/roleDefinitions', 'b24988ac-6180-42a0-ab88-20f7382dd24c')]", - "location": "[resourceGroup().location]" + "contributor": "[subscriptionResourceId('Microsoft.Authorization/roleDefinitions', 'b24988ac-6180-42a0-ab88-20f7382dd24c')]", + "location": "[resourceGroup().location]", + "msiName": "[concat('ray-', parameters('clusterId'), '-msi')]", + "roleAssignmentName": "[concat('ray-', parameters('clusterId'), '-ra')]", + "nsgName": "[concat('ray-', parameters('clusterId'), '-nsg')]", + "nsg": "[resourceId('Microsoft.Network/networkSecurityGroups', variables('nsgName'))]", + "vnetName": "[concat('ray-', parameters('clusterId'), '-vnet')]", + "subnetName": "[concat('ray-', parameters('clusterId'), '-subnet')]" }, "resources": [ { "type": "Microsoft.ManagedIdentity/userAssignedIdentities", "apiVersion": "2018-11-30", "location": "[variables('location')]", - "name": "ray-msi-user-identity" + "name": "[variables('msiName')]" }, { "type": "Microsoft.Authorization/roleAssignments", - "apiVersion": "2020-04-01-preview", - "name": "[guid(resourceGroup().id)]", + "apiVersion": "2020-08-01-preview", + "name": "[guid(variables('roleAssignmentName'))]", "properties": { - "principalId": "[reference('ray-msi-user-identity').principalId]", - "roleDefinitionId": "[variables('Contributor')]", + "principalId": "[reference(variables('msiName')).principalId]", + "roleDefinitionId": "[variables('contributor')]", "scope": "[resourceGroup().id]", "principalType": "ServicePrincipal" }, "dependsOn": [ - "[resourceId('Microsoft.ManagedIdentity/userAssignedIdentities', 'ray-msi-user-identity')]" + "[variables('msiName')]" ] }, { "type": "Microsoft.Network/networkSecurityGroups", "apiVersion": "2019-02-01", - "name": "ray-nsg", + "name": "[variables('nsgName')]", "location": "[variables('location')]", "properties": { "securityRules": [ @@ -60,7 +72,7 @@ { "type": "Microsoft.Network/virtualNetworks", "apiVersion": "2019-11-01", - "name": "ray-vnet", + "name": "[variables('vnetName')]", "location": "[variables('location')]", "properties": { "addressSpace": { @@ -70,19 +82,33 @@ }, "subnets": [ { - "name": "ray-subnet", + "name": "[variables('subnetName')]", "properties": { "addressPrefix": "[parameters('subnet')]", "networkSecurityGroup": { - "id": "[resourceId('Microsoft.Network/networkSecurityGroups','ray-nsg')]" + "id": "[variables('nsg')]" } } } ] }, "dependsOn": [ - "[resourceId('Microsoft.Network/networkSecurityGroups', 'ray-nsg')]" + "[variables('nsg')]" ] } - ] + ], + "outputs": { + "subnet": { + "type": "string", + "value": "[resourceId('Microsoft.Network/virtualNetworks/subnets', variables('vnetName'), variables('subnetName'))]" + }, + "nsg": { + "type": "string", + "value": "[variables('nsg')]" + }, + "msi": { + "type": "string", + "value": "[resourceId('Microsoft.ManagedIdentity/userAssignedIdentities', variables('msiName'))]" + } + } } diff --git a/sky/skylet/providers/azure/azure-vm-template.json b/sky/skylet/providers/azure/azure-vm-template.json index b374a568682..03ca08944c2 100644 --- a/sky/skylet/providers/azure/azure-vm-template.json +++ b/sky/skylet/providers/azure/azure-vm-template.json @@ -89,6 +89,24 @@ "description": "OS disk size in GBs." } }, + "msi": { + "type": "string", + "metadata": { + "description": "Managed service identity resource id." + } + }, + "nsg": { + "type": "string", + "metadata": { + "description": "Network security group resource id." 
+ } + }, + "subnet": { + "type": "string", + "metadata": { + "descriptions": "Subnet resource id." + } + }, "osDiskTier": { "type": "string", "allowedValues": [ @@ -103,12 +121,11 @@ }, "variables": { "location": "[resourceGroup().location]", - "networkInterfaceNamePrivate": "[concat(parameters('vmName'),'-nic')]", - "networkInterfaceNamePublic": "[concat(parameters('vmName'),'-nic-public')]", + "networkInterfaceNamePrivate": "[concat(parameters('vmName'), '-nic')]", + "networkInterfaceNamePublic": "[concat(parameters('vmName'), '-nic-public')]", "networkInterfaceName": "[if(parameters('provisionPublicIp'), variables('networkInterfaceNamePublic'), variables('networkInterfaceNamePrivate'))]", "networkIpConfig": "[guid(resourceGroup().id, parameters('vmName'))]", - "publicIpAddressName": "[concat(parameters('vmName'), '-ip' )]", - "subnetRef": "[resourceId('Microsoft.Network/virtualNetworks/subnets', 'ray-vnet', 'ray-subnet')]" + "publicIpAddressName": "[concat(parameters('vmName'), '-ip')]" }, "resources": [ { @@ -129,7 +146,7 @@ "name": "[variables('networkIpConfig')]", "properties": { "subnet": { - "id": "[variables('subnetRef')]" + "id": "[parameters('subnet')]" }, "privateIPAllocationMethod": "Dynamic", "publicIpAddress": { @@ -139,7 +156,7 @@ } ], "networkSecurityGroup": { - "id": "[resourceId('Microsoft.Network/networkSecurityGroups','ray-nsg')]" + "id": "[parameters('nsg')]" } }, "condition": "[parameters('provisionPublicIp')]" @@ -159,14 +176,14 @@ "name": "[variables('networkIpConfig')]", "properties": { "subnet": { - "id": "[variables('subnetRef')]" + "id": "[parameters('subnet')]" }, "privateIPAllocationMethod": "Dynamic" } } ], "networkSecurityGroup": { - "id": "[resourceId('Microsoft.Network/networkSecurityGroups','ray-nsg')]" + "id": "[parameters('nsg')]" } }, "condition": "[not(parameters('provisionPublicIp'))]" @@ -251,7 +268,7 @@ "identity": { "type": "UserAssigned", "userAssignedIdentities": { - "[resourceId('Microsoft.ManagedIdentity/userAssignedIdentities', 'ray-msi-user-identity')]": { + "[parameters('msi')]": { } } } diff --git a/sky/skylet/providers/azure/config.py b/sky/skylet/providers/azure/config.py index 5ce8269954d..fb0687f6f7c 100644 --- a/sky/skylet/providers/azure/config.py +++ b/sky/skylet/providers/azure/config.py @@ -1,6 +1,7 @@ import json import logging import random +from hashlib import sha256 from pathlib import Path from typing import Any, Callable @@ -9,11 +10,7 @@ from azure.mgmt.resource import ResourceManagementClient from azure.mgmt.resource.resources.models import DeploymentMode -RETRIES = 30 -MSI_NAME = "ray-msi-user-identity" -NSG_NAME = "ray-nsg" -SUBNET_NAME = "ray-subnet" -VNET_NAME = "ray-vnet" +UNIQUE_ID_LEN = 4 logger = logging.getLogger(__name__) @@ -26,7 +23,9 @@ def get_azure_sdk_function(client: Any, function_name: str) -> Callable: versions of the SDK by first trying the old name and falling back to the prefixed new name. 
""" - func = getattr(client, function_name, getattr(client, f"begin_{function_name}")) + func = getattr( + client, function_name, getattr(client, f"begin_{function_name}", None) + ) if func is None: raise AttributeError( "'{obj}' object has no {func} or begin_{func} attribute".format( @@ -70,10 +69,11 @@ def _configure_resource_group(config): if "tags" in config["provider"]: params["tags"] = config["provider"]["tags"] - logger.info("Creating/Updating Resource Group: %s", resource_group) - resource_client.resource_groups.create_or_update( - resource_group_name=resource_group, parameters=params + logger.info("Creating/Updating resource group: %s", resource_group) + rg_create_or_update = get_azure_sdk_function( + client=resource_client.resource_groups, function_name="create_or_update" ) + rg_create_or_update(resource_group_name=resource_group, parameters=params) # load the template file current_path = Path(__file__).parent @@ -81,26 +81,55 @@ def _configure_resource_group(config): with open(template_path, "r") as template_fp: template = json.load(template_fp) - # choose a random subnet, skipping most common value of 0 - random.seed(resource_group) - subnet_mask = "10.{}.0.0/16".format(random.randint(1, 254)) + logger.info("Using cluster name: %s", config["cluster_name"]) + + # set unique id for resources in this cluster + unique_id = config["provider"].get("unique_id") + if unique_id is None: + hasher = sha256() + hasher.update(config["provider"]["resource_group"].encode("utf-8")) + unique_id = hasher.hexdigest()[:UNIQUE_ID_LEN] + else: + unique_id = str(unique_id) + config["provider"]["unique_id"] = unique_id + logger.info("Using unique id: %s", unique_id) + cluster_id = "{}-{}".format(config["cluster_name"], unique_id) + + subnet_mask = config["provider"].get("subnet_mask") + if subnet_mask is None: + # choose a random subnet, skipping most common value of 0 + random.seed(unique_id) + subnet_mask = "10.{}.0.0/16".format(random.randint(1, 254)) + logger.info("Using subnet mask: %s", subnet_mask) parameters = { "properties": { "mode": DeploymentMode.incremental, "template": template, - "parameters": {"subnet": {"value": subnet_mask}}, + "parameters": { + "subnet": {"value": subnet_mask}, + "clusterId": {"value": cluster_id}, + }, } } create_or_update = get_azure_sdk_function( client=resource_client.deployments, function_name="create_or_update" ) - create_or_update( - resource_group_name=resource_group, - deployment_name="ray-config", - parameters=parameters, - ).wait() + outputs = ( + create_or_update( + resource_group_name=resource_group, + deployment_name="ray-config", + parameters=parameters, + ) + .result() + .properties.outputs + ) + + # append output resource ids to be used with vm creation + config["provider"]["msi"] = outputs["msi"]["value"] + config["provider"]["nsg"] = outputs["nsg"]["value"] + config["provider"]["subnet"] = outputs["subnet"]["value"] return config diff --git a/sky/skylet/providers/azure/node_provider.py b/sky/skylet/providers/azure/node_provider.py index e774b64e588..1a0306e5b67 100644 --- a/sky/skylet/providers/azure/node_provider.py +++ b/sky/skylet/providers/azure/node_provider.py @@ -25,7 +25,7 @@ ) VM_NAME_MAX_LEN = 64 -VM_NAME_UUID_LEN = 8 +UNIQUE_ID_LEN = 4 logger = logging.getLogger(__name__) azure_logger = logging.getLogger("azure.core.pipeline.policies.http_logging_policy") @@ -64,7 +64,9 @@ def __init__(self, provider_config, cluster_name): # to create/update it here, so the resource group always exists. 
from sky.skylet.providers.azure.config import _configure_resource_group - _configure_resource_group({"provider": provider_config}) + _configure_resource_group( + {"cluster_name": cluster_name, "provider": provider_config} + ) subscription_id = provider_config["subscription_id"] self.cache_stopped_nodes = provider_config.get("cache_stopped_nodes", True) # Sky only supports Azure CLI credential for now. @@ -83,8 +85,11 @@ def __init__(self, provider_config, cluster_name): @synchronized def _get_filtered_nodes(self, tag_filters): + # add cluster name filter to only get nodes from this cluster + cluster_tag_filters = {**tag_filters, TAG_RAY_CLUSTER_NAME: self.cluster_name} + def match_tags(vm): - for k, v in tag_filters.items(): + for k, v in cluster_tag_filters.items(): if vm.tags.get(k) != v: return False return True @@ -149,7 +154,10 @@ def non_terminated_nodes(self, tag_filters): nodes() must be called again to refresh results. Examples: - >>> provider.non_terminated_nodes({TAG_RAY_NODE_KIND: "worker"}) + >>> from ray.autoscaler.tags import TAG_RAY_NODE_KIND + >>> provider = ... # doctest: +SKIP + >>> provider.non_terminated_nodes( # doctest: +SKIP + ... {TAG_RAY_NODE_KIND: "worker"}) ["node-1", "node-2"] """ nodes = self._get_filtered_nodes(tag_filters=tag_filters) @@ -267,9 +275,11 @@ def _create_node(self, node_config, tags, count): config_tags.update(tags) config_tags[TAG_RAY_CLUSTER_NAME] = self.cluster_name - name_tag = config_tags.get(TAG_RAY_NODE_NAME, "node") - unique_id = uuid4().hex[:VM_NAME_UUID_LEN] - vm_name = "{name}-{id}".format(name=name_tag, id=unique_id) + vm_name = "{node}-{unique_id}-{vm_id}".format( + node=config_tags.get(TAG_RAY_NODE_NAME, "node"), + unique_id=self.provider_config["unique_id"], + vm_id=uuid4().hex[:UNIQUE_ID_LEN], + )[:VM_NAME_MAX_LEN] use_internal_ips = self.provider_config.get("use_internal_ips", False) template_params = node_config["azure_arm_parameters"].copy() @@ -277,6 +287,9 @@ def _create_node(self, node_config, tags, count): template_params["provisionPublicIp"] = not use_internal_ips template_params["vmTags"] = config_tags template_params["vmCount"] = count + template_params["msi"] = self.provider_config["msi"] + template_params["nsg"] = self.provider_config["nsg"] + template_params["subnet"] = self.provider_config["subnet"] parameters = { "properties": { @@ -294,7 +307,7 @@ def _create_node(self, node_config, tags, count): ) create_or_update( resource_group_name=resource_group, - deployment_name="ray-vm-{}".format(name_tag), + deployment_name=vm_name, parameters=parameters, ).wait() diff --git a/sky/skylet/ray_patches/__init__.py b/sky/skylet/ray_patches/__init__.py index 10fd0f58c68..7178779d0e0 100644 --- a/sky/skylet/ray_patches/__init__.py +++ b/sky/skylet/ray_patches/__init__.py @@ -14,7 +14,7 @@ Example workflow: - >> wget https://raw.githubusercontent.com/ray-project/ray/releases/2.0.1/python/ray/autoscaler/_private/command_runner.py + >> wget https://raw.githubusercontent.com/ray-project/ray/releases/2.4.0/python/ray/autoscaler/_private/command_runner.py >> cp command_runner.py command_runner.py.1 >> # Make some edits to command_runner.py.1... 
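A hypothetical continuation of the workflow sketched in the docstring above -- generating one of these .patch files with the system diff tool and checking that it applies (file names are illustrative):

    import subprocess

    # Default `diff` output is the format used by the .patch files in this
    # directory; --dry-run verifies the patch applies without modifying the file.
    subprocess.run(
        "diff command_runner.py command_runner.py.1 > command_runner.py.patch",
        shell=True)
    subprocess.run(
        ["patch", "--dry-run", "command_runner.py", "command_runner.py.patch"],
        check=True)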
@@ -69,9 +69,6 @@ def patch() -> None: from ray.dashboard.modules.job import cli _run_patch(cli.__file__, _to_absolute('cli.py.patch')) - from ray.dashboard.modules.job import job_manager - _run_patch(job_manager.__file__, _to_absolute('job_manager.py.patch')) - from ray.autoscaler._private import autoscaler _run_patch(autoscaler.__file__, _to_absolute('autoscaler.py.patch')) diff --git a/sky/skylet/ray_patches/autoscaler.py.patch b/sky/skylet/ray_patches/autoscaler.py.patch index 2d466dc1735..732c27282b2 100644 --- a/sky/skylet/ray_patches/autoscaler.py.patch +++ b/sky/skylet/ray_patches/autoscaler.py.patch @@ -1,9 +1,9 @@ 0a1,4 -> # From https://github.com/ray-project/ray/blob/ray-2.0.1/python/ray/autoscaler/_private/autoscaler.py +> # From https://github.com/ray-project/ray/blob/ray-2.4.0/python/ray/autoscaler/_private/autoscaler.py > # Sky patch changes: > # - enable upscaling_speed to be 0.0 > -1022c1026 +1068c1072 < if upscaling_speed: --- > if upscaling_speed is not None: # NOTE(sky): enable 0.0 diff --git a/sky/skylet/ray_patches/cli.py.patch b/sky/skylet/ray_patches/cli.py.patch index 7eef4cd19c0..62313082e53 100644 --- a/sky/skylet/ray_patches/cli.py.patch +++ b/sky/skylet/ray_patches/cli.py.patch @@ -1,11 +1,11 @@ 0a1,4 -> # Adapted from https://github.com/ray-project/ray/blob/ray-2.0.1/dashboard/modules/job/cli.py +> # Adapted from https://github.com/ray-project/ray/blob/ray-2.4.0/dashboard/modules/job/cli.py > # Fixed the problem in ray's issue https://github.com/ray-project/ray/issues/26514 > # Otherwise, the output redirection ">" will not work. > -5d8 +4d7 < from subprocess import list2cmdline -182c185 +212c215 < entrypoint=list2cmdline(entrypoint), --- > entrypoint=" ".join(entrypoint), diff --git a/sky/skylet/ray_patches/command_runner.py.patch b/sky/skylet/ray_patches/command_runner.py.patch index 899f2e02d75..afa525bb5e7 100644 --- a/sky/skylet/ray_patches/command_runner.py.patch +++ b/sky/skylet/ray_patches/command_runner.py.patch @@ -1,4 +1,7 @@ -329c329 +0a1,2 +> # From https://github.com/ray-project/ray/blob/ray-2.4.0/python/ray/autoscaler/_private/command_runner.py +> +140c142 < "ControlPersist": "10s", --- > "ControlPersist": "300s", diff --git a/sky/skylet/ray_patches/job_manager.py.patch b/sky/skylet/ray_patches/job_manager.py.patch deleted file mode 100644 index 82c54fd9003..00000000000 --- a/sky/skylet/ray_patches/job_manager.py.patch +++ /dev/null @@ -1,19 +0,0 @@ -0a1,4 -> # Adapted from https://github.com/ray-project/ray/blob/ray-2.0.1/dashboard/modules/job/job_manager.py -> # Fixed the problem where the _monitor_job thread is leaked, due to `await job_supervisor.ping.remote()` -> # does not raise an exception after the job_supervisor is exited, causing the dashboard to hang. -> -384c388,398 -< await job_supervisor.ping.remote() ---- -> # Simulate the await behavior, in case some unexpected exception happens -> # and this will not yield for other coroutines. 
-> # https://ray-distributed.slack.com/archives/CP950VC76/p1661206198489669?thread_ts=1660955297.652409&cid=CP950VC76 -> ref = [job_supervisor.ping.remote()] -> not_ready = ref -> while not_ready: -> ready, not_ready = ray.wait(ref, timeout=0.1) -> if ready: -> ray.get(ready) -> else: -> await asyncio.sleep(0) diff --git a/sky/skylet/ray_patches/log_monitor.py.patch b/sky/skylet/ray_patches/log_monitor.py.patch index f06d150e0ef..5743f94f538 100644 --- a/sky/skylet/ray_patches/log_monitor.py.patch +++ b/sky/skylet/ray_patches/log_monitor.py.patch @@ -1,8 +1,9 @@ -0a1,3 -> # Adapted from https://github.com/ray-project/ray/blob/ray-2.0.1/python/ray/_private/log_monitor.py +0a1,4 +> # Original file https://github.com/ray-project/ray/blob/ray-2.4.0/python/ray/_private/log_monitor.py > # Fixed the problem for progress bar, as the latest version does not preserve \r for progress bar. -> # The change is adapted from https://github.com/ray-project/ray/blob/ray-1.10.0/python/ray/_private/log_monitor.py#L299-L300 -351c354,355 +> # We change the newline handling back to https://github.com/ray-project/ray/blob/ray-1.10.0/python/ray/_private/log_monitor.py#L299-L300 +> +354c358,359 < next_line = next_line.rstrip("\r\n") --- > if next_line[-1] == "\n": diff --git a/sky/skylet/ray_patches/resource_demand_scheduler.py.patch b/sky/skylet/ray_patches/resource_demand_scheduler.py.patch index 41a1719d60d..08c609f0449 100644 --- a/sky/skylet/ray_patches/resource_demand_scheduler.py.patch +++ b/sky/skylet/ray_patches/resource_demand_scheduler.py.patch @@ -1,17 +1,17 @@ 0a1,5 -> # From https://github.com/ray-project/ray/blob/ray-2.0.1/python/ray/autoscaler/_private/resource_demand_scheduler.py +> # From https://github.com/ray-project/ray/blob/ray-2.4.0/python/ray/autoscaler/_private/resource_demand_scheduler.py > # Sky patch changes: > # - no new nodes are allowed to be launched launched when the upscaling_speed is 0 > # - comment out "assert not unfulfilled": this seems a buggy assert > -509c514,517 +450c455,458 < if upper_bound > 0: --- > # NOTE(sky): do not autoscale when upsclaing speed is 0. > if self.upscaling_speed == 0: > upper_bound = 0 > if upper_bound >= 0: -646c654 +594c602 < assert not unfulfilled --- > # assert not unfulfilled # NOTE(sky): buggy assert. diff --git a/sky/skylet/ray_patches/updater.py.patch b/sky/skylet/ray_patches/updater.py.patch index 77cfce04f7a..c4c865002c8 100644 --- a/sky/skylet/ray_patches/updater.py.patch +++ b/sky/skylet/ray_patches/updater.py.patch @@ -1,7 +1,7 @@ 0a1,4 -> # From https://github.com/ray-project/ray/blob/releases/2.0.1/python/ray/autoscaler/_private/updater.py +> # From https://github.com/ray-project/ray/blob/releases/2.4.0/python/ray/autoscaler/_private/updater.py > # Sky patch changes: > # - Ensure the node state is refreshed before checking the node is terminated. > -318a319 +318a323 > self.provider.non_terminated_nodes({}) diff --git a/sky/skylet/ray_patches/worker.py.patch b/sky/skylet/ray_patches/worker.py.patch index 256f09f9282..e75673c37c4 100644 --- a/sky/skylet/ray_patches/worker.py.patch +++ b/sky/skylet/ray_patches/worker.py.patch @@ -1,17 +1,18 @@ 0a1,4 -> # Adapted from https://github.com/ray-project/ray/blob/ray-2.0.1/python/ray/worker.py +> # Adapted from https://github.com/ray-project/ray/blob/ray-2.4.0/python/ray/_private/worker.py > # Fixed the problem in ray's issue https://github.com/ray-project/ray/issues/9233 > # Tracked in PR https://github.com/ray-project/ray/pull/21977/files. 
> -1748a1753,1759 -> +1872a1877,1884 +> > def end_for(line: str) -> str: > if sys.platform == "win32": > return "\n" > if line.endswith("\r"): > return "" > return "\n" -1768a1780 -> end=end_for(line), -1782a1795 -> end=end_for(line), +> +1896a1909 +> end=end_for(line), +1914a1928 +> end=end_for(line), diff --git a/sky/templates/aws-ray.yml.j2 b/sky/templates/aws-ray.yml.j2 index 8d82a03ccc4..bb8c709f4d5 100644 --- a/sky/templates/aws-ray.yml.j2 +++ b/sky/templates/aws-ray.yml.j2 @@ -24,7 +24,7 @@ provider: use_internal_ips: {{use_internal_ips}} # Disable launch config check for worker nodes as it can cause resource # leakage. - # Reference: https://github.com/ray-project/ray/blob/840215bc09e942b50cad0ab2db96a8fdc79217c1/python/ray/autoscaler/_private/autoscaler.py#L1101 + # Reference: https://github.com/ray-project/ray/blob/cd1ba65e239360c8a7b130f991ed414eccc063ce/python/ray/autoscaler/_private/autoscaler.py#L1115 # The upper-level SkyPilot code has make sure there will not be resource # leakage. disable_launch_config_check: true @@ -177,8 +177,8 @@ setup_commands: (type -a pip | grep -q pip3) || echo 'alias pip=pip3' >> ~/.bashrc; (which conda > /dev/null 2>&1 && conda init > /dev/null) || (wget -nc https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh && bash Miniconda3-latest-Linux-x86_64.sh -b && eval "$(~/miniconda3/bin/conda shell.bash hook)" && conda init && conda config --set auto_activate_base true); source ~/.bashrc; - (pip3 list | grep ray | grep {{ray_version}} 2>&1 > /dev/null || pip3 install -U ray[default]=={{ray_version}}) && mkdir -p ~/sky_workdir && mkdir -p ~/.sky/sky_app; - (pip3 list | grep skypilot && [ "$(cat {{sky_remote_path}}/current_sky_wheel_hash)" == "{{sky_wheel_hash}}" ]) || (pip3 uninstall skypilot -y; pip3 install "$(echo {{sky_remote_path}}/{{sky_wheel_hash}}/skypilot-{{sky_version}}*.whl)[aws]" && echo "{{sky_wheel_hash}}" > {{sky_remote_path}}/current_sky_wheel_hash || exit 1); + (pip3 list | grep "ray " | grep {{ray_version}} 2>&1 > /dev/null || pip3 install --exists-action w -U ray[default]=={{ray_version}}) && mkdir -p ~/sky_workdir && mkdir -p ~/.sky/sky_app; + (pip3 list | grep "skypilot " && [ "$(cat {{sky_remote_path}}/current_sky_wheel_hash)" == "{{sky_wheel_hash}}" ]) || (pip3 uninstall skypilot -y; pip3 install "$(echo {{sky_remote_path}}/{{sky_wheel_hash}}/skypilot-{{sky_version}}*.whl)[aws]" && echo "{{sky_wheel_hash}}" > {{sky_remote_path}}/current_sky_wheel_hash || exit 1); sudo bash -c 'rm -rf /etc/security/limits.d; echo "* soft nofile 1048576" >> /etc/security/limits.conf; echo "* hard nofile 1048576" >> /etc/security/limits.conf'; sudo grep -e '^DefaultTasksMax' /etc/systemd/system.conf || (sudo bash -c 'echo "DefaultTasksMax=infinity" >> /etc/systemd/system.conf'); sudo systemctl set-property user-$(id -u $(whoami)).slice TasksMax=infinity; sudo systemctl daemon-reload; mkdir -p ~/.ssh; (grep -Pzo -q "Host \*\n StrictHostKeyChecking no" ~/.ssh/config) || printf "Host *\n StrictHostKeyChecking no\n" >> ~/.ssh/config; diff --git a/sky/templates/azure-ray.yml.j2 b/sky/templates/azure-ray.yml.j2 index bb108c64604..a336ca19fbb 100644 --- a/sky/templates/azure-ray.yml.j2 +++ b/sky/templates/azure-ray.yml.j2 @@ -21,7 +21,7 @@ provider: subscription_id: {{azure_subscription_id}} # Disable launch config check for worker nodes as it can cause resource # leakage. 
- # Reference: https://github.com/ray-project/ray/blob/840215bc09e942b50cad0ab2db96a8fdc79217c1/python/ray/autoscaler/_private/autoscaler.py#L1101 + # Reference: https://github.com/ray-project/ray/blob/cd1ba65e239360c8a7b130f991ed414eccc063ce/python/ray/autoscaler/_private/autoscaler.py#L1115 # The upper-level SkyPilot code has make sure there will not be resource # leakage. disable_launch_config_check: true @@ -126,8 +126,8 @@ setup_commands: (type -a pip | grep -q pip3) || echo 'alias pip=pip3' >> ~/.bashrc; which conda > /dev/null 2>&1 || (wget -nc https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh && bash Miniconda3-latest-Linux-x86_64.sh -b && eval "$(/home/azureuser/miniconda3/bin/conda shell.bash hook)" && conda init && conda config --set auto_activate_base true); source ~/.bashrc; - (pip3 list | grep ray | grep {{ray_version}} 2>&1 > /dev/null || pip3 install -U ray[default]=={{ray_version}}) && mkdir -p ~/sky_workdir && mkdir -p ~/.sky/sky_app && touch ~/.sudo_as_admin_successful; - (pip3 list | grep skypilot && [ "$(cat {{sky_remote_path}}/current_sky_wheel_hash)" == "{{sky_wheel_hash}}" ]) || (pip3 uninstall skypilot -y; pip3 install "$(echo {{sky_remote_path}}/{{sky_wheel_hash}}/skypilot-{{sky_version}}*.whl)[azure]" && echo "{{sky_wheel_hash}}" > {{sky_remote_path}}/current_sky_wheel_hash || exit 1); + (pip3 list | grep "ray " | grep {{ray_version}} 2>&1 > /dev/null || pip3 install --exists-action w -U ray[default]=={{ray_version}}) && mkdir -p ~/sky_workdir && mkdir -p ~/.sky/sky_app && touch ~/.sudo_as_admin_successful; + (pip3 list | grep "skypilot " && [ "$(cat {{sky_remote_path}}/current_sky_wheel_hash)" == "{{sky_wheel_hash}}" ]) || (pip3 uninstall skypilot -y; pip3 install "$(echo {{sky_remote_path}}/{{sky_wheel_hash}}/skypilot-{{sky_version}}*.whl)[azure]" && echo "{{sky_wheel_hash}}" > {{sky_remote_path}}/current_sky_wheel_hash || exit 1); sudo bash -c 'rm -rf /etc/security/limits.d; echo "* soft nofile 1048576" >> /etc/security/limits.conf; echo "* hard nofile 1048576" >> /etc/security/limits.conf'; sudo grep -e '^DefaultTasksMax' /etc/systemd/system.conf || (sudo bash -c 'echo "DefaultTasksMax=infinity" >> /etc/systemd/system.conf'); sudo systemctl set-property user-$(id -u $(whoami)).slice TasksMax=infinity; sudo systemctl daemon-reload; mkdir -p ~/.ssh; (grep -Pzo -q "Host \*\n StrictHostKeyChecking no" ~/.ssh/config) || printf "Host *\n StrictHostKeyChecking no\n" >> ~/.ssh/config; diff --git a/sky/templates/gcp-ray.yml.j2 b/sky/templates/gcp-ray.yml.j2 index dc17ea2e559..97c1b91c29f 100644 --- a/sky/templates/gcp-ray.yml.j2 +++ b/sky/templates/gcp-ray.yml.j2 @@ -22,7 +22,7 @@ provider: {%- endif %} # Disable launch config check for worker nodes as it can cause resource # leakage. - # Reference: https://github.com/ray-project/ray/blob/840215bc09e942b50cad0ab2db96a8fdc79217c1/python/ray/autoscaler/_private/autoscaler.py#L1101 + # Reference: https://github.com/ray-project/ray/blob/cd1ba65e239360c8a7b130f991ed414eccc063ce/python/ray/autoscaler/_private/autoscaler.py#L1115 # The upper-level SkyPilot code has make sure there will not be resource # leakage. 
disable_launch_config_check: true @@ -170,8 +170,8 @@ setup_commands: test -f /home/gcpuser/miniconda3/etc/profile.d/conda.sh && source /home/gcpuser/miniconda3/etc/profile.d/conda.sh && conda activate base || true; pip3 install --upgrade google-api-python-client; {%- endif %} - (pip3 list | grep ray | grep {{ray_version}} 2>&1 > /dev/null || pip3 install -U ray[default]=={{ray_version}}) && mkdir -p ~/sky_workdir && mkdir -p ~/.sky/sky_app; - (pip3 list | grep skypilot && [ "$(cat {{sky_remote_path}}/current_sky_wheel_hash)" == "{{sky_wheel_hash}}" ]) || (pip3 uninstall skypilot -y; pip3 install "$(echo {{sky_remote_path}}/{{sky_wheel_hash}}/skypilot-{{sky_version}}*.whl)[gcp]" && echo "{{sky_wheel_hash}}" > {{sky_remote_path}}/current_sky_wheel_hash || exit 1); + (pip3 list | grep "ray " | grep {{ray_version}} 2>&1 > /dev/null || pip3 install --exists-action w -U ray[default]=={{ray_version}}) && mkdir -p ~/sky_workdir && mkdir -p ~/.sky/sky_app; + (pip3 list | grep "skypilot " && [ "$(cat {{sky_remote_path}}/current_sky_wheel_hash)" == "{{sky_wheel_hash}}" ]) || (pip3 uninstall skypilot -y; pip3 install "$(echo {{sky_remote_path}}/{{sky_wheel_hash}}/skypilot-{{sky_version}}*.whl)[gcp]" && echo "{{sky_wheel_hash}}" > {{sky_remote_path}}/current_sky_wheel_hash || exit 1); sudo bash -c 'rm -rf /etc/security/limits.d; echo "* soft nofile 1048576" >> /etc/security/limits.conf; echo "* hard nofile 1048576" >> /etc/security/limits.conf'; sudo grep -e '^DefaultTasksMax' /etc/systemd/system.conf || (sudo bash -c 'echo "DefaultTasksMax=infinity" >> /etc/systemd/system.conf'); sudo systemctl set-property user-$(id -u $(whoami)).slice TasksMax=infinity; sudo systemctl daemon-reload; mkdir -p ~/.ssh; (grep -Pzo -q "Host \*\n StrictHostKeyChecking no" ~/.ssh/config) || printf "Host *\n StrictHostKeyChecking no\n" >> ~/.ssh/config; diff --git a/sky/templates/ibm-ray.yml.j2 b/sky/templates/ibm-ray.yml.j2 index 1790161e442..306ab9734b4 100644 --- a/sky/templates/ibm-ray.yml.j2 +++ b/sky/templates/ibm-ray.yml.j2 @@ -18,7 +18,7 @@ provider: resource_group_id: {{resource_group_id}} # Disable launch config check for worker nodes as it can cause resource # leakage. - # Reference: https://github.com/ray-project/ray/blob/840215bc09e942b50cad0ab2db96a8fdc79217c1/python/ray/autoscaler/_private/autoscaler.py#L1101 + # Reference: https://github.com/ray-project/ray/blob/cd1ba65e239360c8a7b130f991ed414eccc063ce/python/ray/autoscaler/_private/autoscaler.py#L1115 # The upper-level SkyPilot code has make sure there will not be resource # leakage. disable_launch_config_check: true diff --git a/sky/templates/lambda-ray.yml.j2 b/sky/templates/lambda-ray.yml.j2 index 6a997b69174..93b3dd048b7 100644 --- a/sky/templates/lambda-ray.yml.j2 +++ b/sky/templates/lambda-ray.yml.j2 @@ -11,7 +11,7 @@ provider: region: {{region}} # Disable launch config check for worker nodes as it can cause resource # leakage. - # Reference: https://github.com/ray-project/ray/blob/840215bc09e942b50cad0ab2db96a8fdc79217c1/python/ray/autoscaler/_private/autoscaler.py#L1101 + # Reference: https://github.com/ray-project/ray/blob/cd1ba65e239360c8a7b130f991ed414eccc063ce/python/ray/autoscaler/_private/autoscaler.py#L1115 # The upper-level SkyPilot code has make sure there will not be resource # leakage. 
disable_launch_config_check: true @@ -77,8 +77,8 @@ setup_commands: (type -a pip | grep -q pip3) || echo 'alias pip=pip3' >> ~/.bashrc; which conda > /dev/null 2>&1 || (wget -nc https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh && bash Miniconda3-latest-Linux-x86_64.sh -b && eval "$(~/miniconda3/bin/conda shell.bash hook)" && conda init && conda config --set auto_activate_base true); source ~/.bashrc; - (pip3 list | grep ray | grep {{ray_version}} 2>&1 > /dev/null || pip3 install -U ray[default]=={{ray_version}}) && mkdir -p ~/sky_workdir && mkdir -p ~/.sky/sky_app && touch ~/.sudo_as_admin_successful; - (pip3 list | grep skypilot && [ "$(cat {{sky_remote_path}}/current_sky_wheel_hash)" == "{{sky_wheel_hash}}" ]) || (pip3 uninstall skypilot -y; pip3 install "$(echo {{sky_remote_path}}/{{sky_wheel_hash}}/skypilot-{{sky_version}}*.whl)[lambda]" && echo "{{sky_wheel_hash}}" > {{sky_remote_path}}/current_sky_wheel_hash || exit 1); + (pip3 list | grep "ray " | grep {{ray_version}} 2>&1 > /dev/null || pip3 install --exists-action w -U ray[default]=={{ray_version}}) && mkdir -p ~/sky_workdir && mkdir -p ~/.sky/sky_app && touch ~/.sudo_as_admin_successful; + (pip3 list | grep "skypilot " && [ "$(cat {{sky_remote_path}}/current_sky_wheel_hash)" == "{{sky_wheel_hash}}" ]) || (pip3 uninstall skypilot -y; pip3 install "$(echo {{sky_remote_path}}/{{sky_wheel_hash}}/skypilot-{{sky_version}}*.whl)[lambda]" && echo "{{sky_wheel_hash}}" > {{sky_remote_path}}/current_sky_wheel_hash || exit 1); sudo bash -c 'rm -rf /etc/security/limits.d; echo "* soft nofile 1048576" >> /etc/security/limits.conf; echo "* hard nofile 1048576" >> /etc/security/limits.conf'; sudo grep -e '^DefaultTasksMax' /etc/systemd/system.conf || (sudo bash -c 'echo "DefaultTasksMax=infinity" >> /etc/systemd/system.conf'); sudo systemctl set-property user-$(id -u $(whoami)).slice TasksMax=infinity; sudo systemctl daemon-reload; mkdir -p ~/.ssh; (grep -Pzo -q "Host \*\n StrictHostKeyChecking no" ~/.ssh/config) || printf "Host *\n StrictHostKeyChecking no\n" >> ~/.ssh/config; diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 5f294767107..af4714df0fb 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -778,12 +778,12 @@ def test_job_queue(generic_cloud: str): f'sky exec {name} -n {name}-1 -d examples/job_queue/job.yaml', f'sky exec {name} -n {name}-2 -d examples/job_queue/job.yaml', f'sky exec {name} -n {name}-3 -d examples/job_queue/job.yaml', - f'sky queue {name} | grep {name}-1 | grep RUNNING', - f'sky queue {name} | grep {name}-2 | grep RUNNING', - f'sky queue {name} | grep {name}-3 | grep PENDING', + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-1 | grep RUNNING', + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-2 | grep RUNNING', + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-3 | grep PENDING', f'sky cancel -y {name} 2', 'sleep 5', - f'sky queue {name} | grep {name}-3 | grep RUNNING', + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-3 | grep RUNNING', f'sky cancel -y {name} 3', f'sky exec {name} --gpus K80:0.2 "[[ \$SKYPILOT_NUM_GPUS_PER_NODE -eq 1 ]] || exit 1"', f'sky exec {name} --gpus K80:1 "[[ \$SKYPILOT_NUM_GPUS_PER_NODE -eq 1 ]] || exit 1"', @@ -859,7 +859,7 @@ def test_job_queue_multinode(generic_cloud: str): f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-3 | grep PENDING)', f'sky cancel -y {name} 1', 'sleep 5', - f'sky queue {name} | grep {name}-3 | grep RUNNING', + 
f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-3 | grep RUNNING', f'sky cancel -y {name} 1 2 3', f'sky launch -c {name} -n {name}-4 --detach-setup -d examples/job_queue/job_multinode.yaml', # Test the job status is correctly set to SETTING_UP, during the setup is running, @@ -891,7 +891,7 @@ def test_large_job_queue(generic_cloud: str): 'sleep 20', # Each job takes 0.5 CPU and the default VM has 8 CPUs, so there should be 8 / 0.5 = 16 jobs running. # The first 16 jobs are canceled, so there should be 75 - 32 = 43 jobs PENDING. - f'sky queue {name} | grep -v grep | grep PENDING | wc -l | grep 43', + f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep -v grep | grep PENDING | wc -l | grep 43', ], f'sky down -y {name}', timeout=20 * 60, @@ -946,7 +946,7 @@ def test_multi_echo(generic_cloud: str): 'multi_echo', [ f'python examples/multi_echo.py {name} {generic_cloud}', - 'sleep 70', + 'sleep 90', ] + # Ensure jobs succeeded. [f'sky logs {name} {i + 1} --status' for i in range(32)] + @@ -1289,6 +1289,7 @@ def test_cancel_pytorch(generic_cloud: str): f'sky logs {name} 3 --status', # Ensure the job succeeded. ], f'sky down -y {name}', + timeout=20 * 60, ) run_one_test(test) @@ -1351,7 +1352,7 @@ def test_spot(generic_cloud: str): _SPOT_CANCEL_WAIT.format(job_name=f'{name}-1'), 'sleep 5', f'{_SPOT_QUEUE_WAIT}| grep {name}-1 | head -n1 | grep "CANCELLING\|CANCELLED"', - 'sleep 120', + 'sleep 200', f'{_SPOT_QUEUE_WAIT}| grep {name}-1 | head -n1 | grep CANCELLED', f'{_SPOT_QUEUE_WAIT}| grep {name}-2 | head -n1 | grep "RUNNING\|SUCCEEDED"', ],
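The s=$(sky queue ...) rewrites above capture the queue output once so it is still visible in the test log when a grep fails; an equivalent sketch in Python (cluster name illustrative):

    import subprocess

    out = subprocess.run(["sky", "queue", "my-cluster"],
                         capture_output=True, text=True).stdout
    print(out)  # keep the full queue output in the log for debugging
    assert "RUNNING" in out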