Make KubeCluster more configurable #26

Merged · 17 commits · Feb 5, 2018
Changes from 6 commits
177 changes: 152 additions & 25 deletions daskernetes/core.py
@@ -15,6 +15,29 @@

logger = logging.getLogger(__name__)

def merge_dictionaries(a, b, path=None, update=True):
"""
Merge two dictionaries recursively.

From https://stackoverflow.com/a/25270947
"""
if path is None: path = []
for key in b:
if key in a:
if isinstance(a[key], dict) and isinstance(b[key], dict):
merge_dictionaries(a[key], b[key], path + [str(key)], update=update)
elif a[key] == b[key]:
pass # same leaf value
elif isinstance(a[key], list) and isinstance(b[key], list):
for idx, val in enumerate(b[key]):
a[key][idx] = merge_dictionaries(a[key][idx], b[key][idx], path + [str(key), str(idx)], update=update)
elif update:
a[key] = b[key]
else:
raise Exception('Conflict at %s' % '.'.join(path + [str(key)]))
else:
a[key] = b[key]
return a
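
As a quick illustration (values invented for this note, not taken from the diff), the deep merge combines nested keys rather than replacing whole sub-dictionaries, and it mutates and returns its first argument:

# Illustrative only: nested keys from both dicts are kept.
base = {'resources': {'limits': {'cpu': '1'}}, 'labels': {'app': 'dask'}}
extra = {'resources': {'limits': {'memory': '2G'}}}
merged = merge_dictionaries(base, extra)
# merged is the mutated `base`:
# {'resources': {'limits': {'cpu': '1', 'memory': '2G'}}, 'labels': {'app': 'dask'}}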

class KubeCluster(object):
""" Launch a Dask cluster on Kubernetes
@@ -30,9 +53,9 @@ class KubeCluster(object):
namespace: str
Namespace in which to launch the workers. Defaults to current
namespace if available or "default"
worker_image: str
image: str
Docker image and tag
worker_labels: dict
labels: dict
Additional labels to add to pod
n_workers: int
Number of workers on initial launch. Use ``scale_up`` in the future
@@ -41,6 +64,18 @@
Listen address for local scheduler. Defaults to 0.0.0.0
port: int
Port of local scheduler
extra_container_config: dict
Dict of properties to be deep merged into container spec
extra_pod_config: dict
Dict of properties to be deep merged into the pod spec
memory_limit: str
Max amount of memory *each* dask worker can use
memory_request: str
Min amount of memory *each* dask worker should be guaranteed
cpu_limit: str
Max number of CPU cores each dask worker can use
cpu_request: str
Min number of CPU cores each dask worker should be guaranteed
**kwargs: dict
Additional keyword arguments to pass to LocalCluster
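
A minimal usage sketch combining the options documented above (all values here are illustrative, not taken from this pull request):

# Hypothetical example values; any valid image, sizes, and config keys work.
cluster = KubeCluster(
    name='example',
    image='daskdev/dask:latest',
    memory_request='2G',
    memory_limit='4G',
    cpu_request='1',
    cpu_limit='2',
    extra_pod_config={'automountServiceAccountToken': False},
    extra_container_config={'imagePullPolicy': 'IfNotPresent'},
)
cluster.scale_up(3)  # start three workers with these resource bounds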

@@ -57,13 +92,19 @@ def __init__(
self,
name=None,
namespace=None,
worker_image='daskdev/dask:latest',
worker_labels=None,
image='daskdev/dask:latest',
labels=None,
n_workers=0,
threads_per_worker=1,
host='0.0.0.0',
port=8786,
port=0,
env={},
extra_container_config={},
extra_pod_config={},
memory_limit=None,
memory_request=None,
cpu_limit=None,
cpu_request=None,
**kwargs,
):
self.cluster = LocalCluster(ip=host or socket.gethostname(),
@@ -75,7 +116,7 @@ def __init__(
except config.ConfigException:
config.load_kube_config()

self.api = client.CoreV1Api()
self.core_api = client.CoreV1Api()

if namespace is None:
namespace = _namespace_default()
@@ -85,17 +126,23 @@

self.namespace = namespace
self.name = name
self.worker_image = worker_image
self.worker_labels = (worker_labels or {}).copy()
self.image = image
self.labels = (labels or {}).copy()
self.threads_per_worker = threads_per_worker
self.env = dict(env)
Member:

If we want to we can continue supporting these kinds of common configurations by modifying the template in the _make_pod method. I don't know if this is desired, but it might be useful if there are parameters that we expect users to want to change?

Collaborator Author:

I think I'm going to move the extra_* stuff to the constructor, and just have a 'common configuration' thing as another classmethod.
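
For illustration only, such a classmethod might look roughly like the sketch below; the name from_common_config and its parameters are hypothetical and not part of this diff:

@classmethod
def from_common_config(cls, name=None, image='daskdev/dask:latest',
                       memory='4G', cpu='1', **kwargs):
    # Hypothetical convenience constructor: map a couple of common knobs
    # onto the finer-grained request/limit keyword arguments.
    return cls(
        name=name,
        image=image,
        memory_request=memory,
        memory_limit=memory,
        cpu_request=cpu,
        cpu_limit=cpu,
        **kwargs,
    )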

self.cpu_limit = cpu_limit
self.memory_limit = memory_limit
self.cpu_request = cpu_request
self.memory_request = memory_request
self.extra_pod_config = extra_pod_config
self.extra_container_config = extra_container_config

# Default labels that can't be overwritten
self.worker_labels['org.pydata.dask/cluster-name'] = name
self.worker_labels['app'] = 'dask'
self.worker_labels['component'] = 'dask-worker'
self.labels['dask.pydata.org/cluster-name'] = name
self.labels['app'] = 'dask'
self.labels['component'] = 'dask-worker'

finalize(self, cleanup_pods, self.namespace, self.worker_labels)
finalize(self, cleanup_pods, self.namespace, self.labels)

self._cached_widget = None

@@ -142,17 +189,17 @@ def scheduler_address(self):
return self.scheduler.address

def _make_pod(self):
return client.V1Pod(
pod = client.V1Pod(
metadata=client.V1ObjectMeta(
generate_name=self.name + '-',
labels=self.worker_labels
labels=self.labels
),
spec=client.V1PodSpec(
restart_policy='Never',
containers=[
client.V1Container(
name='dask-worker',
image=self.worker_image,
image=self.image,
args=[
'dask-worker',
self.scheduler_address,
@@ -165,24 +212,104 @@
)
)


resources = client.V1ResourceRequirements(limits={}, requests={})

if self.cpu_request:
resources.requests['cpu'] = self.cpu_request
if self.memory_request:
resources.requests['memory'] = self.memory_request

if self.cpu_limit:
resources.limits['cpu'] = self.cpu_limit
if self.memory_limit:
resources.limits['memory'] = self.memory_limit

pod.spec.containers[0].resources = resources

for key, value in self.extra_container_config.items():
self._set_k8s_attribute(
pod.spec.containers[0],
key,
value
)

for key, value in self.extra_pod_config.items():
self._set_k8s_attribute(
pod.spec,
key,
value
)
return pod

def _set_k8s_attribute(self, obj, attribute, value):
Member:

It looks like the only reference to self is to get out core_api. Perhaps this should be a standalone function that accepts an api object? That might also make it easier to test.

Collaborator Author:

good idea, I'll do that!

I think the serialize function there could be a static or classmethod, I'll file a bug upstream.

"""
Set a specific value on a kubernetes object's attribute

obj
an object from the Kubernetes Python API client
attribute
Either the name of a python client class attribute (api_client)
or the attribute name from the Kubernetes JSON API (apiClient)
value
Anything (string, list, dict, k8s objects) that can be
accepted by the k8s python client
"""
# FIXME: We should be doing a recursive merge here

current_value = None
attribute_name = None
# All k8s python client objects have an 'attribute_map' property
# which has as keys python style attribute names (api_client)
# and as values the kubernetes JSON API style attribute names
# (apiClient). We want to allow this to use either.
for python_attribute, json_attribute in obj.attribute_map.items():
if json_attribute == attribute or python_attribute == attribute:
attribute_name = python_attribute
break
else:
raise ValueError('Attribute must be one of {}'.format(obj.attribute_map.values()))

if hasattr(obj, attribute_name):
current_value = getattr(obj, attribute_name)

if current_value is not None:
# This will ensure that current_value is something JSONable,
# so a dict, list, or scalar
current_value = self.core_api.api_client.sanitize_for_serialization(
current_value
)

if isinstance(current_value, dict):
# Deep merge our dictionaries!
setattr(obj, attribute_name, merge_dictionaries(current_value, value))
elif isinstance(current_value, list):
# Just append lists
setattr(obj, attribute_name, current_value + value)
else:
# Replace everything else
setattr(obj, attribute_name, value)
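
Following the reviewer's suggestion above, a standalone variant that takes the api client explicitly could look roughly like this sketch (the free function name and api_client parameter are hypothetical, not part of this diff):

def set_k8s_attribute(obj, attribute, value, api_client):
    # Accepts either python-style (image_pull_policy) or JSON-style
    # (imagePullPolicy) attribute names, mirroring the method above.
    for python_attribute, json_attribute in obj.attribute_map.items():
        if attribute in (python_attribute, json_attribute):
            attribute_name = python_attribute
            break
    else:
        raise ValueError('Attribute must be one of {}'.format(obj.attribute_map.values()))

    current_value = getattr(obj, attribute_name, None)
    if current_value is not None:
        # Convert the current value to plain dicts/lists/scalars before merging
        current_value = api_client.sanitize_for_serialization(current_value)

    if isinstance(current_value, dict):
        setattr(obj, attribute_name, merge_dictionaries(current_value, value))
    elif isinstance(current_value, list):
        setattr(obj, attribute_name, current_value + value)
    else:
        setattr(obj, attribute_name, value)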

def pods(self):
return self.api.list_namespaced_pod(
return self.core_api.list_namespaced_pod(
self.namespace,
label_selector=format_labels(self.worker_labels)
label_selector=format_labels(self.labels)
).items

def logs(self, pod):
return self.api.read_namespaced_pod_log(pod.metadata.name,
pod.metadata.namespace)
return self.core_api.read_namespaced_pod_log(pod.metadata.name,
pod.metadata.namespace)

def scale_up(self, n, **kwargs):
"""
Make sure we have n dask-workers available for this cluster
"""
pods = self.pods()

out = [self.api.create_namespaced_pod(self.namespace, self._make_pod())
for _ in range(n - len(pods))]
out = [
self.core_api.create_namespaced_pod(self.namespace, self._make_pod())
for _ in range(n - len(pods))
]

return out
# fixme: wait for this to be ready before returning!
@@ -208,7 +335,7 @@ def scale_down(self, workers):
return
for pod in to_delete:
try:
self.api.delete_namespaced_pod(
self.core_api.delete_namespaced_pod(
pod.metadata.name,
self.namespace,
client.V1DeleteOptions()
@@ -227,7 +354,7 @@ def close(self):
self.cluster.close()

def __exit__(self, type, value, traceback):
cleanup_pods(self.namespace, self.worker_labels)
cleanup_pods(self.namespace, self.labels)
self.cluster.__exit__(type, value, traceback)

def __del__(self):
@@ -242,9 +369,9 @@ def adapt(self):
return Adaptive(self.scheduler, self)


def cleanup_pods(namespace, worker_labels):
def cleanup_pods(namespace, labels):
api = client.CoreV1Api()
pods = api.list_namespaced_pod(namespace, label_selector=format_labels(worker_labels))
pods = api.list_namespaced_pod(namespace, label_selector=format_labels(labels))
for pod in pods.items:
try:
api.delete_namespaced_pod(pod.metadata.name, namespace,
108 changes: 108 additions & 0 deletions daskernetes/tests/test_objects.py
@@ -0,0 +1,108 @@
import pytest
from daskernetes import KubeCluster
from distributed.utils_test import loop


def test_extra_pod_config(loop):
"""
Test that our pod config merging process works fine
"""
cluster = KubeCluster(
loop=loop,
n_workers=0,
extra_pod_config={
'automountServiceAccountToken': False
}
)

pod = cluster._make_pod()

assert pod.spec.automount_service_account_token is False

def test_extra_container_config(loop):
"""
Test that our container config merging process works fine
"""
cluster = KubeCluster(
loop=loop,
n_workers=0,
extra_container_config={
'imagePullPolicy': 'IfNotPresent',
'securityContext': {
'runAsUser': 0
}
}
)

pod = cluster._make_pod()

assert pod.spec.containers[0].image_pull_policy == 'IfNotPresent'
assert pod.spec.containers[0].security_context == {
'runAsUser': 0
}
Member:

I'm very glad to see this working well :)


def test_container_resources_config(loop):
"""
Test container resource requests / limits being set properly
"""
cluster = KubeCluster(
loop=loop,
n_workers=0,
memory_request="1G",
memory_limit="2G",
cpu_limit="2"
)

pod = cluster._make_pod()

assert pod.spec.containers[0].resources.requests['memory'] == '1G'
assert pod.spec.containers[0].resources.limits['memory'] == '2G'
assert pod.spec.containers[0].resources.limits['cpu'] == '2'
assert "cpu" not in pod.spec.containers[0].resources.requests

def test_extra_container_config_merge(loop):
"""
Test that our container config merging process works recursively fine
"""
cluster = KubeCluster(
loop=loop,
n_workers=0,
env={"TEST": "HI"},
extra_container_config={
"env": [ {"name": "BOO", "value": "FOO" } ],
"args": ["last-item"]
}
)

pod = cluster._make_pod()

assert pod.spec.containers[0].env == [
{ "name": "TEST", "value": "HI"},
{ "name": "BOO", "value": "FOO"}
]

assert pod.spec.containers[0].args[-1] == "last-item"

