-
Notifications
You must be signed in to change notification settings - Fork 402
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow E2E tests to run with arbitrary k8s cluster #1306
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ | |
import jsonpatch | ||
import time | ||
from kubernetes import client, config | ||
from abc import ABC, abstractmethod | ||
|
||
logger = logging.getLogger(__name__) | ||
logging.basicConfig( | ||
|
@@ -60,29 +61,118 @@ class CONST: | |
|
||
CONST = CONST() | ||
|
||
class ClusterManager(ABC): | ||
EXTERNAL_CLUSTER = "EXTERNAL_CLUSTER" | ||
|
||
class KubernetesClusterManager: | ||
@abstractmethod | ||
def initialize_cluster(self, kind_config=None) -> None: | ||
pass | ||
|
||
@abstractmethod | ||
def cleanup(self) -> None: | ||
pass | ||
|
||
@abstractmethod | ||
def upload_image(): | ||
pass | ||
|
||
@abstractmethod | ||
def check_cluster_exist(self) -> bool: | ||
pass | ||
|
||
@classmethod | ||
def instance(cls): | ||
if cls.EXTERNAL_CLUSTER in os.environ: | ||
return ExternalClusterManager() | ||
else: | ||
return KindClusterManager() | ||
|
||
class ExternalClusterManager(ClusterManager): | ||
def __init__(self) -> None: | ||
config.load_kube_config() | ||
self.k8s_client_dict = {} | ||
self.k8s_client_dict.update( | ||
{ | ||
CONST.K8S_V1_CLIENT_KEY: client.CoreV1Api(), | ||
CONST.K8S_CR_CLIENT_KEY: client.CustomObjectsApi(), | ||
} | ||
) | ||
self.cleanup_timeout = 120 | ||
|
||
def cleanup(self, namespace = "default") -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In ClusterManager's definition, |
||
self.__delete_all_crs("ray.io", "v1alpha1", namespace, "rayservices") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The custom resource lifecycles are better controlled by CREvent (link). In my opinion, the cleanup at the cluster level is not necessary. We can remove this function. |
||
self.__delete_all_crs("ray.io", "v1alpha1", namespace, "rayjobs") | ||
self.__delete_all_crs("ray.io", "v1alpha1", namespace, "rayclusters") | ||
|
||
k8s_v1_api = self.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY] | ||
start_time = time.time() | ||
while time.time() - start_time < self.cleanup_timeout: | ||
pods = k8s_v1_api.list_pod_for_all_namespaces(label_selector = 'app.kubernetes.io/created-by=kuberay-operator') | ||
if len(pods.items) == 0: | ||
logger.info("--- Cleanup rayservices, rayjobs, rayclusters %s seconds ---", time.time() - start_time) | ||
break | ||
|
||
time.sleep(1) | ||
|
||
shell_subprocess_run("helm uninstall kuberay-operator", check=False) | ||
start_time = time.time() | ||
while time.time() - start_time < self.cleanup_timeout: | ||
pods = k8s_v1_api.list_pod_for_all_namespaces(label_selector = 'app.kubernetes.io/component=kuberay-operator') | ||
if len(pods.items) == 0: | ||
logger.info("--- Cleanup kuberay-operator %s seconds ---", time.time() - start_time) | ||
break | ||
|
||
time.sleep(1) | ||
|
||
for _, k8s_client in self.k8s_client_dict.items(): | ||
k8s_client.api_client.rest_client.pool_manager.clear() | ||
k8s_client.api_client.close() | ||
|
||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.k8s_client_dict = {} | ||
|
||
def initialize_cluster(self, kind_config=None) -> None: | ||
pass | ||
|
||
def upload_image(self, image): | ||
pass | ||
|
||
def check_cluster_exist(self) -> bool: | ||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Check whether cluster exists or not""" | ||
return ( | ||
shell_subprocess_run( | ||
"kubectl cluster-info", check=False | ||
) | ||
== 0 | ||
) | ||
|
||
def __delete_all_crs(self, group, version, namespace, plural): | ||
custom_objects_api = self.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The custom resource lifecycles are better controlled by |
||
try: | ||
crs = custom_objects_api.list_namespaced_custom_object(group, version, namespace, plural) | ||
for cr in crs["items"]: | ||
name = cr["metadata"]["name"] | ||
custom_objects_api.delete_namespaced_custom_object(group, version, namespace, plural, name) | ||
except client.exceptions.ApiException: | ||
logger.info("CRD did not exist during clean up %s", plural) | ||
|
||
|
||
class KindClusterManager(ClusterManager): | ||
""" | ||
KubernetesClusterManager controls the lifecycle of KinD cluster and Kubernetes API client. | ||
KindClusterManager controlls the lifecycle of KinD cluster and Kubernetes API client. | ||
""" | ||
|
||
def __init__(self) -> None: | ||
self.k8s_client_dict = {} | ||
|
||
def delete_kind_cluster(self) -> None: | ||
def cleanup(self) -> None: | ||
"""Delete a KinD cluster""" | ||
shell_subprocess_run("kind delete cluster") | ||
for _, k8s_client in self.k8s_client_dict.items(): | ||
k8s_client.api_client.rest_client.pool_manager.clear() | ||
k8s_client.api_client.close() | ||
self.k8s_client_dict = {} | ||
|
||
def _adjust_kubeconfig_server_address(self) -> None: | ||
"""Modify the server address in kubeconfig to https://docker:6443""" | ||
if os.getenv(CONST.BUILDKITE_ENV, default="") == "true": | ||
shell_subprocess_run("kubectl config set clusters.kind-kind.server https://docker:6443") | ||
|
||
def create_kind_cluster(self, kind_config=None) -> None: | ||
def initialize_cluster(self, kind_config=None) -> None: | ||
"""Create a KinD cluster""" | ||
# To use NodePort service, `kind_config` needs to set `extraPortMappings` properly. | ||
kind_config = CONST.DEFAULT_KIND_CONFIG if not kind_config else kind_config | ||
|
@@ -99,6 +189,9 @@ def create_kind_cluster(self, kind_config=None) -> None: | |
} | ||
) | ||
|
||
def upload_image(self, image): | ||
shell_subprocess_run(f"kind load docker-image {image}") | ||
|
||
def check_cluster_exist(self) -> bool: | ||
"""Check whether KinD cluster exists or not""" | ||
return ( | ||
|
@@ -107,9 +200,13 @@ def check_cluster_exist(self) -> bool: | |
) | ||
== 0 | ||
) | ||
|
||
def _adjust_kubeconfig_server_address(self) -> None: | ||
"""Modify the server address in kubeconfig to https://docker:6443""" | ||
if os.getenv(CONST.BUILDKITE_ENV, default="") == "true": | ||
shell_subprocess_run("kubectl config set clusters.kind-kind.server https://docker:6443") | ||
|
||
|
||
K8S_CLUSTER_MANAGER = KubernetesClusterManager() | ||
K8S_CLUSTER_MANAGER = ClusterManager.instance() | ||
|
||
|
||
class OperatorManager: | ||
|
@@ -122,12 +219,15 @@ class OperatorManager: | |
namespace : A namespace(string) that KubeRay operator will be installed in. | ||
patch : A jsonpatch that will be applied to the default KubeRay operator config | ||
to create the custom config. | ||
cluster_manager : Cluster manager instance. | ||
""" | ||
|
||
def __init__( | ||
self, docker_image_dict, namespace="default", patch=jsonpatch.JsonPatch([]) | ||
self, docker_image_dict, namespace="default", patch=jsonpatch.JsonPatch([]), | ||
cluster_manager = K8S_CLUSTER_MANAGER | ||
) -> None: | ||
self.docker_image_dict = docker_image_dict | ||
self.cluster_manager = cluster_manager | ||
self.namespace = namespace | ||
self.values_yaml = {} | ||
for key in [CONST.OPERATOR_IMAGE_KEY, CONST.RAY_IMAGE_KEY]: | ||
|
@@ -175,7 +275,7 @@ def download_images(): | |
logger.info("Load images into KinD cluster") | ||
for key in self.docker_image_dict: | ||
image = self.docker_image_dict[key] | ||
shell_subprocess_run(f"kind load docker-image {image}") | ||
self.cluster_manager.upload_image(image) | ||
|
||
def __install_crd_and_operator(self): | ||
""" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to
load_kube_config
again if we have already loaded the config ininitialize_cluster
?