Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…o wlb/dask_worker
  • Loading branch information
jacobtomlinson committed Sep 16, 2024
2 parents ddf78ca + 6ca172e commit 7f5d48f
Show file tree
Hide file tree
Showing 37 changed files with 2,684 additions and 55 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
fail-fast: true
matrix:
os: ["ubuntu-latest"]
python-version: ["3.8", "3.9", "3.10"]
python-version: ["3.10", "3.11", "3.12"]

steps:
- name: Checkout source
Expand Down Expand Up @@ -50,7 +50,7 @@ jobs:
uses: conda-incubator/setup-miniconda@v2
with:
miniconda-version: "latest"
python-version: "3.8"
python-version: "3.12"

- name: Run import tests
shell: bash -l {0}
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ jobs:
- name: Checkout source
uses: actions/checkout@v2

- name: Set up Python 3.8
- name: Set up Python 3.12
uses: actions/setup-python@v1
with:
python-version: 3.8
python-version: 3.12

- name: Install pypa/build
run: python -m pip install build wheel
run: python -m pip install build wheel setuptools

- name: Build distributions
shell: bash -l {0}
Expand Down
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
repos:
- repo: https://github.com/psf/black
rev: 22.3.0
rev: 23.10.1
hooks:
- id: black
language_version: python3
exclude: versioneer.py
- repo: https://github.com/pycqa/flake8
rev: 3.9.2
rev: 6.1.0
hooks:
- id: flake8
language_version: python3
6 changes: 5 additions & 1 deletion .readthedocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ sphinx:
formats: all

python:
version: "3.8"
install:
- method: pip
path: .
Expand All @@ -16,3 +15,8 @@ python:

submodules:
include: all

build:
os: ubuntu-22.04
tools:
python: "3.11"
8 changes: 5 additions & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ Dask Cloud Provider
:alt: Conda Forge


Native Cloud integration for Dask. This library intends to allow people to
create dask clusters on a given cloud provider with no set up other than having
credentials.
Native Cloud integration for Dask.

This library provides tools to enable Dask clusters to more natively integrate with the cloud.
It includes cluster managers to create dask clusters on a given cloud provider using native resources,
plugins to more closely integrate Dask components with the cloud platform they are running on and documentation to empower all folks running Dask on the cloud.
2 changes: 1 addition & 1 deletion ci/environment-3.8.yml → ci/environment-3.11.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ channels:
- defaults
- conda-forge
dependencies:
- python=3.8
- python=3.11
- nomkl
- pip
# Dask
Expand Down
2 changes: 1 addition & 1 deletion ci/environment-3.9.yml → ci/environment-3.12.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ channels:
- defaults
- conda-forge
dependencies:
- python=3.9
- python=3.12
- nomkl
- pip
# Dask
Expand Down
6 changes: 4 additions & 2 deletions ci/scripts/test_imports.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ set -o errexit


test_import () {
echo "Create environment: python=3.8 $1"
echo "Create environment: python=3.12 $1"
# Create an empty environment
conda create -q -y -n test-imports -c conda-forge python=3.8
conda create -q -y -n test-imports -c conda-forge python=3.12
conda activate test-imports
pip install -e .[$1]
echo "python -c '$2'"
Expand All @@ -19,3 +19,5 @@ test_import "aws" "import dask_cloudprovider.aws"
test_import "azure" "import dask_cloudprovider.azure"
test_import "digitalocean" "import dask_cloudprovider.digitalocean"
test_import "gcp" "import dask_cloudprovider.gcp"
test_import "ibm" "import dask_cloudprovider.ibm"
test_import "openstack" "import dask_cloudprovider.openstack"
2 changes: 1 addition & 1 deletion dask_cloudprovider/aws/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ async def configure_vm(self, client):
await asyncio.sleep(min(backoff, 10) + backoff % 1)
# Exponential backoff with a cap of 10 seconds and some jitter
backoff = backoff * 2
return self.instance[ip_address_key]
return self.instance[ip_address_key], None

async def destroy_vm(self):
boto_config = botocore.config.Config(retries=dict(max_attempts=10))
Expand Down
16 changes: 12 additions & 4 deletions dask_cloudprovider/aws/ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ class Scheduler(Task):
Any extra command line arguments to pass to ``dask scheduler``, e.g. ``["--tls-cert", "/path/to/cert.pem"]``
Defaults to `None`, no extra command line arguments.
kwargs: Dict()
kwargs:
Other kwargs to be passed to :class:`Task`.
See :class:`Task` for parameter info.
Expand Down Expand Up @@ -414,7 +414,7 @@ class Worker(Task):
scheduler: str
The address of the scheduler
kwargs: Dict()
kwargs:
Other kwargs to be passed to :class:`Task`.
"""

Expand Down Expand Up @@ -486,6 +486,10 @@ class ECSCluster(SpecCluster, ConfigMixin):
The docker image to use for the scheduler and worker tasks.
Defaults to ``daskdev/dask:latest`` or ``rapidsai/rapidsai:latest`` if ``worker_gpu`` is set.
cpu_architecture: str (optional)
Runtime platform CPU architecture
Defaults to ``X86_64``.
scheduler_cpu: int (optional)
The amount of CPU to request for the scheduler in milli-cpu (1/1024).
Expand Down Expand Up @@ -680,7 +684,7 @@ class ECSCluster(SpecCluster, ConfigMixin):
mounted in worker tasks. This setting controls whether volumes are also mounted in the scheduler task.
Default ``False``.
**kwargs: dict
**kwargs:
Additional keyword arguments to pass to ``SpecCluster``.
Examples
Expand Down Expand Up @@ -714,6 +718,7 @@ def __init__(
fargate_workers=None,
fargate_spot=None,
image=None,
cpu_architecture="X86_64",
scheduler_cpu=None,
scheduler_mem=None,
scheduler_port=8786,
Expand Down Expand Up @@ -760,6 +765,7 @@ def __init__(
self._fargate_workers = fargate_workers
self._fargate_spot = fargate_spot
self.image = image
self._cpu_architecture = cpu_architecture.upper()
self._scheduler_cpu = scheduler_cpu
self._scheduler_mem = scheduler_mem
self._scheduler_port = scheduler_port
Expand Down Expand Up @@ -1226,6 +1232,7 @@ async def _create_scheduler_task_definition_arn(self):
if self._volumes and self._mount_volumes_on_scheduler
else [],
requiresCompatibilities=["FARGATE"] if self._fargate_scheduler else [],
runtimePlatform={"cpuArchitecture": self._cpu_architecture},
cpu=str(self._scheduler_cpu),
memory=str(self._scheduler_mem),
tags=dict_to_aws(self.tags),
Expand Down Expand Up @@ -1306,6 +1313,7 @@ async def _create_worker_task_definition_arn(self):
],
volumes=self._volumes if self._volumes else [],
requiresCompatibilities=["FARGATE"] if self._fargate_workers else [],
runtimePlatform={"cpuArchitecture": self._cpu_architecture},
cpu=str(self._worker_cpu),
memory=str(self._worker_mem),
tags=dict_to_aws(self.tags),
Expand Down Expand Up @@ -1397,7 +1405,7 @@ class FargateCluster(ECSCluster):
Parameters
----------
kwargs: dict
kwargs:
Keyword arguments to be passed to :class:`ECSCluster`.
Examples
Expand Down
6 changes: 3 additions & 3 deletions dask_cloudprovider/aws/tests/test_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async def cluster_rapids():
# Deep Learning AMI (Ubuntu 18.04)
ami="ami-0c7c7d78f752f8f17",
# Python version must match local version and CUDA version must match AMI CUDA version
docker_image="rapidsai/rapidsai:cuda10.1-runtime-ubuntu18.04-py3.8",
docker_image="rapidsai/rapidsai:cuda10.1-runtime-ubuntu18.04-py3.9",
instance_type="p3.2xlarge",
bootstrap=False,
filesystem_size=120,
Expand All @@ -65,7 +65,7 @@ async def cluster_rapids_packer():
# Packer AMI
ami="ami-04e5539cb82859e69",
# Python version must match local version and CUDA version must match AMI CUDA version
docker_image="rapidsai/rapidsai:cuda10.1-runtime-ubuntu18.04-py3.8",
docker_image="rapidsai/rapidsai:cuda10.1-runtime-ubuntu18.04-py3.9",
instance_type="p3.2xlarge",
bootstrap=False,
filesystem_size=120,
Expand Down Expand Up @@ -202,7 +202,7 @@ async def test_get_cloud_init_rapids():
# Deep Learning AMI (Ubuntu 18.04)
ami="ami-0c7c7d78f752f8f17",
# Python version must match local version and CUDA version must match AMI CUDA version
docker_image="rapidsai/rapidsai:cuda10.1-runtime-ubuntu18.04-py3.8",
docker_image="rapidsai/rapidsai:cuda10.1-runtime-ubuntu18.04-py3.9",
instance_type="p3.2xlarge",
bootstrap=False,
filesystem_size=120,
Expand Down
9 changes: 6 additions & 3 deletions dask_cloudprovider/azure/azurevm.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,12 @@ async def create_vm(self):
self.nic.name,
)
self.cluster._log(f"Created VM {self.name}")

private_ip_address = self.nic.ip_configurations[0].private_ip_address
if self.public_ingress:
return self.public_ip.ip_address
return self.nic.ip_configurations[0].private_ip_address
return private_ip_address, self.public_ip.ip_address
else:
return private_ip_address, None

async def destroy_vm(self):
await self.cluster.call_async(
Expand Down Expand Up @@ -430,7 +433,7 @@ class AzureVMCluster(VMCluster):
... security_group="<security group>",
... n_workers=1,
... vm_size="Standard_NC12s_v3", # Or any NVIDIA GPU enabled size
... docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8",
... docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.9",
... worker_class="dask_cuda.CUDAWorker")
>>> from dask.distributed import Client
>>> client = Client(cluster)
Expand Down
4 changes: 1 addition & 3 deletions dask_cloudprovider/azure/tests/test_azurevm.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def inc(x):
@skip_without_credentials
@pytest.mark.external
async def test_create_cluster_sync():

with AzureVMCluster() as cluster:
with Client(cluster) as client:
cluster.scale(1)
Expand All @@ -84,10 +83,9 @@ def inc(x):
@skip_without_credentials
@pytest.mark.external
async def test_create_rapids_cluster_sync():

with AzureVMCluster(
vm_size="Standard_NC12s_v3",
docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8",
docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.9",
worker_class="dask_cuda.CUDAWorker",
worker_options={"rmm_pool_size": "15GB"},
) as cluster:
Expand Down
29 changes: 29 additions & 0 deletions dask_cloudprovider/cloudprovider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ cloudprovider:
scheduler_timeout: "5 minutes" # Length of inactivity to wait before closing the cluster

image: "daskdev/dask:latest" # Docker image to use for non GPU tasks
cpu_architecture: "X86_64" # Runtime platform CPU architecture
gpu_image: "rapidsai/rapidsai:latest" # Docker image to use for GPU tasks
cluster_name_template: "dask-{uuid}" # Template to use when creating a cluster
cluster_arn: "" # ARN of existing ECS cluster to use (if not set one will be created)
Expand Down Expand Up @@ -107,6 +108,7 @@ cloudprovider:
public_ingress: true # configure the scheduler to be externally accessible. This assumes firefwall rules for 8787 and 8786
instance_labels:
container_vm: "dask-cloudprovider"
service_account: "default"

hetzner:
token: null # API token for interacting with the Hetzner cloud API
Expand All @@ -115,3 +117,30 @@ cloudprovider:
image: "ubuntu-20.04" # Operating System image to use
docker_image: "daskdev/dask:latest" # docker image to use
bootstrap: true # It is assumed that the OS image does not have Docker and needs bootstrapping. Set this to false if using a custom image with Docker already installed.

ibm:
api_key: null
image: "ghcr.io/dask/dask:latest"
region: us-east
project_id: null
scheduler_cpu: "1.0"
scheduler_mem: 4G
scheduler_timeout: 600 # seconds
worker_cpu: "2.0"
worker_mem: 8G
worker_threads: 1

openstack:
region: "RegionOne" # The name of the region where resources will be allocated in OpenStack. List available regions using: `openstack region list`.
size: null # Openstack flavors define the compute, memory, and storage capacity of computing instances. List available flavors using: `openstack flavor list`
auth_url: null # The authentication URL for the OpenStack Identity service (Keystone). Example: https://cloud.example.com:5000
application_credential_id: null # The application credential id created in OpenStack. Create application credentials using: openstack application credential create
application_credential_secret: null # The secret associated with the application credential ID for authentication.
auth_type: "v3applicationcredential" # The type of authentication used, typically "v3applicationcredential" for using OpenStack application credentials.
network_id: null # The unique identifier for the internal/private network in OpenStack where the cluster VMs will be connected. List available networks using: `openstack network list`
image: null # The OS image name or id to use for the VM. List available images using: `openstack image list`
keypair_name: null # The name of the SSH keypair used for instance access. Ensure you have created a keypair or use an existing one. List available keypairs using: `openstack keypair list`
security_group: null # The security group name that defines firewall rules for instances. List available security groups using: `openstack security group list`
external_network_id: null # The ID of the external network used for assigning floating IPs. List available external networks using: `openstack network list --external`
create_floating_ip: false # Specifies whether to assign a floating IP to each instance, enabling external access. Set to `True` if external connectivity is needed.
docker_image: "daskdev/dask:latest" # docker image to use
2 changes: 1 addition & 1 deletion dask_cloudprovider/digitalocean/droplet.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async def create_vm(self):
await asyncio.sleep(0.1)
self.cluster._log(f"Created droplet {self.name}")

return self.droplet.ip_address
return self.droplet.ip_address, None

async def destroy_vm(self):
await self.cluster.call_async(self.droplet.destroy)
Expand Down
Loading

0 comments on commit 7f5d48f

Please sign in to comment.