diff --git a/dask_kubernetes/operator/controller/controller.py b/dask_kubernetes/operator/controller/controller.py index ab1d2c482..46271ae44 100644 --- a/dask_kubernetes/operator/controller/controller.py +++ b/dask_kubernetes/operator/controller/controller.py @@ -8,8 +8,7 @@ import aiohttp import kopf import kr8s -from kr8s.asyncio.objects import Pod, Deployment -import kubernetes_asyncio as kubernetes +from kr8s.asyncio.objects import Pod, Deployment, Service from importlib_metadata import entry_points from dask_kubernetes.operator._objects import ( @@ -287,79 +286,52 @@ async def daskcluster_create_components( ): """When the DaskCluster status.phase goes into Created create the cluster components.""" logger.info("Creating Dask cluster components.") - async with kubernetes.client.api_client.ApiClient() as api_client: - api = kubernetes.client.CoreV1Api(api_client) - custom_api = kubernetes.client.CustomObjectsApi(api_client) - annotations = _get_annotations(meta) - labels = _get_labels(meta) - scheduler_spec = spec.get("scheduler", {}) - if "metadata" in scheduler_spec: - if "annotations" in scheduler_spec["metadata"]: - annotations.update(**scheduler_spec["metadata"]["annotations"]) - if "labels" in scheduler_spec["metadata"]: - labels.update(**scheduler_spec["metadata"]["labels"]) - data = build_scheduler_deployment_spec( - name, namespace, scheduler_spec.get("spec"), annotations, labels - ) - kopf.adopt(data) - pod = await api.list_namespaced_pod( - namespace=namespace, - label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={name}", - ) - if not pod.items: - await kubernetes.client.AppsV1Api(api_client).create_namespaced_deployment( - namespace=namespace, - body=data, - ) - logger.info( - f"Scheduler deployment {data['metadata']['name']} created in {namespace}." - ) + # Create scheduler deployment + annotations = _get_annotations(meta) + labels = _get_labels(meta) + scheduler_spec = spec.get("scheduler", {}) + if "metadata" in scheduler_spec: + if "annotations" in scheduler_spec["metadata"]: + annotations.update(**scheduler_spec["metadata"]["annotations"]) + if "labels" in scheduler_spec["metadata"]: + labels.update(**scheduler_spec["metadata"]["labels"]) + data = build_scheduler_deployment_spec( + name, namespace, scheduler_spec.get("spec"), annotations, labels + ) + kopf.adopt(data) + scheduler_deployment = await Deployment(data, namespace=namespace) + if not await scheduler_deployment.exists(): + await scheduler_deployment.create() + logger.info( + f"Scheduler deployment {scheduler_deployment.name} created in {namespace}." + ) - data = build_scheduler_service_spec( - name, scheduler_spec.get("service"), annotations, labels - ) - kopf.adopt(data) - service = await api.list_namespaced_service( - namespace=namespace, - label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={name}", - ) - if not service.items: - await api.create_namespaced_service( - namespace=namespace, - body=data, - ) - logger.info( - f"Scheduler service {data['metadata']['name']} created in {namespace}." - ) + # Create scheduler service + data = build_scheduler_service_spec( + name, scheduler_spec.get("service"), annotations, labels + ) + kopf.adopt(data) + scheduler_service = await Service(data, namespace=namespace) + if not await scheduler_service.exists(): + await scheduler_service.create() + logger.info(f"Scheduler service {data['metadata']['name']} created in {namespace}.") + + # Create default worker group + worker_spec = spec.get("worker", {}) + annotations = _get_annotations(meta) + labels = _get_labels(meta) + if "metadata" in worker_spec: + if "annotations" in worker_spec["metadata"]: + annotations.update(**worker_spec["metadata"]["annotations"]) + if "labels" in worker_spec["metadata"]: + labels.update(**worker_spec["metadata"]["labels"]) + data = build_default_worker_group_spec(name, worker_spec, annotations, labels) + worker_group = await DaskWorkerGroup(data, namespace=namespace) + if not await worker_group.exists(): + await worker_group.create() + logger.info(f"Worker group {data['metadata']['name']} created in {namespace}.") - worker_spec = spec.get("worker", {}) - annotations = _get_annotations(meta) - labels = _get_labels(meta) - if "metadata" in worker_spec: - if "annotations" in worker_spec["metadata"]: - annotations.update(**worker_spec["metadata"]["annotations"]) - if "labels" in worker_spec["metadata"]: - labels.update(**worker_spec["metadata"]["labels"]) - data = build_default_worker_group_spec(name, worker_spec, annotations, labels) - worker_group = await custom_api.list_namespaced_custom_object( - group="kubernetes.dask.org", - version="v1", - plural="daskworkergroups", - namespace=namespace, - label_selector=f"dask.org/component=workergroup,dask.org/cluster-name={name}", - ) - if not worker_group["items"]: - await custom_api.create_namespaced_custom_object( - group="kubernetes.dask.org", - version="v1", - plural="daskworkergroups", - namespace=namespace, - body=data, - ) - logger.info( - f"Worker group {data['metadata']['name']} created in {namespace}." - ) patch.status["phase"] = "Pending" @@ -382,8 +354,8 @@ async def handle_scheduler_service_status( @kopf.on.create("daskworkergroup.kubernetes.dask.org") -async def daskworkergroup_create(body, logger, **kwargs): - wg = await DaskWorkerGroup(body) +async def daskworkergroup_create(body, namespace, logger, **kwargs): + wg = await DaskWorkerGroup(body, namespace=namespace) cluster = await wg.cluster() await cluster.adopt(wg) logger.info(f"Successfully adopted by {cluster.name}") @@ -393,6 +365,7 @@ async def daskworkergroup_create(body, logger, **kwargs): body=body, logger=logger, new=wg.replicas, + namespace=namespace, **kwargs, )