From a37388f8b614a837a0ef8e179dea21e1a93b9807 Mon Sep 17 00:00:00 2001
From: Jiri Petrlik
Date: Wed, 9 Aug 2023 16:09:19 +0200
Subject: [PATCH] Allow E2E tests to run with arbitrary k8s cluster

---
 tests/compatibility-test.py           |  12 +--
 tests/framework/prototype.py          |   6 +-
 tests/framework/utils.py              | 126 +++++++++++++++++++++++---
 tests/test_sample_rayservice_yamls.py |   6 +-
 tests/test_security.py                |   4 +-
 5 files changed, 127 insertions(+), 27 deletions(-)

diff --git a/tests/compatibility-test.py b/tests/compatibility-test.py
index c518526702..6f52385ae6 100755
--- a/tests/compatibility-test.py
+++ b/tests/compatibility-test.py
@@ -43,8 +43,8 @@ class BasicRayTestCase(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
         """Create a Kind cluster, a KubeRay operator, and a RayCluster."""
-        K8S_CLUSTER_MANAGER.delete_kind_cluster()
-        K8S_CLUSTER_MANAGER.create_kind_cluster()
+        K8S_CLUSTER_MANAGER.cleanup()
+        K8S_CLUSTER_MANAGER.initialize_cluster()
         image_dict = {
             CONST.RAY_IMAGE_KEY: ray_image,
             CONST.OPERATOR_IMAGE_KEY: kuberay_operator_image
@@ -77,8 +77,8 @@ class RayFTTestCase(unittest.TestCase):
     def setUpClass(cls):
         if not utils.is_feature_supported(ray_version, CONST.RAY_FT):
             raise unittest.SkipTest(f"{CONST.RAY_FT} is not supported")
-        K8S_CLUSTER_MANAGER.delete_kind_cluster()
-        K8S_CLUSTER_MANAGER.create_kind_cluster()
+        K8S_CLUSTER_MANAGER.cleanup()
+        K8S_CLUSTER_MANAGER.initialize_cluster()
         image_dict = {
             CONST.RAY_IMAGE_KEY: ray_image,
             CONST.OPERATOR_IMAGE_KEY: kuberay_operator_image
@@ -224,8 +224,8 @@ class KubeRayHealthCheckTestCase(unittest.TestCase):
 
     @classmethod
     def setUpClass(cls):
-        K8S_CLUSTER_MANAGER.delete_kind_cluster()
-        K8S_CLUSTER_MANAGER.create_kind_cluster()
+        K8S_CLUSTER_MANAGER.cleanup()
+        K8S_CLUSTER_MANAGER.initialize_cluster()
         image_dict = {
             CONST.RAY_IMAGE_KEY: ray_image,
             CONST.OPERATOR_IMAGE_KEY: kuberay_operator_image
diff --git a/tests/framework/prototype.py b/tests/framework/prototype.py
index dc3c052333..322c173fbd 100644
--- a/tests/framework/prototype.py
+++ b/tests/framework/prototype.py
@@ -599,11 +599,11 @@ def __init__(self, methodName, docker_image_dict, cr_event):
 
     @classmethod
     def setUpClass(cls):
-        K8S_CLUSTER_MANAGER.delete_kind_cluster()
+        K8S_CLUSTER_MANAGER.cleanup()
 
     def setUp(self):
         if not K8S_CLUSTER_MANAGER.check_cluster_exist():
-            K8S_CLUSTER_MANAGER.create_kind_cluster()
+            K8S_CLUSTER_MANAGER.initialize_cluster()
         self.operator_manager.prepare_operator()
 
     def runtest(self):
@@ -615,4 +615,4 @@ def tearDown(self) -> None:
             self.cr_event.clean_up()
         except Exception as ex:
             logger.error(str(ex))
-        K8S_CLUSTER_MANAGER.delete_kind_cluster()
+        K8S_CLUSTER_MANAGER.cleanup()
diff --git a/tests/framework/utils.py b/tests/framework/utils.py
index 7d29a192be..f7c0f4cdaf 100644
--- a/tests/framework/utils.py
+++ b/tests/framework/utils.py
@@ -10,6 +10,7 @@
 import jsonpatch
 import time
 from kubernetes import client, config
+from abc import ABC, abstractmethod
 
 logger = logging.getLogger(__name__)
 logging.basicConfig(
@@ -60,16 +61,110 @@ class CONST:
 
 CONST = CONST()
 
+class ClusterManager(ABC):
+    EXTERNAL_CLUSTER = "EXTERNAL_CLUSTER"
 
-class KubernetesClusterManager:
+    @abstractmethod
+    def initialize_cluster(self, kind_config=None) -> None:
+        pass
+
+    @abstractmethod
+    def cleanup(self) -> None:
+        pass
+
+    @abstractmethod
+    def upload_image(self, image):
+        pass
+
+    @abstractmethod
+    def check_cluster_exist(self) -> bool:
+        pass
+
+    @classmethod
+    def instance(cls):
+        if cls.EXTERNAL_CLUSTER in os.environ:
+            return ExternalClusterManager()
+        else:
+            return KindClusterManager()
+
+class ExternalClusterManager(ClusterManager):
+    def __init__(self) -> None:
+        config.load_kube_config()
+        self.k8s_client_dict = {}
+        self.k8s_client_dict.update(
+            {
+                CONST.K8S_V1_CLIENT_KEY: client.CoreV1Api(),
+                CONST.K8S_CR_CLIENT_KEY: client.CustomObjectsApi(),
+            }
+        )
+        self.cleanup_timeout = 120
+
+    def cleanup(self, namespace = "default") -> None:
+        self.__delete_all_crs("ray.io", "v1alpha1", namespace, "rayservices")
+        self.__delete_all_crs("ray.io", "v1alpha1", namespace, "rayjobs")
+        self.__delete_all_crs("ray.io", "v1alpha1", namespace, "rayclusters")
+
+        k8s_v1_api = self.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY]
+        start_time = time.time()
+        while time.time() - start_time < self.cleanup_timeout:
+            pods = k8s_v1_api.list_pod_for_all_namespaces(label_selector = 'app.kubernetes.io/created-by=kuberay-operator')
+            if len(pods.items) == 0:
+                logger.info("--- Cleanup rayservices, rayjobs, rayclusters %s seconds ---", time.time() - start_time)
+                break
+
+            time.sleep(1)
+
+        shell_subprocess_run("helm uninstall kuberay-operator", check=False)
+        start_time = time.time()
+        while time.time() - start_time < self.cleanup_timeout:
+            pods = k8s_v1_api.list_pod_for_all_namespaces(label_selector = 'app.kubernetes.io/component=kuberay-operator')
+            if len(pods.items) == 0:
+                logger.info("--- Cleanup kuberay-operator %s seconds ---", time.time() - start_time)
+                break
+
+            time.sleep(1)
+
+        for _, k8s_client in self.k8s_client_dict.items():
+            k8s_client.api_client.rest_client.pool_manager.clear()
+            k8s_client.api_client.close()
+
+        self.k8s_client_dict = {}
+
+    def initialize_cluster(self, kind_config=None) -> None:
+        pass
+
+    def upload_image(self, image):
+        pass
+
+    def check_cluster_exist(self) -> bool:
+        """Check whether cluster exists or not"""
+        return (
+            shell_subprocess_run(
+                "kubectl cluster-info", check=False
+            )
+            == 0
+        )
+
+    def __delete_all_crs(self, group, version, namespace, plural):
+        custom_objects_api = self.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY]
+        try:
+            crs = custom_objects_api.list_namespaced_custom_object(group, version, namespace, plural)
+            for cr in crs["items"]:
+                name = cr["metadata"]["name"]
+                custom_objects_api.delete_namespaced_custom_object(group, version, namespace, plural, name)
+        except client.exceptions.ApiException:
+            logger.info("CRD did not exist during clean up %s", plural)
+
+
+class KindClusterManager(ClusterManager):
     """
-    KubernetesClusterManager controls the lifecycle of KinD cluster and Kubernetes API client.
+    KindClusterManager controls the lifecycle of a KinD cluster and Kubernetes API client.
     """
 
     def __init__(self) -> None:
         self.k8s_client_dict = {}
 
-    def delete_kind_cluster(self) -> None:
+    def cleanup(self) -> None:
         """Delete a KinD cluster"""
         shell_subprocess_run("kind delete cluster")
         for _, k8s_client in self.k8s_client_dict.items():
@@ -77,12 +172,7 @@ def delete_kind_cluster(self) -> None:
             k8s_client.api_client.close()
         self.k8s_client_dict = {}
 
-    def _adjust_kubeconfig_server_address(self) -> None:
-        """Modify the server address in kubeconfig to https://docker:6443"""
-        if os.getenv(CONST.BUILDKITE_ENV, default="") == "true":
-            shell_subprocess_run("kubectl config set clusters.kind-kind.server https://docker:6443")
-
-    def create_kind_cluster(self, kind_config=None) -> None:
+    def initialize_cluster(self, kind_config=None) -> None:
         """Create a KinD cluster"""
         # To use NodePort service, `kind_config` needs to set `extraPortMappings` properly.
         kind_config = CONST.DEFAULT_KIND_CONFIG if not kind_config else kind_config
@@ -99,6 +189,9 @@ def create_kind_cluster(self, kind_config=None) -> None:
             }
         )
 
+    def upload_image(self, image):
+        shell_subprocess_run(f"kind load docker-image {image}")
+
     def check_cluster_exist(self) -> bool:
         """Check whether KinD cluster exists or not"""
         return (
@@ -107,9 +200,13 @@ def check_cluster_exist(self) -> bool:
             )
             == 0
         )
+
+    def _adjust_kubeconfig_server_address(self) -> None:
+        """Modify the server address in kubeconfig to https://docker:6443"""
+        if os.getenv(CONST.BUILDKITE_ENV, default="") == "true":
+            shell_subprocess_run("kubectl config set clusters.kind-kind.server https://docker:6443")
 
-
-K8S_CLUSTER_MANAGER = KubernetesClusterManager()
+K8S_CLUSTER_MANAGER = ClusterManager.instance()
 
 
 class OperatorManager:
@@ -122,12 +219,15 @@ class OperatorManager:
         namespace : A namespace(string) that KubeRay operator will be installed in.
         patch : A jsonpatch that will be applied to the default KubeRay operator config
             to create the custom config.
+        cluster_manager : Cluster manager instance.
     """
 
     def __init__(
-        self, docker_image_dict, namespace="default", patch=jsonpatch.JsonPatch([])
+        self, docker_image_dict, namespace="default", patch=jsonpatch.JsonPatch([]),
+        cluster_manager = K8S_CLUSTER_MANAGER
    ) -> None:
         self.docker_image_dict = docker_image_dict
+        self.cluster_manager = cluster_manager
         self.namespace = namespace
         self.values_yaml = {}
         for key in [CONST.OPERATOR_IMAGE_KEY, CONST.RAY_IMAGE_KEY]:
@@ -175,7 +275,7 @@ def download_images():
         logger.info("Load images into KinD cluster")
         for key in self.docker_image_dict:
             image = self.docker_image_dict[key]
-            shell_subprocess_run(f"kind load docker-image {image}")
+            self.cluster_manager.upload_image(image)
 
     def __install_crd_and_operator(self):
         """
diff --git a/tests/test_sample_rayservice_yamls.py b/tests/test_sample_rayservice_yamls.py
index 1813719fe2..d50455894d 100644
--- a/tests/test_sample_rayservice_yamls.py
+++ b/tests/test_sample_rayservice_yamls.py
@@ -197,8 +197,8 @@ def set_up_cluster(self):
             {"path": "/calc", "json_args": ["MUL", 3], "expected_output": '"15 pizzas please!"'},
         ]
 
-        K8S_CLUSTER_MANAGER.delete_kind_cluster()
-        K8S_CLUSTER_MANAGER.create_kind_cluster()
+        K8S_CLUSTER_MANAGER.cleanup()
+        K8S_CLUSTER_MANAGER.initialize_cluster()
         operator_manager = OperatorManager(DEFAULT_IMAGE_DICT)
         operator_manager.prepare_operator()
         start_curl_pod("curl", "default")
@@ -206,7 +206,7 @@ def set_up_cluster(self):
 
         yield
 
-        K8S_CLUSTER_MANAGER.delete_kind_cluster()
+        K8S_CLUSTER_MANAGER.cleanup()
 
     def test_deploy_applications(self, set_up_cluster):
         rs = RuleSet([EasyJobRule(), CurlServiceRule(queries=self.default_queries)])
diff --git a/tests/test_security.py b/tests/test_security.py
index 7a8794a70b..c5475e4f1b 100644
--- a/tests/test_security.py
+++ b/tests/test_security.py
@@ -39,9 +39,9 @@ class PodSecurityTestCase(unittest.TestCase):
 
     @classmethod
     def setUpClass(cls):
-        K8S_CLUSTER_MANAGER.delete_kind_cluster()
+        K8S_CLUSTER_MANAGER.cleanup()
         kind_config = CONST.REPO_ROOT.joinpath("ray-operator/config/security/kind-config.yaml")
-        K8S_CLUSTER_MANAGER.create_kind_cluster(kind_config = kind_config)
+        K8S_CLUSTER_MANAGER.initialize_cluster(kind_config = kind_config)
         # Apply the restricted Pod security standard to all Pods in the namespace pod-security.
         # The label pod-security.kubernetes.io/enforce=restricted means that the Pod that violates
         # the policies will be rejected.
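
A minimal usage sketch of the selection logic introduced above, assuming the active kubeconfig already points at the target cluster and that the test framework is importable as framework.utils (e.g. when a test driver runs from the tests/ directory); the value "true" is illustrative, since only the presence of the EXTERNAL_CLUSTER key is checked.

    import os

    # EXTERNAL_CLUSTER must be set before framework.utils is imported, because
    # K8S_CLUSTER_MANAGER is created at import time via ClusterManager.instance().
    os.environ["EXTERNAL_CLUSTER"] = "true"  # any value works; only the key's presence matters

    from framework import utils

    # With the variable present, the tests clean up and reuse the cluster behind the
    # current kubeconfig context instead of deleting and recreating a KinD cluster.
    assert isinstance(utils.K8S_CLUSTER_MANAGER, utils.ExternalClusterManager)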