
Commit

Merge remote-tracking branch 'upstream/main' into batched-worker-prov
Matt711 committed Jul 6, 2023
2 parents 34b31a3 + 22c17a9 commit 5651dff
Showing 6 changed files with 82 additions and 146 deletions.
1 change: 1 addition & 0 deletions dask_kubernetes/operator/_objects.py
@@ -78,6 +78,7 @@ class DaskWorkerGroup(APIObject):
     singular = "daskworkergroup"
     namespaced = True
     scalable = True
+    scalable_spec = "worker.replicas"

     async def pods(self) -> List[Pod]:
         return await self.api.get(
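For context on the one-line change above: `scalable_spec` tells kr8s which field under the custom resource's `spec` holds the replica count, which is what lets the controller call the generic `.scale()` helper on a `DaskWorkerGroup` elsewhere in this commit. A minimal usage sketch, separate from the diff (the cluster and namespace names are placeholders):

# Minimal usage sketch, not part of the diff. Assumes the kr8s-backed
# DaskWorkerGroup from dask_kubernetes/operator/_objects.py; the cluster and
# namespace names below are placeholders.
from dask_kubernetes.operator._objects import DaskWorkerGroup


async def set_worker_count(replicas: int) -> None:
    # Fetch the default worker group for the (hypothetical) cluster "example-cluster"
    wg = await DaskWorkerGroup.get("example-cluster-default", namespace="dask")
    # scale() patches the field named by scalable_spec, i.e. spec.worker.replicas
    await wg.scale(replicas)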
216 changes: 71 additions & 145 deletions dask_kubernetes/operator/controller/controller.py
@@ -8,11 +8,16 @@

 import aiohttp
 import kopf
+from kr8s.asyncio.objects import Pod
 import kubernetes_asyncio as kubernetes
 from importlib_metadata import entry_points
 from kubernetes_asyncio.client import ApiException

-from dask_kubernetes.operator._objects import DaskCluster
+from dask_kubernetes.operator._objects import (
+    DaskCluster,
+    DaskAutoscaler,
+    DaskWorkerGroup,
+)
 from dask_kubernetes.common.auth import ClusterAuth
 from dask_kubernetes.common.networking import get_scheduler_address
 from distributed.core import rpc, clean_exception
@@ -576,25 +581,11 @@ async def get_desired_workers(scheduler_service_name, namespace, logger):

 @kopf.on.field("daskcluster.kubernetes.dask.org", field="spec.worker.replicas")
 async def daskcluster_default_worker_group_replica_update(
-    name, namespace, meta, spec, old, new, body, logger, **kwargs
+    name, namespace, old, new, **kwargs
 ):
-    if old is None:
-        return
-    worker_group_name = f"{name}-default"
-
-    async with kubernetes.client.api_client.ApiClient() as api_client:
-        custom_objects_api = kubernetes.client.CustomObjectsApi(api_client)
-        custom_objects_api.api_client.set_default_header(
-            "content-type", "application/merge-patch+json"
-        )
-        await custom_objects_api.patch_namespaced_custom_object_scale(
-            group="kubernetes.dask.org",
-            version="v1",
-            plural="daskworkergroups",
-            namespace=namespace,
-            name=worker_group_name,
-            body={"spec": {"replicas": new}},
-        )
+    if old is not None:
+        wg = await DaskWorkerGroup.get(f"{name}-default", namespace=namespace)
+        await wg.scale(new)


 @kopf.on.field("daskworkergroup.kubernetes.dask.org", field="spec.worker.replicas")
@@ -901,150 +892,85 @@ async def handle_runner_status_change_succeeded(meta, namespace, logger, **kwargs):


 @kopf.on.create("daskautoscaler.kubernetes.dask.org")
-async def daskautoscaler_create(spec, name, namespace, logger, **kwargs):
+async def daskautoscaler_create(name, spec, namespace, logger, patch, **kwargs):
     """When an autoscaler is created make it a child of the associated cluster for cascade deletion."""
-    async with kubernetes.client.api_client.ApiClient() as api_client:
-        api = kubernetes.client.CustomObjectsApi(api_client)
-        cluster = await api.get_namespaced_custom_object(
-            group="kubernetes.dask.org",
-            version="v1",
-            plural="daskclusters",
-            namespace=namespace,
-            name=spec["cluster"],
-        )
-        new_spec = dict(spec)
-        kopf.adopt(new_spec, owner=cluster)
-        api.api_client.set_default_header(
-            "content-type", "application/merge-patch+json"
-        )
-        await api.patch_namespaced_custom_object(
-            group="kubernetes.dask.org",
-            version="v1",
-            plural="daskautoscalers",
-            namespace=namespace,
-            name=name,
-            body=new_spec,
-        )
-        logger.info(f"Successfully adopted by {spec['cluster']}")
+    cluster = await DaskCluster.get(spec["cluster"], namespace=namespace)
+    kopf.adopt(patch, owner=cluster.raw)
+    logger.info(f"Autoscaler {name} successfully adopted by cluster {spec['cluster']}")


 @kopf.timer("daskautoscaler.kubernetes.dask.org", interval=5.0)
 async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs):
-    async with kubernetes.client.api_client.ApiClient() as api_client:
-        coreapi = kubernetes.client.CoreV1Api(api_client)
-
-        pod_ready = False
-        try:
-            pods = await coreapi.list_namespaced_pod(
-                namespace=namespace,
-                label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={spec['cluster']}",
-            )
-            scheduler_pod = await coreapi.read_namespaced_pod(
-                pods.items[0].metadata.name, namespace
-            )
-            if scheduler_pod.status.phase == "Running":
-                pod_ready = True
-        except ApiException as e:
-            if e.status != 404:
-                raise e
-
-        if not pod_ready:
-            logger.info("Scheduler not ready, skipping autoscaling")
-            return
-
-        customobjectsapi = kubernetes.client.CustomObjectsApi(api_client)
-        customobjectsapi.api_client.set_default_header(
-            "content-type", "application/merge-patch+json"
-        )
-
-        autoscaler_resource = await customobjectsapi.get_namespaced_custom_object(
-            group="kubernetes.dask.org",
-            version="v1",
-            plural="daskautoscalers",
-            namespace=namespace,
-            name=name,
-        )
-
-        worker_group_resource = await customobjectsapi.get_namespaced_custom_object(
-            group="kubernetes.dask.org",
-            version="v1",
-            plural="daskworkergroups",
-            namespace=namespace,
-            name=f"{spec['cluster']}-default",
-        )
-
-        current_replicas = int(worker_group_resource["spec"]["worker"]["replicas"])
-        cooldown_until = float(
-            autoscaler_resource.get("metadata", {})
-            .get("annotations", {})
-            .get(DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION, time.time())
-        )
-
-        # Cooldown autoscaling to prevent thrashing
-        if time.time() < cooldown_until:
-            logger.debug("Autoscaler for %s is in cooldown", spec["cluster"])
-            return
-
-        # Ask the scheduler for the desired number of worker
-        try:
-            desired_workers = await get_desired_workers(
-                scheduler_service_name=f"{spec['cluster']}-scheduler",
-                namespace=namespace,
-                logger=logger,
-            )
-        except SchedulerCommError:
-            logger.error("Unable to get desired number of workers from scheduler.")
-            return
-
-        # Ensure the desired number is within the min and max
-        desired_workers = max(spec["minimum"], desired_workers)
-        desired_workers = min(spec["maximum"], desired_workers)
-
-        if current_replicas > 0:
-            max_scale_down = int(current_replicas * 0.25)
-            max_scale_down = 1 if max_scale_down == 0 else max_scale_down
-            desired_workers = max(current_replicas - max_scale_down, desired_workers)
-
-        # Update the default DaskWorkerGroup
-        if desired_workers != current_replicas:
-            await customobjectsapi.patch_namespaced_custom_object_scale(
-                group="kubernetes.dask.org",
-                version="v1",
-                plural="daskworkergroups",
-                namespace=namespace,
-                name=f"{spec['cluster']}-default",
-                body={"spec": {"replicas": desired_workers}},
-            )
-
-            cooldown_until = time.time() + 15
-
-            await customobjectsapi.patch_namespaced_custom_object(
-                group="kubernetes.dask.org",
-                version="v1",
-                plural="daskautoscalers",
-                namespace=namespace,
-                name=name,
-                body={
-                    "metadata": {
-                        "annotations": {
-                            DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION: str(
-                                cooldown_until
-                            )
-                        }
-                    }
-                },
-            )
-
-            logger.info(
-                "Autoscaler updated %s worker count from %d to %d",
-                spec["cluster"],
-                current_replicas,
-                desired_workers,
-            )
-        else:
-            logger.debug(
-                "Not autoscaling %s with %d workers", spec["cluster"], current_replicas
-            )
+    try:
+        scheduler = await Pod.get(
+            label_selector={
+                "dask.org/component": "scheduler",
+                "dask.org/cluster-name": spec["cluster"],
+            },
+        )
+        if not await scheduler.ready():
+            raise ValueError()
+    except ValueError:
+        logger.info("Scheduler not ready, skipping autoscaling")
+        return
+
+    autoscaler = await DaskAutoscaler.get(name, namespace=namespace)
+    worker_group = await DaskWorkerGroup.get(
+        f"{spec['cluster']}-default", namespace=namespace
+    )
+
+    current_replicas = worker_group.replicas
+    cooldown_until = float(
+        autoscaler.annotations.get(
+            DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION, time.time()
+        )
+    )
+
+    # Cooldown autoscaling to prevent thrashing
+    if time.time() < cooldown_until:
+        logger.debug("Autoscaler for %s is in cooldown", spec["cluster"])
+        return
+
+    # Ask the scheduler for the desired number of worker
+    try:
+        desired_workers = await get_desired_workers(
+            scheduler_service_name=f"{spec['cluster']}-scheduler",
+            namespace=namespace,
+            logger=logger,
+        )
+    except SchedulerCommError:
+        logger.error("Unable to get desired number of workers from scheduler.")
+        return
+
+    # Ensure the desired number is within the min and max
+    desired_workers = max(spec["minimum"], desired_workers)
+    desired_workers = min(spec["maximum"], desired_workers)
+
+    if current_replicas > 0:
+        max_scale_down = int(current_replicas * 0.25)
+        max_scale_down = 1 if max_scale_down == 0 else max_scale_down
+        desired_workers = max(current_replicas - max_scale_down, desired_workers)
+
+    # Update the default DaskWorkerGroup
+    if desired_workers != current_replicas:
+        await worker_group.scale(desired_workers)
+
+        cooldown_until = time.time() + 15
+
+        await autoscaler.annotate(
+            {DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION: str(cooldown_until)}
+        )
+
+        logger.info(
+            "Autoscaler updated %s worker count from %d to %d",
+            spec["cluster"],
+            current_replicas,
+            desired_workers,
+        )
+    else:
+        logger.debug(
+            "Not autoscaling %s with %d workers", spec["cluster"], current_replicas
+        )


 @kopf.timer("daskcluster.kubernetes.dask.org", interval=5.0)
@@ -38,6 +38,10 @@ rules:
     resources: [services, services/status]
     verbs: ["*"]

+  - apiGroups: ["apps"]
+    resources: [deployments, deployments/status]
+    verbs: ["*"]
+
   - apiGroups: ["", events.k8s.io]
     resources: [events]
     verbs: ["*"]
@@ -38,6 +38,10 @@ rules:
     resources: [services, services/status]
     verbs: ["*"]

+  - apiGroups: ["apps"]
+    resources: [deployments, deployments/status]
+    verbs: ["*"]
+
   - apiGroups: ["", events.k8s.io]
     resources: [events]
     verbs: ["*"]
1 change: 1 addition & 0 deletions requirements-test.txt
@@ -7,4 +7,5 @@ git+https://codeberg.org/hjacobs/pytest-kind.git
 pytest-timeout
 pytest-rerunfailures
 git+https://github.com/elemental-lf/[email protected]
+jsonschema==4.17.3
 dask[complete]
2 changes: 1 addition & 1 deletion requirements.txt
@@ -5,4 +5,4 @@ kubernetes-asyncio>=12.0.1
 kopf>=1.35.3
 pykube-ng>=22.9.0
 rich>=12.5.1
-kr8s==0.5.2
+kr8s==0.8.1
