diff --git a/python/ray/autoscaler/_private/aws/cloudwatch/cloudwatch_helper.py b/python/ray/autoscaler/_private/aws/cloudwatch/cloudwatch_helper.py index 6c8b80282086..939d11620eaa 100644 --- a/python/ray/autoscaler/_private/aws/cloudwatch/cloudwatch_helper.py +++ b/python/ray/autoscaler/_private/aws/cloudwatch/cloudwatch_helper.py @@ -4,7 +4,8 @@ import logging import os import time -from typing import Any, Dict, List, Tuple, Union +from enum import Enum +from typing import Any, Callable, Dict, List, Union import botocore @@ -21,6 +22,12 @@ CLOUDWATCH_CONFIG_HASH_TAG_BASE = "cloudwatch-config-hash" +class CloudwatchConfigType(str, Enum): + AGENT = "agent" + DASHBOARD = "dashboard" + ALARM = "alarm" + + class CloudwatchHelper: def __init__( self, provider_config: Dict[str, Any], node_id: str, cluster_name: str @@ -34,6 +41,22 @@ def __init__( self.ssm_client = client_cache("ssm", region) cloudwatch_resource = resource_cache("cloudwatch", region) self.cloudwatch_client = cloudwatch_resource.meta.client + self.CLOUDWATCH_CONFIG_TYPE_TO_CONFIG_VARIABLE_REPLACE_FUNC: Dict[ + str, Callable + ] = { + CloudwatchConfigType.AGENT.value: self._replace_cwa_config_vars, + CloudwatchConfigType.DASHBOARD.value: self._replace_dashboard_config_vars, + CloudwatchConfigType.ALARM.value: self._load_config_file, + } + self.CLOUDWATCH_CONFIG_TYPE_TO_UPDATE_FUNC_HEAD_NODE: Dict[str, Callable] = { + CloudwatchConfigType.AGENT.value: self._restart_cloudwatch_agent, + CloudwatchConfigType.DASHBOARD.value: self._put_cloudwatch_dashboard, + CloudwatchConfigType.ALARM.value: self._put_cloudwatch_alarm, + } + self.CLOUDWATCH_CONFIG_TYPE_TO_UPDATE_FUNC_WORKER_NODE: Dict[str, Callable] = { + CloudwatchConfigType.AGENT.value: self._restart_cloudwatch_agent, + CloudwatchConfigType.ALARM.value: self._put_cloudwatch_alarm, + } def update_from_config(self, is_head_node: bool) -> None: """Discovers and applies CloudWatch config updates as required. @@ -41,12 +64,11 @@ def update_from_config(self, is_head_node: bool) -> None: Args: is_head_node: whether this node is the head node. """ - if CloudwatchHelper.cloudwatch_config_exists(self.provider_config, "agent"): - self._update_cloudwatch_config(is_head_node, "agent") - if CloudwatchHelper.cloudwatch_config_exists(self.provider_config, "dashboard"): - self._update_cloudwatch_config(is_head_node, "dashboard") - if CloudwatchHelper.cloudwatch_config_exists(self.provider_config, "alarm"): - self._update_cloudwatch_config(is_head_node, "alarm") + for config_type in CloudwatchConfigType: + if CloudwatchHelper.cloudwatch_config_exists( + self.provider_config, config_type.value + ): + self._update_cloudwatch_config(config_type.value, is_head_node) def _ec2_health_check_waiter(self, node_id: str) -> None: # wait for all EC2 instance checks to complete @@ -66,14 +88,10 @@ def _ec2_health_check_waiter(self, node_id: str) -> None: ) raise e - def _update_cloudwatch_config(self, is_head_node: bool, config_type: str) -> None: - """Update remote CloudWatch configs at Parameter Store, - update hash tag value on node and perform associated operations - at CloudWatch console if local CloudWatch configs change. - - Args: - is_head_node: whether this node is the head node. - config_type: CloudWatch config file type. + def _update_cloudwatch_config(self, config_type: str, is_head_node: bool) -> None: + """ + check whether update operations are needed in + cloudwatch related configs """ cwa_installed = self._setup_cwa() param_name = self._get_ssm_param_name(config_type) @@ -84,19 +102,16 @@ def _update_cloudwatch_config(self, is_head_node: bool, config_type: str) -> Non ) cur_cw_config_hash = self._sha1_hash_file(config_type) ssm_cw_config_hash = self._sha1_hash_json(cw_config_ssm) - # check if user updated Unified Cloudwatch Agent config file. + # check if user updated cloudwatch related config files. # if so, perform corresponding actions. if cur_cw_config_hash != ssm_cw_config_hash: logger.info( "Cloudwatch {} config file has changed.".format(config_type) ) self._upload_config_to_ssm_and_set_hash_tag(config_type) - if config_type == "agent": - self._restart_cloudwatch_agent() - elif config_type == "dashboard": - self._put_cloudwatch_dashboard() - elif config_type == "alarm": - self._put_cloudwatch_alarm() + self.CLOUDWATCH_CONFIG_TYPE_TO_UPDATE_FUNC_HEAD_NODE.get( + config_type + )() else: head_node_hash = self._get_head_node_config_hash(config_type) cur_node_hash = self._get_cur_node_config_hash(config_type) @@ -104,10 +119,13 @@ def _update_cloudwatch_config(self, is_head_node: bool, config_type: str) -> Non logger.info( "Cloudwatch {} config file has changed.".format(config_type) ) - if config_type == "agent": - self._restart_cloudwatch_agent() - if config_type == "alarm": - self._put_cloudwatch_alarm() + update_func = ( + self.CLOUDWATCH_CONFIG_TYPE_TO_UPDATE_FUNC_WORKER_NODE.get( + config_type + ) + ) + if update_func: + update_func() self._update_cloudwatch_hash_tag_value( self.node_id, head_node_hash, config_type ) @@ -120,7 +138,9 @@ def _put_cloudwatch_dashboard(self) -> Dict[str, Any]: dashboard_name_cluster = dashboard_config.get("name", self.cluster_name) dashboard_name = self.cluster_name + "-" + dashboard_name_cluster - widgets = self._replace_dashboard_config_variables() + widgets = self._replace_dashboard_config_vars( + CloudwatchConfigType.DASHBOARD.value + ) response = self.cloudwatch_client.put_dashboard( DashboardName=dashboard_name, DashboardBody=json.dumps({"widgets": widgets}) @@ -144,7 +164,7 @@ def _put_cloudwatch_dashboard(self) -> Dict[str, Any]: def _put_cloudwatch_alarm(self) -> None: """put CloudWatch metric alarms read from config""" - param_name = self._get_ssm_param_name("alarm") + param_name = self._get_ssm_param_name(CloudwatchConfigType.ALARM.value) data = json.loads(self._get_ssm_param(param_name)) for item in data: item_out = copy.deepcopy(item) @@ -158,7 +178,7 @@ def _put_cloudwatch_alarm(self) -> None: logger.info("Successfully put alarms to CloudWatch console") def _send_command_to_node( - self, document_name: str, parameters: List[str], node_id: str + self, document_name: str, parameters: Dict[str, List[str]], node_id: str ) -> Dict[str, Any]: """send SSM command to the given nodes""" logger.debug( @@ -177,10 +197,10 @@ def _send_command_to_node( def _ssm_command_waiter( self, document_name: str, - parameters: List[str], + parameters: Dict[str, List[str]], node_id: str, retry_failed: bool = True, - ) -> bool: + ) -> Dict[str, Any]: """wait for SSM command to complete on all cluster nodes""" # This waiter differs from the built-in SSM.Waiter by @@ -192,7 +212,9 @@ def _ssm_command_waiter( command_id = response["Command"]["CommandId"] cloudwatch_config = self.provider_config["cloudwatch"] - agent_retryer_config = cloudwatch_config.get("agent").get("retryer", {}) + agent_retryer_config = cloudwatch_config.get( + CloudwatchConfigType.AGENT.value + ).get("retryer", {}) max_attempts = agent_retryer_config.get("max_attempts", 120) delay_seconds = agent_retryer_config.get("delay_seconds", 30) num_attempts = 0 @@ -283,19 +305,19 @@ def _replace_config_variables( def _replace_all_config_variables( self, - collection: Union[dict, list], + collection: Union[Dict[str, Any], str], node_id: str, cluster_name: str, region: str, - ) -> Tuple[(Union[dict, list], int)]: + ) -> Union[str, Dict[str, Any]]: """ Replace known config variable occurrences in the input collection. - The input collection must be either a dict or list. Returns a tuple consisting of the output collection and the number of modified strings in the collection (which is not necessarily equal to the number of variables replaced). """ + for key in collection: if type(collection) is dict: value = collection.get(key) @@ -303,6 +325,12 @@ def _replace_all_config_variables( elif type(collection) is list: value = key index_key = collection.index(key) + else: + raise ValueError( + f"Can't replace CloudWatch config variables " + f"in unsupported collection type: {type(collection)}." + f"Please check your CloudWatch JSON config files." + ) if type(value) is str: collection[index_key] = self._replace_config_variables( value, node_id, cluster_name, region @@ -344,8 +372,8 @@ def _set_cloudwatch_ssm_config_param( return self._get_default_empty_config_file_hash() else: logger.info( - "Failed to fetch CloudWatch {} config from SSM " - "parameter store.".format(config_type) + "Failed to fetch Unified CloudWatch Agent config from SSM " + "parameter store." ) logger.error(e) raise e @@ -368,31 +396,25 @@ def _get_ssm_param(self, parameter_name: str) -> str: def _sha1_hash_json(self, value: str) -> str: """calculate the json string sha1 hash""" - hash = hashlib.new("sha1") + sha1_hash = hashlib.new("sha1") binary_value = value.encode("ascii") - hash.update(binary_value) - sha1_res = hash.hexdigest() + sha1_hash.update(binary_value) + sha1_res = sha1_hash.hexdigest() return sha1_res def _sha1_hash_file(self, config_type: str) -> str: """calculate the config file sha1 hash""" - if config_type == "agent": - config = self._replace_cwa_config_variables() - if config_type == "dashboard": - config = self._replace_dashboard_config_variables() - if config_type == "alarm": - config = self._load_config_file("alarm") + config = self.CLOUDWATCH_CONFIG_TYPE_TO_CONFIG_VARIABLE_REPLACE_FUNC.get( + config_type + )(config_type) value = json.dumps(config) sha1_res = self._sha1_hash_json(value) return sha1_res def _upload_config_to_ssm_and_set_hash_tag(self, config_type: str): - if config_type == "agent": - data = self._replace_cwa_config_variables() - if config_type == "dashboard": - data = self._replace_dashboard_config_variables() - if config_type == "alarm": - data = self._load_config_file("alarm") + data = self.CLOUDWATCH_CONFIG_TYPE_TO_CONFIG_VARIABLE_REPLACE_FUNC.get( + config_type + )(config_type) sha1_hash_value = self._sha1_hash_file(config_type) self._upload_config_to_ssm(data, config_type) self._update_cloudwatch_hash_tag_value( @@ -405,7 +427,7 @@ def _add_cwa_installed_tag(self, node_id: str) -> None: Tags=[{"Key": CLOUDWATCH_AGENT_INSTALLED_TAG, "Value": "True"}], ) logger.info( - "Successfully add Unified Cloudwatch Agent installed " + "Successfully add Unified CloudWatch Agent installed " "tag on {}".format(node_id) ) @@ -444,12 +466,12 @@ def _upload_config_to_ssm(self, param: Dict[str, Any], config_type: str): param_name = self._get_ssm_param_name(config_type) self._put_ssm_param(param, param_name) - def _replace_cwa_config_variables(self) -> Dict[str, Any]: + def _replace_cwa_config_vars(self, config_type: str) -> Dict[str, Any]: """ replace {instance_id}, {region}, {cluster_name} variable occurrences in Unified Cloudwatch Agent config file """ - cwa_config = self._load_config_file("agent") + cwa_config = self._load_config_file(config_type) self._replace_all_config_variables( cwa_config, self.node_id, @@ -458,11 +480,11 @@ def _replace_cwa_config_variables(self) -> Dict[str, Any]: ) return cwa_config - def _replace_dashboard_config_variables(self) -> List[Dict[str, Any]]: + def _replace_dashboard_config_vars(self, config_type: str) -> List[str]: """ replace known variable occurrences in CloudWatch Dashboard config file """ - data = self._load_config_file("dashboard") + data = self._load_config_file(config_type) widgets = [] for item in data: item_out = self._replace_all_config_variables( @@ -471,16 +493,15 @@ def _replace_dashboard_config_variables(self) -> List[Dict[str, Any]]: self.cluster_name, self.provider_config["region"], ) - item_out = copy.deepcopy(item) widgets.append(item_out) return widgets - def _replace_alarm_config_variables(self) -> List[Dict[str, Any]]: + def _replace_alarm_config_vars(self, config_type: str) -> List[str]: """ replace {instance_id}, {region}, {cluster_name} variable occurrences in cloudwatch alarm config file """ - data = self._load_config_file("alarm") + data = self._load_config_file(config_type) param_data = [] for item in data: item_out = copy.deepcopy(item) @@ -494,11 +515,11 @@ def _replace_alarm_config_variables(self) -> List[Dict[str, Any]]: return param_data def _restart_cloudwatch_agent(self) -> None: - """restart Unified Cloudwatch Agent""" - cwa_param_name = self._get_ssm_param_name("agent") + """restart Unified CloudWatch Agent""" + cwa_param_name = self._get_ssm_param_name(CloudwatchConfigType.AGENT.value) logger.info( - "Restarting Unified Cloudwatch Agent package on {} node(s).".format( - (self.node_id) + "Restarting Unified CloudWatch Agent package on node {}.".format( + self.node_id ) ) self._stop_cloudwatch_agent() @@ -691,7 +712,9 @@ def resolve_instance_profile_name( default ray instance profile name if cloudwatch config file doesn't exist. """ - cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists(config, "agent") + cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists( + config, CloudwatchConfigType.AGENT.value + ) return ( CLOUDWATCH_RAY_INSTANCE_PROFILE if cwa_cfg_exists @@ -712,7 +735,9 @@ def resolve_iam_role_name( default cloudwatch iam role name if cloudwatch config file exists. default ray iam role name if cloudwatch config file doesn't exist. """ - cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists(config, "agent") + cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists( + config, CloudwatchConfigType.AGENT.value + ) return CLOUDWATCH_RAY_IAM_ROLE if cwa_cfg_exists else default_iam_role_name @staticmethod @@ -731,7 +756,9 @@ def resolve_policy_arns( related operations if cloudwatch agent config is specifed in cluster config file. """ - cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists(config, "agent") + cwa_cfg_exists = CloudwatchHelper.cloudwatch_config_exists( + config, CloudwatchConfigType.AGENT.value + ) if cwa_cfg_exists: cloudwatch_managed_policy = { "Version": "2012-10-17", diff --git a/python/ray/tests/aws/conftest.py b/python/ray/tests/aws/conftest.py index e2c9c946e77d..ed3a6a4b71ad 100644 --- a/python/ray/tests/aws/conftest.py +++ b/python/ray/tests/aws/conftest.py @@ -1,7 +1,7 @@ import pytest from ray.autoscaler._private.constants import BOTO_MAX_RETRIES -from ray.autoscaler._private.aws.utils import resource_cache +from ray.autoscaler._private.aws.utils import resource_cache, client_cache from botocore.stub import Stubber @@ -38,3 +38,19 @@ def ec2_client_stub_max_retries(): with Stubber(resource.meta.client) as stubber: yield stubber stubber.assert_no_pending_responses() + + +@pytest.fixture() +def cloudwatch_client_stub(): + resource = resource_cache("cloudwatch", "us-west-2") + with Stubber(resource.meta.client) as stubber: + yield stubber + stubber.assert_no_pending_responses() + + +@pytest.fixture() +def ssm_client_stub(): + client = client_cache("ssm", "us-west-2") + with Stubber(client) as stubber: + yield stubber + stubber.assert_no_pending_responses() diff --git a/python/ray/tests/aws/test_autoscaler_aws.py b/python/ray/tests/aws/test_autoscaler_aws.py index ff30367d9476..a57916c5f307 100644 --- a/python/ray/tests/aws/test_autoscaler_aws.py +++ b/python/ray/tests/aws/test_autoscaler_aws.py @@ -794,6 +794,366 @@ def test_use_subnets_ordered_by_az(ec2_client_stub): assert set(offsets[10:15]) == {0}, "Last 5 should be in us-west-2a" +def test_cloudwatch_dashboard_creation(cloudwatch_client_stub, ssm_client_stub): + # create test cluster node IDs and an associated cloudwatch helper + node_id = "i-abc" + cloudwatch_helper = helpers.get_cloudwatch_helper(node_id) + + # given a directive to create a cluster CloudWatch Dashboard... + # expect to make a call to create a dashboard for each node in the cluster + stubs.put_cluster_dashboard_success( + cloudwatch_client_stub, + cloudwatch_helper, + ) + + # given our mocks and the example CloudWatch Dashboard config as input... + # expect a cluster CloudWatch Dashboard to be created successfully + cloudwatch_helper._put_cloudwatch_dashboard() + # expect no pending responses left in the CloudWatch client stub queue + cloudwatch_client_stub.assert_no_pending_responses() + + +def test_cloudwatch_alarm_creation(cloudwatch_client_stub, ssm_client_stub): + # create test cluster node IDs and an associated cloudwatch helper + node_id = "i-abc" + cloudwatch_helper = helpers.get_cloudwatch_helper(node_id) + + # given a directive to update a cluster CloudWatch Alarm Config without any + # change... + # expect the stored the CloudWatch Alarm Config is same as local config + cw_ssm_param_name = helpers.get_ssm_param_name( + cloudwatch_helper.cluster_name, "alarm" + ) + stubs.get_param_ssm_same( + ssm_client_stub, cw_ssm_param_name, cloudwatch_helper, "alarm" + ) + + # given a directive to create cluster CloudWatch alarms... + # expect to make a call to create alarms for each node in the cluster + stubs.put_cluster_alarms_success(cloudwatch_client_stub, cloudwatch_helper) + + # given our mocks and the example CloudWatch Alarm config as input... + # expect cluster alarms to be created successfully + cloudwatch_helper._put_cloudwatch_alarm() + + # expect no pending responses left in the CloudWatch client stub queue + cloudwatch_client_stub.assert_no_pending_responses() + + +def test_cloudwatch_agent_update_without_change_head_node( + ssm_client_stub, ec2_client_stub +): + # create test cluster head node ID and an associated cloudwatch helper + node_id = "i-abc" + is_head_node = True + cloudwatch_helper = helpers.get_cloudwatch_helper(node_id) + + # given a directive to check for the Unified CloudWatch Agent status... + # expect CloudWatch Agent is installed + stubs.get_ec2_cwa_installed_tag_true(ec2_client_stub, node_id) + + # given a directive to update a cluster CloudWatch Agent Config without any + # change... + # expect the stored the CloudWatch Agent Config is same as local config + cw_ssm_param_name = helpers.get_ssm_param_name( + cloudwatch_helper.cluster_name, "agent" + ) + stubs.get_param_ssm_same( + ssm_client_stub, cw_ssm_param_name, cloudwatch_helper, "agent" + ) + + # given our mocks and the same cloudwatch agent config as input... + # expect no update performed on CloudWatch Agent Config + cloudwatch_helper._update_cloudwatch_config("agent", is_head_node) + + +def test_cloudwatch_agent_update_with_change_head_node( + ec2_client_stub, ssm_client_stub +): + # create test cluster head node ID and an associated cloudwatch helper + node_id = "i-abc" + is_head_node = True + cloudwatch_helper = helpers.get_cloudwatch_helper(node_id) + + # given a directive to check for the Unified CloudWatch Agent status... + # expect CloudWatch Agent is installed + stubs.get_ec2_cwa_installed_tag_true(ec2_client_stub, node_id) + # given a directive to update a cluster CloudWatch Agent Config with new + # changes... + # expect the stored the CloudWatch Agent Config is different from local + # config + cw_ssm_param_name = helpers.get_ssm_param_name( + cloudwatch_helper.cluster_name, "agent" + ) + stubs.get_param_ssm_different(ssm_client_stub, cw_ssm_param_name) + + # given an updated CloudWatch Agent Config file... + # expect to store the new CloudWatch Agent config as an SSM parameter + cmd_id = stubs.put_parameter_cloudwatch_config( + ssm_client_stub, cloudwatch_helper.cluster_name, "agent" + ) + + # given an updated CloudWatch Agent Config file... + # expect to update the node tag equal to updated config file sha1 hash + # to reflect the changes in config file + stubs.update_hash_tag_success(ec2_client_stub, node_id, "agent", cloudwatch_helper) + # given that updated CloudWatch Agent Config is put to Parameter Store... + # expect to send an SSM command to restart CloudWatch Agent on all nodes + cmd_id = stubs.send_command_stop_cwa(ssm_client_stub, node_id) + # given a SSM command to stop CloudWatch Agent sent to all nodes... + # expect to wait for the command to complete successfully on every node + stubs.list_command_invocations_success(ssm_client_stub, node_id, cmd_id) + cmd_id = stubs.send_command_start_cwa(ssm_client_stub, node_id, cw_ssm_param_name) + # given a SSM command to start CloudWatch Agent sent to all nodes... + # expect to wait for the command to complete successfully on every node + stubs.list_command_invocations_success(ssm_client_stub, node_id, cmd_id) + + # given our mocks and the example CloudWatch Agent config as input... + # expect CloudWatch Agent configured to use updated file on each cluster + # node successfully + cloudwatch_helper._update_cloudwatch_config("agent", is_head_node) + + # expect no pending responses left in client stub queues + ec2_client_stub.assert_no_pending_responses() + ssm_client_stub.assert_no_pending_responses() + + +def test_cloudwatch_agent_update_with_change_worker_node( + ec2_client_stub, ssm_client_stub +): + # create test cluster worker node ID and an associated cloudwatch helper + node_id = "i-abc" + is_head_node = False + cloudwatch_helper = helpers.get_cloudwatch_helper(node_id) + + # given a directive to check for the Unified CloudWatch Agent status... + # expect CloudWatch Agent is installed + stubs.get_ec2_cwa_installed_tag_true(ec2_client_stub, node_id) + + # given a directive to update a cluster CloudWatch Agent Config with new + # changes... + # expect the stored the CloudWatch Agent Config is different from local + # config + stubs.get_head_node_config_hash_different( + ec2_client_stub, "agent", cloudwatch_helper, node_id + ) + stubs.get_cur_node_config_hash_different(ec2_client_stub, "agent", node_id) + + # given an updated CloudWatch Agent Config file... + # expect to update the node tag equal to updated config file sha1 hash + # to reflect the changes in config file + stubs.update_hash_tag_success(ec2_client_stub, node_id, "agent", cloudwatch_helper) + # given that updated CloudWatch Agent Config is put to Parameter Store... + # expect to send an SSM command to restart CloudWatch Agent on all nodes + cmd_id = stubs.send_command_stop_cwa(ssm_client_stub, node_id) + # given a SSM command to stop CloudWatch Agent sent to all nodes... + # expect to wait for the command to complete successfully on every node + stubs.list_command_invocations_success(ssm_client_stub, node_id, cmd_id) + cw_ssm_param_name = helpers.get_ssm_param_name( + cloudwatch_helper.cluster_name, "agent" + ) + cmd_id = stubs.send_command_start_cwa(ssm_client_stub, node_id, cw_ssm_param_name) + # given a SSM command to start CloudWatch Agent sent to all nodes... + # expect to wait for the command to complete successfully on every node + stubs.list_command_invocations_success(ssm_client_stub, node_id, cmd_id) + + # given our mocks and the example CloudWatch Agent config as input... + # expect CloudWatch Agent configured to use updated file on each cluster + # node successfully + cloudwatch_helper._update_cloudwatch_config("agent", is_head_node) + + # expect no pending responses left in client stub queues + ec2_client_stub.assert_no_pending_responses() + ssm_client_stub.assert_no_pending_responses() + + +def test_cloudwatch_dashboard_update_head_node( + ec2_client_stub, ssm_client_stub, cloudwatch_client_stub +): + # create test cluster head node ID and an associated cloudwatch helper + node_id = "i-abc" + is_head_node = True + cloudwatch_helper = helpers.get_cloudwatch_helper(node_id) + + # given a directive to check for the Unified CloudWatch Agent status... + # expect CloudWatch Agent is installed + stubs.get_ec2_cwa_installed_tag_true(ec2_client_stub, node_id) + + # given a directive to update a cluster CloudWatch Dashboard Config + # with new changes... + # expect the stored the CloudWatch Dashboard Config is different from local + # config + cw_ssm_param_name = helpers.get_ssm_param_name( + cloudwatch_helper.cluster_name, "dashboard" + ) + stubs.get_param_ssm_different(ssm_client_stub, cw_ssm_param_name) + + # given an updated CloudWatch Dashboard Config file... + # expect to store the new CloudWatch Dashboard config as an SSM parameter + stubs.put_parameter_cloudwatch_config( + ssm_client_stub, cloudwatch_helper.cluster_name, "dashboard" + ) + + # given an updated CloudWatch Dashboard Config file... + # expect to update the node tag equal to updated config file sha1 hash + # to reflect the changes in config file + stubs.update_hash_tag_success( + ec2_client_stub, node_id, "dashboard", cloudwatch_helper + ) + + # given a directive to create a cluster CloudWatch dashboard... + # expect to make a call to create a dashboard for each node in the cluster + stubs.put_cluster_dashboard_success( + cloudwatch_client_stub, + cloudwatch_helper, + ) + # given our mocks and the example CloudWatch Dashboard config as input... + # expect CloudWatch Dashboard configured to use updated file + # on each cluster node successfully + cloudwatch_helper._update_cloudwatch_config("dashboard", is_head_node) + + # expect no pending responses left in client stub queues + ec2_client_stub.assert_no_pending_responses() + ssm_client_stub.assert_no_pending_responses() + + +def test_cloudwatch_dashboard_update_worker_node( + ec2_client_stub, ssm_client_stub, cloudwatch_client_stub +): + # create test cluster worker node ID and an associated cloudwatch helper + node_id = "i-abc" + is_head_node = False + cloudwatch_helper = helpers.get_cloudwatch_helper(node_id) + + # given a directive to check for the Unified CloudWatch Agent status... + # expect CloudWatch Agent is installed + stubs.get_ec2_cwa_installed_tag_true(ec2_client_stub, node_id) + + # given a directive to update a cluster CloudWatch Dashboard Config + # with new changes... + # expect the stored the CloudWatch Dashboard Config is different from local + # config + stubs.get_head_node_config_hash_different( + ec2_client_stub, "dashboard", cloudwatch_helper, node_id + ) + stubs.get_cur_node_config_hash_different(ec2_client_stub, "dashboard", node_id) + + # given an updated CloudWatch Dashboard Config file... + # expect to update the node tag equal to updated config file sha1 hash + # to reflect the changes in config file + stubs.update_hash_tag_success( + ec2_client_stub, node_id, "dashboard", cloudwatch_helper + ) + + # given our mocks and the example CloudWatch Dashboard config as input... + # expect CloudWatch Dashboard configured to use updated file + # on each cluster node successfully + cloudwatch_helper._update_cloudwatch_config("dashboard", is_head_node) + + # expect no pending responses left in client stub queues + ec2_client_stub.assert_no_pending_responses() + ssm_client_stub.assert_no_pending_responses() + + +def test_cloudwatch_alarm_update_head_node( + ec2_client_stub, ssm_client_stub, cloudwatch_client_stub +): + # create test cluster head node ID and an associated cloudwatch helper + node_id = "i-abc" + is_head_node = True + cloudwatch_helper = helpers.get_cloudwatch_helper(node_id) + + # given a directive to check for the Unified CloudWatch Agent status... + # expect CloudWatch Agent is installed + stubs.get_ec2_cwa_installed_tag_true(ec2_client_stub, node_id) + + # given a directive to update a cluster CloudWatch Alarm Config with new + # changes... + # expect the stored the CloudWatch Alarm Config is different from local + # config + cw_ssm_param_name = helpers.get_ssm_param_name( + cloudwatch_helper.cluster_name, "alarm" + ) + stubs.get_param_ssm_different(ssm_client_stub, cw_ssm_param_name) + + # given an updated CloudWatch Alarm Config file... + # expect to store the new CloudWatch Alarm config as an SSM parameter + stubs.put_parameter_cloudwatch_config( + ssm_client_stub, cloudwatch_helper.cluster_name, "alarm" + ) + + # given an updated CloudWatch Alarm Config file... + # expect to update the node tag equal to updated config file sha1 hash + # to reflect the changes in config file + stubs.update_hash_tag_success(ec2_client_stub, node_id, "alarm", cloudwatch_helper) + stubs.get_param_ssm_same( + ssm_client_stub, cw_ssm_param_name, cloudwatch_helper, "alarm" + ) + + # given a directive to create cluster CloudWatch Alarms... + # expect to make a call to create alarms for each node in the cluster + stubs.put_cluster_alarms_success(cloudwatch_client_stub, cloudwatch_helper) + + # given our mocks and the example CloudWatch Alarm config as input... + # expect CloudWatch Alarm configured to use updated file on each cluster + # node successfully + cloudwatch_helper._update_cloudwatch_config("alarm", is_head_node) + + # expect no pending responses left in client stub queues + ec2_client_stub.assert_no_pending_responses() + ssm_client_stub.assert_no_pending_responses() + + +def test_cloudwatch_alarm_update_worker_node( + ec2_client_stub, ssm_client_stub, cloudwatch_client_stub +): + # create test cluster worker node ID and an associated cloudwatch helper + node_id = "i-abc" + is_head_node = False + cloudwatch_helper = helpers.get_cloudwatch_helper(node_id) + + # given a directive to check for the Unified CloudWatch Agent status... + # expect CloudWatch Agent is installed + stubs.get_ec2_cwa_installed_tag_true(ec2_client_stub, node_id) + + # given a directive to update a cluster CloudWatch Alarm Config with new + # changes... + # expect the stored the CloudWatch Alarm Config is different from local + # config + cw_ssm_param_name = helpers.get_ssm_param_name( + cloudwatch_helper.cluster_name, "alarm" + ) + + # given a directive to update a cluster CloudWatch Alarm Config with new + # changes... + # expect the stored the CloudWatch Alarm Config is different from local + # config + stubs.get_head_node_config_hash_different( + ec2_client_stub, "alarm", cloudwatch_helper, node_id + ) + stubs.get_cur_node_config_hash_different(ec2_client_stub, "alarm", node_id) + + # given an updated CloudWatch Alarm Config file... + # expect to update the node tag equal to updated config file sha1 hash + # to reflect the changes in config file + stubs.update_hash_tag_success(ec2_client_stub, node_id, "alarm", cloudwatch_helper) + stubs.get_param_ssm_same( + ssm_client_stub, cw_ssm_param_name, cloudwatch_helper, "alarm" + ) + + # given a directive to create cluster CloudWatch Alarms... + # expect to make a call to create alarms for each node in the cluster + stubs.put_cluster_alarms_success(cloudwatch_client_stub, cloudwatch_helper) + # given our mocks and the example CloudWatch Alarm config as input... + # expect CloudWatch Alarm configured to use updated file on each cluster + # node successfully + cloudwatch_helper._update_cloudwatch_config("alarm", is_head_node) + + # expect no pending responses left in client stub queues + ec2_client_stub.assert_no_pending_responses() + ssm_client_stub.assert_no_pending_responses() + + if __name__ == "__main__": import sys diff --git a/python/ray/tests/aws/utils/helpers.py b/python/ray/tests/aws/utils/helpers.py index 73aee099ad29..12476cd6649c 100644 --- a/python/ray/tests/aws/utils/helpers.py +++ b/python/ray/tests/aws/utils/helpers.py @@ -17,6 +17,7 @@ DEFAULT_CLUSTER_NAME, DEFAULT_NODE_PROVIDER_INSTANCE_TAGS, ) +from ray.autoscaler._private.aws.cloudwatch.cloudwatch_helper import CloudwatchHelper def get_aws_example_config_file_path(file_name): @@ -100,3 +101,47 @@ def apply_node_provider_config_updates(config, node_cfg, node_type_name, max_cou node_cfg.update(node_provider_cfg_updates) # merge node provider tag specs with user overrides AWSNodeProvider._merge_tag_specs(tag_specs, user_tag_specs) + + +def get_cloudwatch_agent_config_file_path(): + return get_aws_example_config_file_path( + "cloudwatch/example-cloudwatch-agent-config.json" + ) + + +def get_cloudwatch_dashboard_config_file_path(): + return get_aws_example_config_file_path( + "cloudwatch/example-cloudwatch-dashboard-config.json" + ) + + +def get_cloudwatch_alarm_config_file_path(): + return get_aws_example_config_file_path( + "cloudwatch/example-cloudwatch-alarm-config.json" + ) + + +def load_cloudwatch_example_config_file(): + config = load_aws_example_config_file("example-cloudwatch.yaml") + cw_cfg = config["provider"]["cloudwatch"] + cw_cfg["agent"]["config"] = get_cloudwatch_agent_config_file_path() + cw_cfg["dashboard"]["config"] = get_cloudwatch_dashboard_config_file_path() + cw_cfg["alarm"]["config"] = get_cloudwatch_alarm_config_file_path() + return config + + +def get_cloudwatch_helper(node_ids): + config = load_cloudwatch_example_config_file() + config["cluster_name"] = DEFAULT_CLUSTER_NAME + return CloudwatchHelper( + config["provider"], + node_ids, + config["cluster_name"], + ) + + +def get_ssm_param_name(cluster_name, config_type): + ssm_config_param_name = "AmazonCloudWatch-" + "ray_{}_config_{}".format( + config_type, cluster_name + ) + return ssm_config_param_name diff --git a/python/ray/tests/aws/utils/stubs.py b/python/ray/tests/aws/utils/stubs.py index 2173c78736b8..e61e56c9fc72 100644 --- a/python/ray/tests/aws/utils/stubs.py +++ b/python/ray/tests/aws/utils/stubs.py @@ -1,7 +1,9 @@ from typing import Dict, List import ray import copy +import json +from uuid import uuid4 from ray.tests.aws.utils import helpers from ray.tests.aws.utils.constants import ( DEFAULT_INSTANCE_PROFILE, @@ -12,6 +14,15 @@ TWENTY_SUBNETS_IN_DIFFERENT_AZS, ) from ray.autoscaler._private.aws.config import key_pair +from ray.tests.aws.utils.helpers import ( + get_cloudwatch_dashboard_config_file_path, + get_cloudwatch_alarm_config_file_path, +) +from ray.autoscaler._private.aws.cloudwatch.cloudwatch_helper import ( + CLOUDWATCH_AGENT_INSTALLED_TAG, + CLOUDWATCH_CONFIG_HASH_TAG_BASE, +) +from ray.autoscaler.tags import NODE_KIND_HEAD, TAG_RAY_NODE_KIND from unittest import mock @@ -235,3 +246,352 @@ def describe_launch_template_versions_by_name_default(ec2_client_stub, versions) }, service_response={"LaunchTemplateVersions": [DEFAULT_LT]}, ) + + +def describe_instance_status_ok(ec2_client_stub, instance_ids): + ec2_client_stub.add_response( + "describe_instance_status", + expected_params={"InstanceIds": instance_ids}, + service_response={ + "InstanceStatuses": [ + { + "InstanceId": instance_id, + "InstanceState": {"Code": 16, "Name": "running"}, + "AvailabilityZone": "us-west-2", + "SystemStatus": { + "Status": "ok", + "Details": [{"Status": "passed", "Name": "reachability"}], + }, + "InstanceStatus": { + "Status": "ok", + "Details": [{"Status": "passed", "Name": "reachability"}], + }, + } + ] + for instance_id in instance_ids + }, + ) + + +def get_ec2_cwa_installed_tag_true(ec2_client_stub, node_id): + ec2_client_stub.add_response( + "describe_instances", + expected_params={"InstanceIds": [node_id]}, + service_response={ + "Reservations": [ + { + "Instances": [ + { + "InstanceId": node_id, + "Tags": [ + { + "Key": CLOUDWATCH_AGENT_INSTALLED_TAG, + "Value": "True", + }, + ], + } + ] + } + ] + }, + ) + + +def update_hash_tag_success(ec2_client_stub, node_id, config_type, cloudwatch_helper): + hash_key_value = "-".join([CLOUDWATCH_CONFIG_HASH_TAG_BASE, config_type]) + cur_hash_value = get_sha1_hash_of_cloudwatch_config_file( + config_type, cloudwatch_helper + ) + ec2_client_stub.add_response( + "create_tags", + expected_params={ + "Resources": [node_id], + "Tags": [{"Key": hash_key_value, "Value": cur_hash_value}], + }, + service_response={"ResponseMetadata": {"HTTPStatusCode": 200}}, + ) + + +def add_cwa_installed_tag_response(ec2_client_stub, node_id): + ec2_client_stub.add_response( + "create_tags", + expected_params={ + "Resources": node_id, + "Tags": [{"Key": CLOUDWATCH_AGENT_INSTALLED_TAG, "Value": "True"}], + }, + service_response={"ResponseMetadata": {"HTTPStatusCode": 200}}, + ) + + +def get_head_node_config_hash_different(ec2_client_stub, config_type, cwh, node_id): + hash_key_value = "-".join([CLOUDWATCH_CONFIG_HASH_TAG_BASE, config_type]) + cur_hash_value = get_sha1_hash_of_cloudwatch_config_file(config_type, cwh) + filters = cwh._get_current_cluster_session_nodes(cwh.cluster_name) + filters.append( + { + "Name": "tag:{}".format(TAG_RAY_NODE_KIND), + "Values": [NODE_KIND_HEAD], + } + ) + ec2_client_stub.add_response( + "describe_instances", + expected_params={"Filters": filters}, + service_response={ + "Reservations": [ + { + "Instances": [ + { + "InstanceId": node_id, + "Tags": [ + {"Key": hash_key_value, "Value": cur_hash_value}, + ], + } + ] + } + ] + }, + ) + + +def get_cur_node_config_hash_different(ec2_client_stub, config_type, node_id): + hash_key_value = "-".join([CLOUDWATCH_CONFIG_HASH_TAG_BASE, config_type]) + ec2_client_stub.add_response( + "describe_instances", + expected_params={"InstanceIds": [node_id]}, + service_response={ + "Reservations": [ + { + "Instances": [ + { + "InstanceId": node_id, + "Tags": [ + {"Key": hash_key_value, "Value": str(uuid4())}, + ], + } + ] + } + ] + }, + ) + + +def send_command_cwa_install(ssm_client_stub, node_id): + command_id = str(uuid4()) + ssm_client_stub.add_response( + "send_command", + expected_params={ + "DocumentName": "AWS-ConfigureAWSPackage", + "InstanceIds": node_id, + "MaxConcurrency": "1", + "MaxErrors": "0", + "Parameters": { + "action": ["Install"], + "name": ["AmazonCloudWatchAgent"], + "version": ["latest"], + }, + }, + service_response={ + "Command": { + "CommandId": command_id, + "DocumentName": "AWS-ConfigureAWSPackage", + } + }, + ) + return command_id + + +def list_command_invocations_status(ssm_client_stub, node_id, cmd_id, status): + ssm_client_stub.add_response( + "list_command_invocations", + expected_params={"CommandId": cmd_id, "InstanceId": node_id}, + service_response={"CommandInvocations": [{"Status": status}]}, + ) + + +def list_command_invocations_failed(ssm_client_stub, node_id, cmd_id): + status = "Failed" + list_command_invocations_status(ssm_client_stub, node_id, cmd_id, status) + + +def list_command_invocations_success(ssm_client_stub, node_id, cmd_id): + status = "Success" + list_command_invocations_status(ssm_client_stub, node_id, cmd_id, status) + + +def put_parameter_cloudwatch_config(ssm_client_stub, cluster_name, section_name): + ssm_config_param_name = helpers.get_ssm_param_name(cluster_name, section_name) + ssm_client_stub.add_response( + "put_parameter", + expected_params={ + "Name": ssm_config_param_name, + "Type": "String", + "Value": ANY, + "Overwrite": True, + "Tier": ANY, + }, + service_response={}, + ) + + +def send_command_cwa_collectd_init(ssm_client_stub, node_id): + command_id = str(uuid4()) + ssm_client_stub.add_response( + "send_command", + expected_params={ + "DocumentName": "AWS-RunShellScript", + "InstanceIds": [node_id], + "MaxConcurrency": "1", + "MaxErrors": "0", + "Parameters": { + "commands": [ + "mkdir -p /usr/share/collectd/", + "touch /usr/share/collectd/types.db", + ], + }, + }, + service_response={"Command": {"CommandId": command_id}}, + ) + return command_id + + +def send_command_start_cwa(ssm_client_stub, node_id, parameter_name): + command_id = str(uuid4()) + ssm_client_stub.add_response( + "send_command", + expected_params={ + "DocumentName": "AmazonCloudWatch-ManageAgent", + "InstanceIds": [node_id], + "MaxConcurrency": "1", + "MaxErrors": "0", + "Parameters": { + "action": ["configure"], + "mode": ["ec2"], + "optionalConfigurationSource": ["ssm"], + "optionalConfigurationLocation": [parameter_name], + "optionalRestart": ["yes"], + }, + }, + service_response={"Command": {"CommandId": command_id}}, + ) + return command_id + + +def send_command_stop_cwa(ssm_client_stub, node_id): + command_id = str(uuid4()) + ssm_client_stub.add_response( + "send_command", + expected_params={ + "DocumentName": "AmazonCloudWatch-ManageAgent", + "InstanceIds": [node_id], + "MaxConcurrency": "1", + "MaxErrors": "0", + "Parameters": { + "action": ["stop"], + "mode": ["ec2"], + }, + }, + service_response={"Command": {"CommandId": command_id}}, + ) + return command_id + + +def get_param_ssm_same(ssm_client_stub, ssm_param_name, cloudwatch_helper, config_type): + command_id = str(uuid4()) + cw_value_json = ( + cloudwatch_helper.CLOUDWATCH_CONFIG_TYPE_TO_CONFIG_VARIABLE_REPLACE_FUNC.get( + config_type + )(config_type) + ) + ssm_client_stub.add_response( + "get_parameter", + expected_params={"Name": ssm_param_name}, + service_response={"Parameter": {"Value": json.dumps(cw_value_json)}}, + ) + return command_id + + +def get_sha1_hash_of_cloudwatch_config_file(config_type, cloudwatch_helper): + cw_value_file = cloudwatch_helper._sha1_hash_file(config_type) + return cw_value_file + + +def get_param_ssm_different(ssm_client_stub, ssm_param_name): + command_id = str(uuid4()) + ssm_client_stub.add_response( + "get_parameter", + expected_params={"Name": ssm_param_name}, + service_response={"Parameter": {"Value": "value"}}, + ) + return command_id + + +def get_param_ssm_exception(ssm_client_stub, ssm_param_name): + command_id = str(uuid4()) + ssm_client_stub.add_client_error( + "get_parameter", + "ParameterNotFound", + expected_params={"Name": ssm_param_name}, + response_meta={"Error": {"Code": "ParameterNotFound"}}, + ) + return command_id + + +def put_cluster_dashboard_success(cloudwatch_client_stub, cloudwatch_helper): + widgets = [] + json_config_path = get_cloudwatch_dashboard_config_file_path() + with open(json_config_path) as f: + dashboard_config = json.load(f) + + for item in dashboard_config: + item_out = cloudwatch_helper._replace_all_config_variables( + item, + cloudwatch_helper.node_id, + cloudwatch_helper.cluster_name, + cloudwatch_helper.provider_config["region"], + ) + widgets.append(item_out) + + dashboard_name = cloudwatch_helper.cluster_name + "-" + "example-dashboard-name" + cloudwatch_client_stub.add_response( + "put_dashboard", + expected_params={ + "DashboardName": dashboard_name, + "DashboardBody": json.dumps({"widgets": widgets}), + }, + service_response={"ResponseMetadata": {"HTTPStatusCode": 200}}, + ) + + +def put_cluster_alarms_success(cloudwatch_client_stub, cloudwatch_helper): + json_config_path = get_cloudwatch_alarm_config_file_path() + with open(json_config_path) as f: + data = json.load(f) + for item in data: + item_out = copy.deepcopy(item) + cloudwatch_helper._replace_all_config_variables( + item_out, + cloudwatch_helper.node_id, + cloudwatch_helper.cluster_name, + cloudwatch_helper.provider_config["region"], + ) + cloudwatch_client_stub.add_response( + "put_metric_alarm", + expected_params=item_out, + service_response={"ResponseMetadata": {"HTTPStatusCode": 200}}, + ) + + +def get_metric_alarm(cloudwatch_client_stub): + cloudwatch_client_stub.add_response( + "describe_alarms", + expected_params={}, + service_response={"MetricAlarms": [{"AlarmName": "myalarm"}]}, + ) + + +def delete_metric_alarms(cloudwatch_client_stub): + cloudwatch_client_stub.add_response( + "delete_alarms", + expected_params={"AlarmNames": ["myalarm"]}, + service_response={"ResponseMetadata": {"HTTPStatusCode": 200}}, + )