Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Define a general-purpose cleanup method for CREvent #849

Merged
merged 2 commits into from
Dec 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 38 additions & 43 deletions tests/framework/prototype.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
import jsonpatch

from framework.utils import (
create_custom_object,
delete_custom_object,
get_head_pod,
logger,
pod_exec_command,
shell_subprocess_run,
shell_subprocess_check_output,
CONST,
Expand Down Expand Up @@ -47,9 +51,9 @@ def get_expected_head_pods(custom_resource):
"""Get the number of head pods in custom_resource"""
resource_kind = custom_resource["kind"]
head_replica_paths = {
"RayCluster": "spec.headGroupSpec.replicas",
"RayService": "spec.rayClusterConfig.headGroupSpec.replicas",
"RayJob": "spec.rayClusterSpec.headGroupSpec.replicas"
CONST.RAY_CLUSTER_CRD: "spec.headGroupSpec.replicas",
CONST.RAY_SERVICE_CRD: "spec.rayClusterConfig.headGroupSpec.replicas",
CONST.RAY_JOB_CRD: "spec.rayClusterSpec.headGroupSpec.replicas"
}
if resource_kind in head_replica_paths:
path = head_replica_paths[resource_kind]
Expand All @@ -60,9 +64,9 @@ def get_expected_worker_pods(custom_resource):
"""Get the number of head pods in custom_resource"""
resource_kind = custom_resource["kind"]
worker_specs_paths = {
"RayCluster": "spec.workerGroupSpecs",
"RayService": "spec.rayClusterConfig.workerGroupSpecs",
"RayJob": "spec.rayClusterSpec.workerGroupSpecs"
CONST.RAY_CLUSTER_CRD: "spec.workerGroupSpecs",
CONST.RAY_SERVICE_CRD: "spec.rayClusterConfig.workerGroupSpecs",
CONST.RAY_JOB_CRD: "spec.rayClusterSpec.workerGroupSpecs"
}
if resource_kind in worker_specs_paths:
path = worker_specs_paths[resource_kind]
Expand Down Expand Up @@ -95,6 +99,7 @@ class Mutator:
def __init__(self, base_custom_resource, json_patch_list: List[jsonpatch.JsonPatch]):
self.base_cr = base_custom_resource
self.patch_list = json_patch_list

def mutate(self):
""" Generate a new cr by applying the json patch to `cr`. """
for patch in self.patch_list:
Expand All @@ -108,12 +113,14 @@ class Rule:
"""
def __init__(self):
pass

def trigger_condition(self, custom_resource=None) -> bool:
"""
The rule will only be checked when `trigger_condition` is true. For example, we will only
check "HeadPodNameRule" when "spec.headGroupSpec" is defined in CR YAML file.
"""
return True

def assert_rule(self, custom_resource=None, cr_namespace='default'):
"""Check whether the actual cluster state fulfills the rule or not."""
raise NotImplementedError
Expand All @@ -122,6 +129,7 @@ class RuleSet:
"""A set of Rule"""
def __init__(self, rules: List[Rule]):
self.rules = rules

def check_rule_set(self, custom_resource, namespace):
"""Check all rules that the trigger conditions are fulfilled."""
for rule in self.rules:
Expand Down Expand Up @@ -152,19 +160,26 @@ def trigger(self):
self.exec()
self.wait()
self.check_rule_sets()

def exec(self):
"""
Execute a command to trigger the CREvent. For example, create a CR by a
`kubectl apply` command.
"""
raise NotImplementedError
if not self.filepath:
create_custom_object(self.namespace, self.custom_resource_object)
else:
shell_subprocess_run(f"kubectl apply -n {self.namespace} -f {self.filepath}")

def wait(self):
"""Wait for the system to converge."""
time.sleep(self.timeout)

def check_rule_sets(self):
"""When the system converges, check all registered RuleSets."""
for ruleset in self.rulesets:
ruleset.check_rule_set(self.custom_resource_object, self.namespace)

def clean_up(self):
"""Cleanup the CR."""
raise NotImplementedError
Expand All @@ -179,9 +194,8 @@ def trigger_condition(self, custom_resource=None) -> bool:
def assert_rule(self, custom_resource=None, cr_namespace='default'):
expected_val = search_path(custom_resource,
"spec.headGroupSpec.template.spec.containers.0.name".split('.'))
headpods = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY].list_namespaced_pod(
namespace = cr_namespace, label_selector='ray.io/node-type=head')
assert headpods.items[0].spec.containers[0].name == expected_val
headpod = get_head_pod(cr_namespace)
assert headpod.spec.containers[0].name == expected_val

class HeadSvcRule(Rule):
"""The labels of the head pod and the selectors of the head service must match."""
Expand All @@ -199,12 +213,10 @@ def assert_rule(self, custom_resource=None, cr_namespace='default'):
class EasyJobRule(Rule):
"""Submit a very simple Ray job to test the basic functionality of the Ray cluster."""
def assert_rule(self, custom_resource=None, cr_namespace='default'):
k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY]
headpods = k8s_v1_api.list_namespaced_pod(
namespace = cr_namespace, label_selector='ray.io/node-type=head')
headpod_name = headpods.items[0].metadata.name
shell_subprocess_run(f"kubectl exec {headpod_name} -n {cr_namespace} --" +
" python -c \"import ray; ray.init(); print(ray.cluster_resources())\"")
headpod = get_head_pod(cr_namespace)
headpod_name = headpod.metadata.name
pod_exec_command(headpod_name, cr_namespace,
"python -c \"import ray; ray.init(); print(ray.cluster_resources())\"")

class CurlServiceRule(Rule):
""""Using curl to access the deployed application on Ray service"""
Expand All @@ -231,15 +243,6 @@ def assert_rule(self, custom_resource=None, cr_namespace='default'):

class RayClusterAddCREvent(CREvent):
"""CREvent for RayCluster addition"""
def exec(self):
if not self.filepath:
k8s_cr_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY]
k8s_cr_api.create_namespaced_custom_object(
group = 'ray.io',version = 'v1alpha1', namespace = self.namespace,
plural = 'rayclusters', body = self.custom_resource_object)
else:
shell_subprocess_run(f"kubectl apply -n {self.namespace} -f {self.filepath}")

def wait(self):
start_time = time.time()
expected_head_pods = get_expected_head_pods(self.custom_resource_object)
Expand Down Expand Up @@ -272,10 +275,11 @@ def wait(self):

def clean_up(self):
"""Delete added RayCluster"""
k8s_cr_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY]
k8s_cr_api.delete_namespaced_custom_object(
group = 'ray.io', version = 'v1alpha1', namespace = self.namespace,
plural = 'rayclusters', name = self.custom_resource_object['metadata']['name'])
if not self.filepath:
delete_custom_object(CONST.RAY_CLUSTER_CRD,
self.namespace, self.custom_resource_object['metadata']['name'])
else:
shell_subprocess_run(f"kubectl delete -n {self.namespace} -f {self.filepath}")
# Wait pods to be deleted
converge = False
k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY]
Expand All @@ -300,16 +304,6 @@ def clean_up(self):

class RayServiceAddCREvent(CREvent):
"""CREvent for RayService addition"""
def exec(self):
"""Wait for RayService to converge"""""
if not self.filepath:
k8s_cr_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY]
k8s_cr_api.create_namespaced_custom_object(
group = 'ray.io',version = 'v1alpha1', namespace = self.namespace,
plural = 'rayservices', body = self.custom_resource_object)
else:
shell_subprocess_run(f"kubectl apply -n {self.namespace} -f {self.filepath}")

def wait(self):
"""Wait for RayService to converge"""""
start_time = time.time()
Expand Down Expand Up @@ -347,10 +341,11 @@ def wait(self):

def clean_up(self):
"""Delete added RayService"""
k8s_cr_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY]
k8s_cr_api.delete_namespaced_custom_object(
group = 'ray.io', version = 'v1alpha1', namespace = self.namespace,
plural = 'rayservices', name = self.custom_resource_object['metadata']['name'])
if not self.filepath:
delete_custom_object(CONST.RAY_SERVICE_CRD,
self.namespace, self.custom_resource_object['metadata']['name'])
else:
shell_subprocess_run(f"kubectl delete -n {self.namespace} -f {self.filepath}")
# Wait pods to be deleted
converge = False
k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY]
Expand Down
34 changes: 34 additions & 0 deletions tests/framework/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ class CONST:
RAY_FT = "RAY_FT"
RAY_SERVICE = "RAY_SERVICE"

# Custom Resource Definitions
RAY_CLUSTER_CRD = "RayCluster"
RAY_SERVICE_CRD = "RayService"
RAY_JOB_CRD = "RayJob"

CONST = CONST()

class KubernetesClusterManager:
Expand Down Expand Up @@ -152,3 +157,32 @@ def pod_exec_command(pod_name, namespace, exec_command, check = True):
Both STDOUT and STDERR of `exec_command` will be printed.
"""
return shell_subprocess_run(f"kubectl exec {pod_name} -n {namespace} -- {exec_command}", check)

def create_custom_object(namespace, cr_object):
"""Create a custom resource based on `cr_object` in the given `namespace`."""
k8s_cr_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY]
crd = cr_object["kind"]
if crd == CONST.RAY_CLUSTER_CRD:
k8s_cr_api.create_namespaced_custom_object(
group = 'ray.io', version = 'v1alpha1', namespace = namespace,
plural = 'rayclusters', body = cr_object)
elif crd == CONST.RAY_SERVICE_CRD:
k8s_cr_api.create_namespaced_custom_object(
group = 'ray.io', version = 'v1alpha1', namespace = namespace,
plural = 'rayservices', body = cr_object)
elif crd == CONST.RAY_JOB_CRD:
raise NotImplementedError

def delete_custom_object(crd, namespace, cr_name):
"""Delete the given `cr_name` custom resource in the given `namespace`."""
k8s_cr_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY]
if crd == CONST.RAY_CLUSTER_CRD:
k8s_cr_api.delete_namespaced_custom_object(
group = 'ray.io', version = 'v1alpha1', namespace = namespace,
plural = 'rayclusters', name = cr_name)
elif crd == CONST.RAY_SERVICE_CRD:
k8s_cr_api.delete_namespaced_custom_object(
group = 'ray.io', version = 'v1alpha1', namespace = namespace,
plural = 'rayservices', name = cr_name)
elif crd == CONST.RAY_JOB_CRD:
raise NotImplementedError
5 changes: 2 additions & 3 deletions tests/test_sample_raycluster_yamls.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
github_action_tests = {
"ray-cluster.getting-started.yaml",
"ray-cluster.ingress.yaml",
"ray-cluster.mini.yaml"
"ray-cluster.mini.yaml",
"ray-cluster.external-redis.yaml"
}

# Paths of untracked files, specified as strings, relative to KubeRay
Expand All @@ -61,8 +62,6 @@

skip_tests = {
'ray-cluster.complete.large.yaml': 'Skip this test because it requires a lot of resources.',
'ray-cluster.external-redis.yaml':
'It installs multiple Kubernetes resources and cannot clean up by DeleteCREvent.',
'ray-cluster.autoscaler.large.yaml':
'Skip this test because it requires a lot of resources.'
}
Expand Down