diff --git a/hack/python-sdk/post_gen.py b/hack/python-sdk/post_gen.py index 222032f3b3..6ff381735e 100755 --- a/hack/python-sdk/post_gen.py +++ b/hack/python-sdk/post_gen.py @@ -54,6 +54,7 @@ def add_imports() -> None: init_file.write("from kubeflow.training.api.tf_job_client import TFJobClient\n") init_file.write("from kubeflow.training.api.py_torch_job_client import PyTorchJobClient\n") init_file.write("from kubeflow.training.api.xgboost_job_client import XGBoostJobClient\n") + init_file.write("from kubeflow.training.api.mpi_job_client import MPIJobClient\n") with open(os.path.join(sdk_dir, "kubeflow/__init__.py"), "a") as init_file: init_file.write("__path__ = __import__('pkgutil').extend_path(__path__, __name__)") diff --git a/pkg/controller.v1/mpi/mpijob_controller.go b/pkg/controller.v1/mpi/mpijob_controller.go index 9fef70e989..26f91b761d 100644 --- a/pkg/controller.v1/mpi/mpijob_controller.go +++ b/pkg/controller.v1/mpi/mpijob_controller.go @@ -291,7 +291,7 @@ func (jc *MPIJobReconciler) GetDefaultContainerPortName() string { func (jc *MPIJobReconciler) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool { - return false + return string(rtype) == string(mpiv1.MPIReplicaTypeLauncher) } func (jc *MPIJobReconciler) GetJobFromInformerCache(namespace, name string) (metav1.Object, error) { @@ -1039,6 +1039,10 @@ func (jc *MPIJobReconciler) newLauncher(mpiJob *mpiv1.MPIJob, kubectlDeliveryIma genericLabels := jc.GenLabels(mpiJob.GetName()) labels := defaultLauncherLabels(genericLabels) + masterRole := jc.IsMasterRole(mpiJob.Spec.MPIReplicaSpecs, mpiv1.MPIReplicaTypeLauncher, 0) + if masterRole { + labels[commonv1.JobRoleLabel] = "master" + } podSpec := mpiJob.Spec.MPIReplicaSpecs[mpiv1.MPIReplicaTypeLauncher].Template.DeepCopy() // copy the labels and annotations to pod from PodTemplate if len(podSpec.Labels) == 0 { diff --git a/sdk/python/kubeflow/training/__init__.py b/sdk/python/kubeflow/training/__init__.py index ea5fccbf3a..a33f1c49dc 100644 --- a/sdk/python/kubeflow/training/__init__.py +++ b/sdk/python/kubeflow/training/__init__.py @@ -54,3 +54,4 @@ from kubeflow.training.api.tf_job_client import TFJobClient from kubeflow.training.api.py_torch_job_client import PyTorchJobClient from kubeflow.training.api.xgboost_job_client import XGBoostJobClient +from kubeflow.training.api.mpi_job_client import MPIJobClient diff --git a/sdk/python/kubeflow/training/api/mpi_job_client.py b/sdk/python/kubeflow/training/api/mpi_job_client.py new file mode 100644 index 0000000000..d47c089de1 --- /dev/null +++ b/sdk/python/kubeflow/training/api/mpi_job_client.py @@ -0,0 +1,434 @@ +# Copyright 2021 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import multiprocessing
+import time
+import logging
+import threading
+import queue
+
+from kubernetes import client, config
+from kubernetes import watch as k8s_watch
+
+from kubeflow.training.constants import constants
+from kubeflow.training.utils import utils
+
+from .mpi_job_watch import watch as mpijob_watch
+
+logging.basicConfig(format='%(message)s')
+logging.getLogger().setLevel(logging.INFO)
+
+
+def wrap_log_stream(q, stream):
+    while True:
+        try:
+            logline = next(stream)
+            q.put(logline)
+        except StopIteration:
+            q.put(None)
+            return
+        except Exception as e:
+            raise RuntimeError(
+                "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e)
+
+
+def get_log_queue_pool(streams):
+    pool = []
+    for stream in streams:
+        q = queue.Queue(maxsize=100)
+        pool.append(q)
+        threading.Thread(target=wrap_log_stream, args=(q, stream)).start()
+    return pool
+
+
+class MPIJobClient(object):
+    def __init__(self, config_file=None, context=None,  # pylint: disable=too-many-arguments
+                 client_configuration=None, persist_config=True):
+        """
+        MPIJob client constructor
+        :param config_file: kubeconfig file, defaults to ~/.kube/config
+        :param context: kubernetes context
+        :param client_configuration: kubernetes configuration object
+        :param persist_config:
+        """
+        if config_file or not utils.is_running_in_k8s():
+            config.load_kube_config(
+                config_file=config_file,
+                context=context,
+                client_configuration=client_configuration,
+                persist_config=persist_config)
+        else:
+            config.load_incluster_config()
+
+        self.custom_api = client.CustomObjectsApi()
+        self.core_api = client.CoreV1Api()
+
+    def create(self, mpijob, namespace=None):
+        """
+        Create the MPIJob
+        :param mpijob: mpijob object
+        :param namespace: defaults to current or default namespace
+        :return: created mpijob
+        """
+
+        if namespace is None:
+            namespace = utils.set_mpijob_namespace(mpijob)
+
+        try:
+            outputs = self.custom_api.create_namespaced_custom_object(
+                constants.MPIJOB_GROUP,
+                constants.MPIJOB_VERSION,
+                namespace,
+                constants.MPIJOB_PLURAL,
+                mpijob)
+        except client.rest.ApiException as e:
+            raise RuntimeError(
+                "Exception when calling CustomObjectsApi->create_namespaced_custom_object:\
+                 %s\n" % e)
+
+        return outputs
+
+    def get(self, name=None, namespace=None, watch=False,
+            timeout_seconds=600):  # pylint: disable=inconsistent-return-statements
+        """
+        Get the mpijob
+        :param name: existing mpijob name; if not defined, get all MPIJobs in the namespace.
+        :param namespace: defaults to current or default namespace
+        :param watch: Watch the MPIJob if `True`.
+        :param timeout_seconds: How long to watch the job.
+        :return: mpijob
+        """
+        if namespace is None:
+            namespace = utils.get_default_target_namespace()
+
+        if name:
+            if watch:
+                mpijob_watch(
+                    name=name,
+                    namespace=namespace,
+                    timeout_seconds=timeout_seconds)
+            else:
+                thread = self.custom_api.get_namespaced_custom_object(
+                    constants.MPIJOB_GROUP,
+                    constants.MPIJOB_VERSION,
+                    namespace,
+                    constants.MPIJOB_PLURAL,
+                    name,
+                    async_req=True)
+
+                mpijob = None
+                try:
+                    mpijob = thread.get(constants.APISERVER_TIMEOUT)
+                except multiprocessing.TimeoutError:
+                    raise RuntimeError("Timeout trying to get MPIJob.")
+                except client.rest.ApiException as e:
+                    raise RuntimeError(
+                        "Exception when calling CustomObjectsApi->get_namespaced_custom_object:\
+                        %s\n" % e)
+                except Exception as e:
+                    raise RuntimeError(
+                        "There was a problem to get MPIJob {0} in namespace {1}. Exception: \
+                        {2} ".format(name, namespace, e))
+                return mpijob
+        else:
+            if watch:
+                mpijob_watch(
+                    namespace=namespace,
+                    timeout_seconds=timeout_seconds)
+            else:
+                thread = self.custom_api.list_namespaced_custom_object(
+                    constants.MPIJOB_GROUP,
+                    constants.MPIJOB_VERSION,
+                    namespace,
+                    constants.MPIJOB_PLURAL,
+                    async_req=True)
+
+                mpijobs = None
+                try:
+                    mpijobs = thread.get(constants.APISERVER_TIMEOUT)
+                except multiprocessing.TimeoutError:
+                    raise RuntimeError("Timeout trying to get MPIJob.")
+                except client.rest.ApiException as e:
+                    raise RuntimeError(
+                        "Exception when calling CustomObjectsApi->list_namespaced_custom_object:\
+                        %s\n" % e)
+                except Exception as e:
+                    raise RuntimeError(
+                        "There was a problem to list MPIJobs in namespace {0}. \
+                        Exception: {1} ".format(namespace, e))
+                return mpijobs
+
+    def patch(self, name, mpijob, namespace=None):
+        """
+        Patch existing mpijob
+        :param name: existing mpijob name
+        :param mpijob: patched mpijob
+        :param namespace: defaults to current or default namespace
+        :return: patched mpijob
+        """
+        if namespace is None:
+            namespace = utils.set_mpijob_namespace(mpijob)
+
+        try:
+            outputs = self.custom_api.patch_namespaced_custom_object(
+                constants.MPIJOB_GROUP,
+                constants.MPIJOB_VERSION,
+                namespace,
+                constants.MPIJOB_PLURAL,
+                name,
+                mpijob)
+        except client.rest.ApiException as e:
+            raise RuntimeError(
+                "Exception when calling CustomObjectsApi->patch_namespaced_custom_object:\
+                 %s\n" % e)
+
+        return outputs
+
+    def delete(self, name, namespace=None):
+        """
+        Delete the mpijob
+        :param name: mpijob name
+        :param namespace: defaults to current or default namespace
+        :return:
+        """
+        if namespace is None:
+            namespace = utils.get_default_target_namespace()
+
+        try:
+            return self.custom_api.delete_namespaced_custom_object(
+                group=constants.MPIJOB_GROUP,
+                version=constants.MPIJOB_VERSION,
+                namespace=namespace,
+                plural=constants.MPIJOB_PLURAL,
+                name=name,
+                body=client.V1DeleteOptions())
+        except client.rest.ApiException as e:
+            raise RuntimeError(
+                "Exception when calling CustomObjectsApi->delete_namespaced_custom_object:\
+                 %s\n" % e)
+
+    def wait_for_job(self, name,  # pylint: disable=inconsistent-return-statements
+                     namespace=None,
+                     timeout_seconds=600,
+                     polling_interval=30,
+                     watch=False,
+                     status_callback=None):
+        """Wait for the specified job to finish.
+
+        :param name: Name of the MPIJob.
+        :param namespace: defaults to current or default namespace.
+        :param timeout_seconds: How long to wait for the job.
+        :param polling_interval: How often to poll for the status of the job.
+        :param watch: Watch the MPIJob if `True`.
+        :param status_callback: (Optional): Callable. If supplied this callable is
+            invoked after we poll the job. Callable takes a single argument which
+            is the job.
+        :return:
+        """
+        if namespace is None:
+            namespace = utils.get_default_target_namespace()
+
+        if watch:
+            mpijob_watch(
+                name=name,
+                namespace=namespace,
+                timeout_seconds=timeout_seconds)
+        else:
+            return self.wait_for_condition(
+                name,
+                ["Succeeded", "Failed"],
+                namespace=namespace,
+                timeout_seconds=timeout_seconds,
+                polling_interval=polling_interval,
+                status_callback=status_callback)
+
+    def wait_for_condition(self, name,
+                           expected_condition,
+                           namespace=None,
+                           timeout_seconds=600,
+                           polling_interval=30,
+                           status_callback=None):
+        """Waits until any of the specified conditions occur.
+
+        :param name: Name of the job.
+        :param expected_condition: A list of conditions. Function waits until any of the
+            supplied conditions is reached.
+        :param namespace: defaults to current or default namespace.
+        :param timeout_seconds: How long to wait for the job.
+        :param polling_interval: How often to poll for the status of the job.
+        :param status_callback: (Optional): Callable. If supplied this callable is
+            invoked after we poll the job. Callable takes a single argument which
+            is the job.
+        :return: Object MPIJob status
+        """
+
+        if namespace is None:
+            namespace = utils.get_default_target_namespace()
+
+        for _ in range(round(timeout_seconds / polling_interval)):
+
+            mpijob = None
+            mpijob = self.get(name, namespace=namespace)
+
+            if mpijob:
+                if status_callback:
+                    status_callback(mpijob)
+
+                # If we poll the CRD quickly enough, the status won't have been set yet.
+                conditions = mpijob.get("status", {}).get("conditions", [])
+                # Conditions might have a value of None in status.
+                conditions = conditions or []
+                for c in conditions:
+                    if c.get("type", "") in expected_condition:
+                        return mpijob
+
+            time.sleep(polling_interval)
+
+        raise RuntimeError(
+            "Timeout waiting for MPIJob {0} in namespace {1} to enter one of the "
+            "conditions {2}.".format(name, namespace, expected_condition), mpijob)
+
+    def get_job_status(self, name, namespace=None):
+        """Returns MPIJob status, such as Running, Failed or Succeeded.
+
+        :param name: The MPIJob name.
+        :param namespace: defaults to current or default namespace.
+        :return: Object MPIJob status
+        """
+        if namespace is None:
+            namespace = utils.get_default_target_namespace()
+
+        mpijob = self.get(name, namespace=namespace)
+        last_condition = mpijob.get("status", {}).get("conditions", [{}])[-1]
+        return last_condition.get("type", "")
+
+    def is_job_running(self, name, namespace=None):
+        """Returns true if the MPIJob is running; false otherwise.
+
+        :param name: The MPIJob name.
+        :param namespace: defaults to current or default namespace.
+        :return: True or False
+        """
+        mpijob_status = self.get_job_status(name, namespace=namespace)
+        return mpijob_status.lower() == "running"
+
+    def is_job_succeeded(self, name, namespace=None):
+        """Returns true if the MPIJob succeeded; false otherwise.
+
+        :param name: The MPIJob name.
+        :param namespace: defaults to current or default namespace.
+        :return: True or False
+        """
+        mpijob_status = self.get_job_status(name, namespace=namespace)
+        return mpijob_status.lower() == "succeeded"
+
+    def get_pod_names(self, name, namespace=None, master=False,  # pylint: disable=inconsistent-return-statements
+                      replica_type=None, replica_index=None):
+        """
+        Get pod names of MPIJob.
+        :param name: mpijob name
+        :param namespace: defaults to current or default namespace.
+        :param master: Only get the pod with the label 'job-role: master' if True.
+        :param replica_type: User can specify one of 'launcher, worker' to only get pods of that type.
+            By default get pods of all types.
+        :param replica_index: User can specify the replica index to get one pod of the MPIJob.
+        :return: set: pod names
+        """
+
+        if namespace is None:
+            namespace = utils.get_default_target_namespace()
+
+        labels = utils.get_job_labels(name, master=master,
+                                      replica_type=replica_type,
+                                      replica_index=replica_index)
+        try:
+            resp = self.core_api.list_namespaced_pod(
+                namespace, label_selector=utils.to_selector(labels))
+        except client.rest.ApiException as e:
+            raise RuntimeError(
+                "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e)
+
+        pod_names = []
+        for pod in resp.items:
+            if pod.metadata and pod.metadata.name:
+                pod_names.append(pod.metadata.name)
+
+        if not pod_names:
+            logging.warning("No Pods found for the MPIJob %s with the labels %s.", name, labels)
+        else:
+            return set(pod_names)
+
+    def get_logs(self, name, namespace=None, master=True,
+                 replica_type=None, replica_index=None,
+                 follow=False, container="mpi"):
+        """
+        Get training logs of the MPIJob.
+        By default only get the logs of the Pod that has the label 'job-role: master'.
+        :param container: container name
+        :param name: mpijob name
+        :param namespace: defaults to current or default namespace.
+        :param master: Only get logs of the Pod with the label 'job-role: master' if True.
+            Set to False to get the logs of all Pods.
+        :param replica_type: User can specify one of 'launcher, worker' to only get pods of that type.
+            By default get pods of all types.
+        :param replica_index: User can specify the replica index to get one pod of the MPIJob.
+        :param follow: Follow the log stream of the pod. Defaults to false.
+        :return: str: pod logs
+        """
+
+        if namespace is None:
+            namespace = utils.get_default_target_namespace()
+
+        pod_names = list(self.get_pod_names(name, namespace=namespace,
+                                            master=master,
+                                            replica_type=replica_type,
+                                            replica_index=replica_index) or [])
+        if pod_names:
+            if follow:
+                log_streams = []
+                for pod in pod_names:
+                    log_streams.append(k8s_watch.Watch().stream(self.core_api.read_namespaced_pod_log,
+                                                                name=pod, namespace=namespace, container=container))
+                finished = [False for _ in log_streams]
+
+                # create thread and queue per stream, for non-blocking iteration
+                log_queue_pool = get_log_queue_pool(log_streams)
+
+                # iterate over every watched pod's log queue
+                while True:
+                    for index, log_queue in enumerate(log_queue_pool):
+                        if all(finished):
+                            return
+                        if finished[index]:
+                            continue
+                        # group every 50 log lines of the same pod
+                        for _ in range(50):
+                            try:
+                                logline = log_queue.get(timeout=1)
+                                if logline is None:
+                                    finished[index] = True
+                                    break
+                                logging.info("[Pod %s]: %s", pod_names[index], logline)
+                            except queue.Empty:
+                                break
+            else:
+                for pod in pod_names:
+                    try:
+                        pod_logs = self.core_api.read_namespaced_pod_log(pod, namespace, container=container)
+                        logging.info("The logs of Pod %s:\n %s", pod, pod_logs)
+                    except client.rest.ApiException as e:
+                        raise RuntimeError(
+                            "Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e)
+        else:
+            raise RuntimeError("No Pods found for the MPIJob {} "
+                               "in namespace {}".format(name, namespace))
diff --git a/sdk/python/kubeflow/training/api/mpi_job_watch.py b/sdk/python/kubeflow/training/api/mpi_job_watch.py
new file mode 100644
index 0000000000..7c2eda6482
--- /dev/null
+++ b/sdk/python/kubeflow/training/api/mpi_job_watch.py
@@ -0,0 +1,58 @@
+# Copyright 2021 The Kubeflow Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import retrying
+from kubernetes import client
+from kubernetes import watch as k8s_watch
+
+from kubeflow.training.constants import constants
+from kubeflow.training.utils import utils
+
+tbl = utils.TableLogger(
+    header="{:<30.30} {:<20.20} {:<30.30}".format('NAME', 'STATE', 'TIME'),
+    column_format="{:<30.30} {:<20.20} {:<30.30}")
+
+
+@retrying.retry(wait_fixed=1000, stop_max_attempt_number=20)
+def watch(name=None, namespace=None, timeout_seconds=600):
+    """Watch the created or patched MPIJob in the specified namespace"""
+
+    if namespace is None:
+        namespace = utils.get_default_target_namespace()
+
+    stream = k8s_watch.Watch().stream(
+        client.CustomObjectsApi().list_namespaced_custom_object,
+        constants.MPIJOB_GROUP,
+        constants.MPIJOB_VERSION,
+        namespace,
+        constants.MPIJOB_PLURAL,
+        timeout_seconds=timeout_seconds)
+
+    for event in stream:
+        mpijob = event['object']
+        mpijob_name = mpijob['metadata']['name']
+        if name and name != mpijob_name:
+            continue
+        else:
+            status = ''
+            update_time = ''
+            last_condition = mpijob.get('status', {}).get('conditions', [{}])[-1]
+            status = last_condition.get('type', '')
+            update_time = last_condition.get('lastTransitionTime', '')
+
+            tbl(mpijob_name, status, update_time)
+
+            if name == mpijob_name:
+                if status in [constants.JOB_STATUS_SUCCEEDED, constants.JOB_STATUS_FAILED]:
+                    break
diff --git a/sdk/python/kubeflow/training/constants/constants.py b/sdk/python/kubeflow/training/constants/constants.py
index 47a47e4503..57060eeb19 100644
--- a/sdk/python/kubeflow/training/constants/constants.py
+++ b/sdk/python/kubeflow/training/constants/constants.py
@@ -28,9 +28,9 @@
 
 # Job Label Names
 JOB_GROUP_LABEL = 'group-name'
-JOB_NAME_LABEL = 'job-name'
-JOB_TYPE_LABEL = 'replica-type'
-JOB_INDEX_LABEL = 'replica-index'
+JOB_NAME_LABEL = 'training.kubeflow.org/job-name'
+JOB_TYPE_LABEL = 'training.kubeflow.org/replica-type'
+JOB_INDEX_LABEL = 'training.kubeflow.org/replica-index'
 JOB_ROLE_LABEL = 'training.kubeflow.org/job-role'
 
 JOB_STATUS_SUCCEEDED = 'Succeeded'
@@ -52,3 +52,11 @@
 XGBOOSTJOB_VERSION = os.environ.get('XGBOOSTJOB_VERSION', 'v1')
 
 XGBOOST_LOGLEVEL = os.environ.get('XGBOOSTJOB_LOGLEVEL', 'INFO').upper()
+
+# MPIJob K8S constants
+MPIJOB_GROUP = 'kubeflow.org'
+MPIJOB_KIND = 'MPIJob'
+MPIJOB_PLURAL = 'mpijobs'
+MPIJOB_VERSION = os.environ.get('MPIJOB_VERSION', 'v1')
+
+MPI_LOGLEVEL = os.environ.get('MPIJOB_LOGLEVEL', 'INFO').upper()
diff --git a/sdk/python/kubeflow/training/utils/utils.py b/sdk/python/kubeflow/training/utils/utils.py
index 939f6f4228..b110e9c32f 100644
--- a/sdk/python/kubeflow/training/utils/utils.py
+++ b/sdk/python/kubeflow/training/utils/utils.py
@@ -48,11 +48,16 @@ def set_xgboostjob_namespace(xgboostjob):
     namespace = xgboostjob_namespace or get_default_target_namespace()
     return namespace
 
+def set_mpijob_namespace(mpijob):
+    mpijob_namespace = mpijob.metadata.namespace
+    namespace = mpijob_namespace or get_default_target_namespace()
+    return namespace
+
 
 def get_job_labels(name, master=False, replica_type=None, replica_index=None):
     """
     Get labels according to specified flags.
     :param name: job name
-    :param master: if need include label 'job-role: master'.
+    :param master: whether to include the label 'training.kubeflow.org/job-role: master'.
     :param replica_type: Replica type according to the job type (master, worker, chief, ps etc).
     :param replica_index: Can specify replica index to get one pod of the job.
     :return: Dict: Labels
@@ -61,7 +66,6 @@ def get_job_labels(name, master=False, replica_type=None, replica_index=None):
         constants.JOB_GROUP_LABEL: 'kubeflow.org',
         constants.JOB_NAME_LABEL: name,
     }
-
     if master:
         labels[constants.JOB_ROLE_LABEL] = 'master'
 
diff --git a/sdk/python/test/e2e/test_e2e_mpijob.py b/sdk/python/test/e2e/test_e2e_mpijob.py
new file mode 100644
index 0000000000..a7a57c8dce
--- /dev/null
+++ b/sdk/python/test/e2e/test_e2e_mpijob.py
@@ -0,0 +1,99 @@
+# Copyright 2021 kubeflow.org.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from kubernetes.client import V1PodTemplateSpec
+from kubernetes.client import V1ObjectMeta
+from kubernetes.client import V1PodSpec
+from kubernetes.client import V1Container
+
+from kubeflow.training import MPIJobClient
+from kubeflow.training import V1ReplicaSpec
+from kubeflow.training import V1MPIJob
+from kubeflow.training import V1MPIJobSpec
+from kubeflow.training import V1RunPolicy
+
+MPI_CLIENT = MPIJobClient(config_file=os.getenv('KUBECONFIG', '~/.kube/config'))
+SDK_TEST_NAMESPACE = 'default'
+
+
+def test_sdk_e2e():
+    master_container = V1Container(
+        name="mpi",
+        image="horovod/horovod:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu",
+        command=["mpirun"],
+        args=["-np", "1",
+              "--allow-run-as-root",
+              "-bind-to", "none",
+              "-map-by", "slot",
+              "-x", "LD_LIBRARY_PATH",
+              "-x", "PATH",
+              "-mca", "pml", "ob1",
+              "-mca", "btl", "^openib",
+              #"python", "/examples/tensorflow2_mnist.py"]
+              "python", "/examples/pytorch_mnist.py",
+              "--epochs","1"]
+    )
+
+    worker_container = V1Container(
+        name="mpi",
+        image="horovod/horovod:0.20.0-tf2.3.0-torch1.6.0-mxnet1.5.0-py3.7-cpu",
+    )
+
+
+    master = V1ReplicaSpec(
+        replicas=1,
+        restart_policy="Never",
+        template=V1PodTemplateSpec(
+            spec=V1PodSpec(
+                containers=[master_container]
+            )
+        )
+    )
+
+    worker = V1ReplicaSpec(
+        replicas=1,
+        restart_policy="Never",
+        template=V1PodTemplateSpec(
+            spec=V1PodSpec(
+                containers=[worker_container]
+            )
+        )
+    )
+
+    mpijob = V1MPIJob(
+        api_version="kubeflow.org/v1",
+        kind="MPIJob",
+        metadata=V1ObjectMeta(name="mpijob-mxnet-ci-test", namespace=SDK_TEST_NAMESPACE),
+        spec=V1MPIJobSpec(
+            slots_per_worker=1,
+            run_policy=V1RunPolicy(
+                clean_pod_policy="None",
+            ),
+            mpi_replica_specs={"Launcher": master,
+                               "Worker": worker}
+        )
+    )
+
+    MPI_CLIENT.create(mpijob)
+
+    MPI_CLIENT.wait_for_job("mpijob-mxnet-ci-test", namespace=SDK_TEST_NAMESPACE)
+    if not MPI_CLIENT.is_job_succeeded("mpijob-mxnet-ci-test",
+                                       namespace=SDK_TEST_NAMESPACE):
+        raise RuntimeError("The MPIJob has not succeeded.")
+
+    MPI_CLIENT.get_logs("mpijob-mxnet-ci-test", namespace=SDK_TEST_NAMESPACE)
+
+    MPI_CLIENT.delete("mpijob-mxnet-ci-test",
namespace=SDK_TEST_NAMESPACE)