diff --git a/ci/scripts/test_imports.sh b/ci/scripts/test_imports.sh index 2fb1f02b..90dcc297 100644 --- a/ci/scripts/test_imports.sh +++ b/ci/scripts/test_imports.sh @@ -19,5 +19,6 @@ test_import "aws" "import dask_cloudprovider.aws" test_import "azure" "import dask_cloudprovider.azure" test_import "digitalocean" "import dask_cloudprovider.digitalocean" test_import "gcp" "import dask_cloudprovider.gcp" +test_import "fly" "import dask_cloudprovider.fly" test_import "ibm" "import dask_cloudprovider.ibm" test_import "openstack" "import dask_cloudprovider.openstack" diff --git a/dask_cloudprovider/cloudprovider.yaml b/dask_cloudprovider/cloudprovider.yaml index fcf201aa..4e89beb8 100755 --- a/dask_cloudprovider/cloudprovider.yaml +++ b/dask_cloudprovider/cloudprovider.yaml @@ -119,6 +119,16 @@ cloudprovider: docker_image: "daskdev/dask:latest" # docker image to use bootstrap: true # It is assumed that the OS image does not have Docker and needs bootstrapping. Set this to false if using a custom image with Docker already installed. + fly: + token: null # API token for interacting with the Fly API + region: "lax" # Region to launch Machines in + vm_size: "shared-cpu-1x" # Machine size to launch, default is 1GB RAM, 1 vCPU + image: "ghcr.io/dask/dask:latest" # Docker image to run on the scheduler and workers + memory_mb: 1024 # Memory in MB to use for the scheduler and workers + cpus: 1 # Number of CPUs to use for the scheduler and workers + app_name: null # Name of Fly app to use. If it is blank, a random name will be generated. + org_slug: "personal" # Organization slug to use. If it is blank, the personal organization will be used. 
+ ibm: api_key: null image: "ghcr.io/dask/dask:latest" diff --git a/dask_cloudprovider/fly/__init__.py b/dask_cloudprovider/fly/__init__.py new file mode 100644 index 00000000..944de53a --- /dev/null +++ b/dask_cloudprovider/fly/__init__.py @@ -0,0 +1,6 @@ +from .machine import ( + FlyMachine, + FlyMachineWorker, + FlyMachineScheduler, + FlyMachineCluster, +) diff --git a/dask_cloudprovider/fly/machine.py b/dask_cloudprovider/fly/machine.py new file mode 100644 index 00000000..96a4c374 --- /dev/null +++ b/dask_cloudprovider/fly/machine.py @@ -0,0 +1,447 @@ +import uuid +import dask +import asyncio +import json +import warnings +from dask_cloudprovider.generic.vmcluster import ( + VMCluster, + VMInterface, + SchedulerMixin, + WorkerMixin, +) +from distributed.core import Status +from distributed.worker import Worker as _Worker +from distributed.scheduler import Scheduler as _Scheduler +from distributed.utils import cli_keywords +from dask_cloudprovider.utils.socket import async_socket_open + +try: + from .sdk.models import machines + from .sdk.fly import Fly +except ImportError as e: + msg = ( + "Dask Cloud Provider Fly.io requirements are not installed.\n\n" + "Please pip install as follows:\n\n" + ' pip install "dask-cloudprovider[fly]" --upgrade # or python -m pip install' + ) + raise ImportError(msg) from e + + +class FlyMachine(VMInterface): + def __init__( + self, + cluster: str, + config, + *args, + region: str = None, + vm_size: str = None, + memory_mb: int = None, + cpus: int = None, + image: str = None, + env_vars=None, + extra_bootstrap=None, + metadata=None, + restart=None, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.machine = None + self.cluster = cluster + self.config = config + self.region = region + self.vm_size = vm_size + self.cpus = cpus + self.memory_mb = memory_mb + self.image = image + self.gpu_instance = False + self.bootstrap = True + self.extra_bootstrap = extra_bootstrap + self.env_vars = dict(env_vars or {}) # own copy: a key is assigned into it below and env_vars defaults to None + self.metadata = metadata 
+ self.restart = restart + self.app_name = self.cluster.app_name + self.env_vars["DASK_INTERNAL__INHERIT_CONFIG"] = dask.config.serialize( + dask.config.global_config + ) + self.api_token = self.cluster.api_token + self._client = None + if self.api_token is None: + raise ValueError("[fly.io] API token must be provided") + + async def create_vm(self): + machine_config = machines.FlyMachineConfig( + env=self.env_vars, + image=self.image, + metadata=self.metadata, + restart=self.restart, + services=[ + machines.FlyMachineConfigServices( + ports=[ + machines.FlyMachineRequestConfigServicesPort(port=8786), + ], + protocol="tcp", + internal_port=8786, + ), + machines.FlyMachineConfigServices( + ports=[ + machines.FlyMachineRequestConfigServicesPort( + port=80, handlers=["http"] + ), + machines.FlyMachineRequestConfigServicesPort( + port=443, handlers=["http", "tls"] + ), + machines.FlyMachineRequestConfigServicesPort( + port=8787, handlers=["http", "tls"] + ), + ], + protocol="tcp", + internal_port=8787, + ), + ], + guest=machines.FlyMachineConfigGuest( + cpu_kind="shared", + cpus=self.cpus, + memory_mb=self.memory_mb, + ), + metrics=None, + processes=[ + machines.FlyMachineConfigProcess( + name="app", + cmd=self.command, + env=self.env_vars, + ) + ], + ) + self.machine = await self._fly().create_machine( + app_name=self.cluster.app_name, # The name of the new Fly.io app. + config=machine_config, # A FlyMachineConfig object containing creation details. + name=self.name, # The name of the machine. + region=self.region, # The deployment region for the machine. 
+ ) + self.host = f"{self.machine.id}.vm.{self.cluster.app_name}.internal" + self.internal_ip = self.machine.private_ip + self.port = 8786 + self.address = ( + f"{self.cluster.protocol}://[{self.machine.private_ip}]:{self.port}" + ) + # self.external_address = f"{self.cluster.protocol}://{self.host}:{self.port}" + log_attributes = { + "name": self.name, + "machine": self.machine.id, + "internal_ip": self.internal_ip, + "address": self.address, + } + if self.external_address is not None: + log_attributes["external_address"] = self.external_address + logline = "[fly.io] Created machine " + " ".join( + [f"{k}={v}" for k, v in log_attributes.items()] + ) + self.cluster._log(logline) + return self.address, self.external_address + + async def destroy_vm(self): + if self.machine is None: + self.cluster._log( + "[fly.io] Not Terminating Machine: Machine does not exist" + ) + return + await self._fly().delete_machine( + app_name=self.cluster.app_name, + machine_id=self.machine.id, + force=True, + ) + self.cluster._log(f"[fly.io] Terminated machine {self.name}") + + async def wait_for_scheduler(self): + self.cluster._log(f"Waiting for scheduler to run at {self.address}") + while not asyncio.create_task(async_socket_open(self.internal_ip, self.port)): + await asyncio.sleep(1) + self.cluster._log("Scheduler is running") + return True + + async def wait_for_app(self): + self.cluster._log("[fly.io] Waiting for app to be created...") + while self.cluster.app_name is None or self.app is None: + await asyncio.sleep(1) + return True + + def _fly(self): + if self._client is None: + self._client = Fly(api_token=self.api_token) + return self._client + + +class FlyMachineScheduler(SchedulerMixin, FlyMachine): + """Scheduler running on a Fly.io Machine.""" + + def __init__( + self, + *args, + scheduler_options: dict = {}, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.name = f"dask-{self.cluster.uuid}-scheduler" + self.port = scheduler_options.get("port", 8786) + 
self.command = [ + "python", + "-m", + "distributed.cli.dask_scheduler", + "--host", + "fly-local-6pn", + ] + cli_keywords(scheduler_options, cls=_Scheduler) + + async def start(self): + self.cluster._log(f"Starting scheduler on {self.name}") + if self.cluster.app_name is None: + await self.create_app() + await self.start_scheduler() + self.status = Status.running + + async def start_scheduler(self): + self.cluster._log("Creating scheduler instance") + address, external_address = await self.create_vm() + await self.wait_for_scheduler() + self.cluster._log(f"Scheduler running at {address}") + self.cluster.scheduler_internal_address = address + self.cluster.scheduler_external_address = external_address + self.cluster.scheduler_port = self.port + + async def close(self): + await super().close() + if self.cluster.app_name is not None: + await self.delete_app() + + async def create_app(self): + """Create a Fly.io app.""" + if self.cluster.app_name is not None: + self.cluster._log("[fly.io] Not creating app as it already exists") + return + app_name = f"dask-{str(uuid.uuid4())[:8]}" + try: + self.cluster._log(f"[fly.io] Trying to create app {app_name}") + self.app = await self._fly().create_app(app_name=app_name) + self.cluster._log(f"[fly.io] Created app {app_name}") + self.cluster.app_name = app_name + except Exception as e: + self.cluster._log(f"[fly.io] Failed to create app {app_name}") + self.app = "failed" + raise e + + async def delete_app(self): + """Delete a Fly.io app.""" + if self.cluster.app_name is None: + self.cluster._log("[fly.io] Not deleting app as it does not exist") + return + await self._fly().delete_app(app_name=self.cluster.app_name) + self.cluster._log(f"[fly.io] Deleted app {self.cluster.app_name}") + + async def wait_for_app(self): + """Wait for the Fly.io app to be ready.""" + while self.app is None or self.cluster.app_name is None: + self.cluster._log("[fly.io] Waiting for app to be created") + await asyncio.sleep(1) + + +class 
FlyMachineWorker(WorkerMixin, FlyMachine): + """Worker running on a Fly.io Machine.""" + + def __init__( + self, + *args, + worker_module: str = None, + worker_class: str = None, + worker_options: dict = {}, + **kwargs, + ): + super().__init__(*args, **kwargs) + if worker_module is not None: + self.worker_module = worker_module + self.command = [ + "python", + "-m", + self.worker_module, + self.scheduler, + "--name", + str(self.name), + ] + cli_keywords(worker_options, cls=_Worker, cmd=self.worker_module) + if worker_class is not None: + self.worker_class = worker_class + self.command = [ + "python", + "-m", + "distributed.cli.dask_spec", + self.scheduler, + "--spec", + json.dumps( + { + "cls": self.worker_class, + "opts": { + **worker_options, + "name": self.name, + "host": "fly-local-6pn", + }, + } + ), + ] + + +class FlyMachineCluster(VMCluster): + """Cluster running on Fly.io Machines. + + VMs in Fly.io (FLY) are referred to as machines. This cluster manager constructs a Dask cluster + running on VMs. + + .. note:: + By default, the cluster will instantiate a new Fly.io app. The app will be deleted when + the cluster is closed. If you want to use an existing app, you can pass the app name to the + ``app_name`` parameter. + + When configuring your cluster you may find it useful to install the ``flyctl`` tool for querying the + Fly API for available options. + + https://fly.io/docs/hands-on/install-flyctl/ + + Parameters + ---------- + region: str + The FLY region to launch your cluster in. A full list can be obtained with ``flyctl platform regions``. + vm_size: str + The VM size slug. You can get a full list with ``flyctl platform sizes``. + The default is ``shared-cpu-1x`` which is 256MB RAM and 1 vCPU + image: str + The Docker image to run on all instances. + + This image must have a valid Python environment and have ``dask`` installed in order for the + ``dask-scheduler`` and ``dask-worker`` commands to be available. 
It is recommended the Python + environment matches your local environment where ``FlyMachineCluster`` is being created from. + + By default the ``ghcr.io/dask/dask:latest`` image will be used. + worker_module: str + The Dask worker module to start on worker VMs. + n_workers: int + Number of workers to initialise the cluster with. Defaults to ``0``. + worker_module: str + The Python module to run for the worker. Defaults to ``distributed.cli.dask_worker`` + worker_options: dict + Params to be passed to the worker class. + See :class:`distributed.worker.Worker` for default worker class. + If you set ``worker_module`` then refer to the docstring for the custom worker class. + scheduler_options: dict + Params to be passed to the scheduler class. + See :class:`distributed.scheduler.Scheduler`. + extra_bootstrap: list[str] (optional) + Extra commands to be run during the bootstrap phase. + env_vars: dict (optional) + Environment variables to be passed to the worker. + silence_logs: bool + Whether or not we should silence logging when setting up the cluster. + asynchronous: bool + If this is intended to be used directly within an event loop with + async/await + security : Security or bool, optional + Configures communication security in this cluster. Can be a security + object, or True. If True, temporary self-signed credentials will + be created automatically. Default is ``False``. + debug : bool, optional + More information will be printed when constructing clusters to enable debugging. + + Examples + -------- + + Create the cluster. 
+ + >>> from dask_cloudprovider.fly import FlyMachineCluster + >>> cluster = FlyMachineCluster(n_workers=1) + Starting scheduler on dask-e058d78e-scheduler + [fly.io] Trying to create app dask-122f0e5f + [fly.io] Created app dask-122f0e5f + Creating scheduler instance + [fly.io] Created machine name=dask-e058d78e-scheduler id=6e82d4e6a02d58 + Waiting for scheduler to run at 6e82d4e6a02d58.vm.dask-122f0e5f.internal:8786 + Scheduler is running + Scheduler running at tcp://[fdaa:1:53b:a7b:112:2bed:ccd1:2]:8786 + Creating worker instance + [fly.io] Created machine name=dask-e058d78e-worker-7b24cb61 id=32873e0a095985 + + Connect a client. + + >>> from dask.distributed import Client + >>> client = Client(cluster) + + Do some work. + + >>> import dask.array as da + >>> arr = da.random.random((1000, 1000), chunks=(100, 100)) + >>> arr.mean().compute() + 0.5001550986751964 + + Close the cluster + + >>> client.close() + >>> cluster.close() + [fly.io] Terminated machine dask-e058d78e-worker-7b24cb61 + [fly.io] Terminated machine dask-e058d78e-scheduler + [fly.io] Deleted app dask-122f0e5f + + You can also do this all in one go with context managers to ensure the cluster is + created and cleaned up. + + >>> with FlyMachineCluster(n_workers=1) as cluster: + ... with Client(cluster) as client: + ... 
print(da.random.random((1000, 1000), chunks=(100, 100)).mean().compute()) + Starting scheduler on dask-e058d78e-scheduler + [fly.io] Trying to create app dask-122f0e5f + [fly.io] Created app dask-122f0e5f + Creating scheduler instance + [fly.io] Created machine name=dask-e058d78e-scheduler id=6e82d4e6a02d58 + Waiting for scheduler to run at 6e82d4e6a02d58.vm.dask-122f0e5f.internal:8786 + Scheduler is running + Scheduler running at tcp://[fdaa:1:53b:a7b:112:2bed:ccd1:2]:8786 + Creating worker instance + [fly.io] Created machine name=dask-e058d78e-worker-7b24cb61 id=32873e0a095985 + 0.5000558682356162 + [fly.io] Terminated machine dask-e058d78e-worker-7b24cb61 + [fly.io] Terminated machine dask-e058d78e-scheduler + [fly.io] Deleted app dask-122f0e5f + + """ + + def __init__( + self, + region: str = None, + vm_size: str = None, + image: str = None, + token: str = None, + memory_mb: int = None, + cpus: int = None, + debug: bool = False, + app_name: str = None, + **kwargs, + ): + self.config = dask.config.get("cloudprovider.fly", {}) + self.scheduler_class = FlyMachineScheduler + self.worker_class = FlyMachineWorker + self.debug = debug + self.app_name = app_name + self._client = None + self.options = { + "cluster": self, + "config": self.config, + "region": region if region is not None else self.config.get("region"), + "vm_size": vm_size if vm_size is not None else self.config.get("vm_size"), + "image": image if image is not None else self.config.get("image"), + "token": token if token is not None else self.config.get("token"), + "memory_mb": memory_mb + if memory_mb is not None + else self.config.get("memory_mb"), + "cpus": cpus if cpus is not None else self.config.get("cpus"), + "app_name": self.app_name, + "protocol": self.config.get("protocol", "tcp"), + "security": self.config.get("security", False), + "host": "fly-local-6pn", + } + self.scheduler_options = {**self.options} + self.worker_options = {**self.options} + self.api_token = self.options["token"] + 
super().__init__(debug=debug, security=self.options["security"], **kwargs) diff --git a/dask_cloudprovider/fly/sdk/__init__.py b/dask_cloudprovider/fly/sdk/__init__.py new file mode 100644 index 00000000..bd7c8862 --- /dev/null +++ b/dask_cloudprovider/fly/sdk/__init__.py @@ -0,0 +1,8 @@ +"""A Python SDK for interacting with the Fly.io API.""" + +__version__ = "0.1" + +from . import constants +from . import exceptions +from . import fly +from . import models diff --git a/dask_cloudprovider/fly/sdk/constants.py b/dask_cloudprovider/fly/sdk/constants.py new file mode 100644 index 00000000..eced3377 --- /dev/null +++ b/dask_cloudprovider/fly/sdk/constants.py @@ -0,0 +1,56 @@ +# @see https://github.com/bwhli/fly-python-sdk/blob/main/fly_python_sdk/constants.py + +FLY_REGIONS = [ + "ams", # Amsterdam, Netherlands + "arn", # Stockholm, Sweden + "bog", # Bogotá, Colombia + "bos", # Boston, Massachusetts (US) + "cdg", # Paris, France + "den", # Denver, Colorado (US) + "dfw", # Dallas, Texas (US) + "ewr", # Secaucus, NJ (US) + "fra", # Frankfurt, Germany + "gdl", # Guadalajara, Mexico + "gig", # Rio de Janeiro, Brazil + "gru", # São Paulo + "hkg", # Hong Kong, Hong Kong + "iad", # Ashburn, Virginia (US) + "jnb", # Johannesburg, South Africa + "lax", # Los Angeles, California (US + "lhr", # London, United Kingdom + "maa", # Chennai (Madras), India + "mad", # Madrid, Spain + "mia", # Miami, Florida (US) + "nrt", # Tokyo, Japan + "ord", # Chicago, Illinois (US) + "otp", # Bucharest, Romania + "qro", # Querétaro, Mexico + "scl", # Santiago, Chile + "sea", # Seattle, Washington (US) + "sin", # Singapore, Singapore + "sjc", # San Jose, California (US) + "syd", # Sydney, Australia + "waw", # Warsaw, Poland + "yul", # Montreal, Canada + "yyz", # Toronto, Canada +] + +FLY_APP_VM_SIZES = [ + "shared-cpu-1x", + "dedicated-cpu-1x", + "dedicated-cpu-2x", + "dedicated-cpu-4x", + "dedicated-cpu-8x", +] + +FLY_MACHINE_VM_SIZES = [ + "shared-cpu-1x", + "shared-cpu-2x", + "shared-cpu-4x", + 
"shared-cpu-8x", + "performance-1x", + "performance-2x", + "performance-4x", + "performance-8x", + "performance-16x", +] diff --git a/dask_cloudprovider/fly/sdk/exceptions.py b/dask_cloudprovider/fly/sdk/exceptions.py new file mode 100644 index 00000000..a937f442 --- /dev/null +++ b/dask_cloudprovider/fly/sdk/exceptions.py @@ -0,0 +1,41 @@ +# @see https://github.com/bwhli/fly-python-sdk/blob/main/fly_python_sdk/exceptions.py + + +class AppInterfaceError(Exception): + def __init__(self, message): + self.message = message + + def __str__(self): + return self.message + + +class MissingApiHostnameError(Exception): + def __init__(self, message): + self.message = message + + def __str__(self): + return self.message + + +class MissingApiTokenError(Exception): + def __init__(self, message): + self.message = message + + def __str__(self): + return self.message + + +class MissingMachineIdsError(Exception): + def __init__(self, message): + self.message = message + + def __str__(self): + return self.message + + +class MachineInterfaceError(Exception): + def __init__(self, message): + self.message = message + + def __str__(self): + return self.message diff --git a/dask_cloudprovider/fly/sdk/fly.py b/dask_cloudprovider/fly/sdk/fly.py new file mode 100644 index 00000000..e1cd9df4 --- /dev/null +++ b/dask_cloudprovider/fly/sdk/fly.py @@ -0,0 +1,360 @@ +# @see https://github.com/bwhli/fly-python-sdk/blob/main/fly_python_sdk/fly.py + +import os +from typing import Union + +import httpx +from pydantic import BaseModel + +from .exceptions import ( + AppInterfaceError, + MachineInterfaceError, + MissingMachineIdsError, +) +from .models.apps import ( + FlyAppCreateRequest, + FlyAppCreateResponse, + FlyAppDetailsResponse, + FlyAppDeleteResponse, +) +from .models.machines import FlyMachineConfig, FlyMachineDetails + + +class Fly: + """ + A class for interacting with the Fly.io platform. 
+ """ + + def __init__(self, api_token: str) -> None: + self.api_token = api_token + self.api_version = 1 + + ######## + # Apps # + ######## + + async def create_app( + self, + app_name: str, + org_slug: str = "personal", + ) -> FlyAppCreateResponse: + """Creates a new app on Fly.io. + + Args: + app_name: The name of the new Fly.io app. + org_slug: The slug of the organization to create the app within. + If None, the personal organization will be used. + """ + path = "apps" + app_details = FlyAppCreateRequest(app_name=app_name, org_slug=org_slug) + r = await self._make_api_post_request(path, app_details.dict()) + + # Raise an exception if HTTP status code is not 201. + if r.status_code != 201: + error_msg = r.json().get("error", {}).get("message", "Unknown error!") + raise AppInterfaceError( + message=f"Unable to create {app_name} in {org_slug}! Error: {error_msg}" + ) + return FlyAppCreateResponse(app_name=app_name, org_slug=org_slug) + + async def get_app( + self, + app_name: str, + ) -> FlyAppDetailsResponse: + """Returns information about a Fly.io application. + + Args: + app_name: The name of the new Fly.io app. + """ + path = f"apps/{app_name}" + r = await self._make_api_get_request(path) + + # Raise an exception if HTTP status code is not 200. + if r.status_code != 200: + raise AppInterfaceError(message=f"Unable to get {app_name}!") + + return FlyAppDetailsResponse(**r.json()) + + async def delete_app( + self, + app_name: str, + force: bool = False, + ) -> None: + """Deletes a Fly.io application. + + Args: + app_name: The name of the new Fly.io app. + force: If True, the app will be deleted even if it has active machines. + """ + path = f"apps/{app_name}?force={str(force).lower()}" + r = await self._make_api_delete_request(path) + + # Raise an exception if HTTP status code is not 200. + if r.status_code != 202: + raise AppInterfaceError( + message=f"Unable to delete {app_name}! 
status_code={r.status_code}" + ) + + return FlyAppDeleteResponse( + status=r.status_code, + app_name=app_name, + ) + + ############ + # Machines # + ############ + + async def list_machines( + self, + app_name: str, + ids_only: bool = False, + ) -> Union[list[FlyMachineDetails], list[str]]: + """Returns a list of machines that belong to a Fly.io application. + + Args: + ids_only: If True, only machine IDs will be returned. Defaults to False. + """ + path = f"apps/{app_name}/machines" + r = await self._make_api_get_request(path) + + # Raise an exception if HTTP status code is not 200. + if r.status_code != 200: + raise AppInterfaceError(message=f"Unable to get machines in {app_name}!") + + # Create a FlyMachineDetails object for each machine. + machines = [FlyMachineDetails(**machine) for machine in r.json()] + + # Filter and return a list of ids if ids_only is True. + if ids_only is True: + return [machine.id for machine in machines] + + return machines + + async def create_machine( + self, + app_name: str, + config: FlyMachineConfig, + name: str = None, + region: str = None, + ): + """Creates a Fly.io machine. + + Args: + app_name: The name of the new Fly.io app. + config: A FlyMachineConfig object containing creation details. + name: The name of the machine. + region: The deployment region for the machine. + """ + path = f"apps/{app_name}/machines" + + # Create Pydantic model for machine creation requests. + class _FlyMachineCreateRequest(BaseModel): + name: Union[str, None] = None + region: Union[str, None] = None + config: FlyMachineConfig + + # Create FlyMachineCreateRequest object + machine_create_request = _FlyMachineCreateRequest( + name=name, + region=region, + config=config, + ) + + r = await self._make_api_post_request( + path, + payload=machine_create_request.dict(exclude_defaults=True), + ) + + # Raise an exception if HTTP status code is not 200. 
+ if r.status_code != 200: + raise MachineInterfaceError( + message=f"{r.status_code}: Unable to create machine!" + ) + + return FlyMachineDetails(**r.json()) + + async def delete_machine( + self, + app_name: str, + machine_id: str, + force: bool = False, + ) -> None: + """Deletes a Fly.io machine. + + Args: + app_name: The name of the new Fly.io app. + machine_id: The id string for a Fly.io machine. + force: If True, the machine will be deleted even if it is running. + """ + path = f"apps/{app_name}/machines/{machine_id}?force={str(force).lower()}" + r = await self._make_api_delete_request(path) + + # Raise an exception if HTTP status code is not 200. + if r.status_code != 200: + raise MachineInterfaceError( + message=f"Unable to delete {machine_id} in {app_name}!" + ) + + return + + async def delete_machines( + self, + app_name: str, + machine_ids: list[str] = [], + delete_all: bool = False, + force: bool = False, + ) -> None: + """Stops and then deletes multiple Fly.io machines, one request at a time. + + Args: + app_name: The name of the Fly.io app that owns the machines. + machine_ids: An array of machine IDs to delete. + delete_all: Delete all machines in the app if True. + force: Delete even if running + """ + # If delete_all is True, override provided machine_ids with the app's current machine IDs. + if delete_all is True: + machine_ids = await self.list_machines(app_name, ids_only=True) + + # Raise an exception if there are no machine IDs to delete. + if len(machine_ids) == 0: + raise MissingMachineIdsError( + "Please provide at least one machine ID to delete." + ) + + # Stop machines (each coroutine must be awaited or the request is never sent). + for machine_id in machine_ids: + await self.stop_machine(app_name, machine_id) + + # Delete machines. + for machine_id in machine_ids: + await self.delete_machine(app_name, machine_id, force=force) + + return + + async def get_machine( + self, + app_name: str, + machine_id: str, + ) -> FlyMachineDetails: + """Returns information about a Fly.io machine. + + Args: + app_name: The name of the new Fly.io app. + machine_id: The id string for a Fly.io machine. 
+ """ + path = f"apps/{app_name}/machines/{machine_id}" + r = await self._make_api_get_request(path) + + # Raise an exception if HTTP status code is not 200. + if r.status_code != 200: + raise MachineInterfaceError( + message=f"Unable to delete {machine_id} in {app_name}!" + ) + + return FlyMachineDetails(**r.json()) + + async def start_machine( + self, + app_name: str, + machine_id: str, + ) -> None: + """Starts a Fly.io machine. + + Args: + app_name: The name of the new Fly.io app. + machine_id: The id string for a Fly.io machine. + """ + path = f"apps/{app_name}/machines/{machine_id}/start" + r = await self._make_api_post_request(path) + + # Raise an exception if HTTP status code is not 200. + if r.status_code != 200: + raise MachineInterfaceError( + message=f"Unable to start {machine_id} in {app_name}!" + ) + + return + + async def stop_machine( + self, + app_name: str, + machine_id: str, + ) -> None: + """Stop a Fly.io machine. + + Args: + app_name: The name of the new Fly.io app. + machine_id: The id string for a Fly.io machine. + """ + path = f"apps/{app_name}/machines/{machine_id}/stop" + r = await self._make_api_post_request(path) + + # Raise an exception if HTTP status code is not 200. + if r.status_code != 200: + raise MachineInterfaceError( + message=f"Unable to stop {machine_id} in {app_name}!" 
+ ) + + return + + ############# + # Utilities # + ############# + + async def _make_api_delete_request( + self, + path: str, + ) -> httpx.Response: + """An internal function for making DELETE requests to the Fly.io API. Raises httpx.HTTPStatusError on 4xx/5xx.""" + api_hostname = self._get_api_hostname() + url = f"{api_hostname}/v{self.api_version}/{path}" + async with httpx.AsyncClient(timeout=None) as client: # TLS verification stays at httpx's default (on): the bearer token travels on this connection + r = await client.delete(url, headers=self._generate_headers()) + r.raise_for_status() + return r + + async def _make_api_get_request( + self, + path: str, + ) -> httpx.Response: + """An internal function for making GET requests to the Fly.io API. Raises httpx.HTTPStatusError on 4xx/5xx.""" + api_hostname = self._get_api_hostname() + url = f"{api_hostname}/v{self.api_version}/{path}" + async with httpx.AsyncClient(timeout=None) as client: # TLS verification stays at httpx's default (on): the bearer token travels on this connection + r = await client.get(url, headers=self._generate_headers()) + r.raise_for_status() + return r + + async def _make_api_post_request( + self, + path: str, + payload: dict = {}, + ) -> httpx.Response: + """An internal function for making POST requests to the Fly.io API. Raises httpx.HTTPStatusError on 4xx/5xx.""" + api_hostname = self._get_api_hostname() + url = f"{api_hostname}/v{self.api_version}/{path}" + headers = self._generate_headers() + async with httpx.AsyncClient(timeout=None) as client: # TLS verification stays at httpx's default (on): the bearer token travels on this connection + r = await client.post(url, headers=headers, json=payload) + r.raise_for_status() + return r + + def _generate_headers(self) -> dict: + """Returns a dictionary containing headers for requests to the Fly.io API.""" + headers = { + "Authorization": f"Bearer {self.api_token}", + "Content-Type": "application/json", + } + return headers + + def _get_api_hostname(self) -> str: + """Returns the hostname that will be used to connect to the Fly.io API. + + Returns: + The hostname that will be used to connect to the Fly.io API. + If the FLY_API_HOSTNAME environment variable is not set, + the hostname returned will default to https://api.machines.dev. 
+ """ + api_hostname = os.getenv("FLY_API_HOSTNAME", "https://api.machines.dev") + return api_hostname diff --git a/dask_cloudprovider/fly/sdk/models/__init__.py b/dask_cloudprovider/fly/sdk/models/__init__.py new file mode 100644 index 00000000..19087351 --- /dev/null +++ b/dask_cloudprovider/fly/sdk/models/__init__.py @@ -0,0 +1,4 @@ +from . import ( + apps, + machines, +) diff --git a/dask_cloudprovider/fly/sdk/models/apps/__init__.py b/dask_cloudprovider/fly/sdk/models/apps/__init__.py new file mode 100644 index 00000000..034584a0 --- /dev/null +++ b/dask_cloudprovider/fly/sdk/models/apps/__init__.py @@ -0,0 +1,28 @@ +from pydantic import BaseModel +from typing import Union + + +class FlyAppCreateRequest(BaseModel): + app_name: str + org_slug: Union[str, None] = None + + +class FlyAppCreateResponse(BaseModel): + app_name: str + org_slug: Union[str, None] = None + + +class FlyAppDetailsResponse(BaseModel): + name: str + status: str + organization: dict + + +class FlyAppDeleteRequest(BaseModel): + app_name: str + org_slug: Union[str, None] = None + + +class FlyAppDeleteResponse(BaseModel): + app_name: str + status: int diff --git a/dask_cloudprovider/fly/sdk/models/machines/__init__.py b/dask_cloudprovider/fly/sdk/models/machines/__init__.py new file mode 100644 index 00000000..18e2cdc9 --- /dev/null +++ b/dask_cloudprovider/fly/sdk/models/machines/__init__.py @@ -0,0 +1,149 @@ +# @see https://github.com/bwhli/fly-python-sdk/blob/main/fly_python_sdk/models/machines/__init__.py + +import logging +from datetime import datetime +from ipaddress import IPv6Address +from typing import Union + +from pydantic import BaseModel, validator + +# FlyMachineConfig.checks + + +class FlyMachineConfigCheck(BaseModel): + """Model for FlyMachineConfig.checks""" + + port: int + type: str + interval: str + timeout: str + method: str + path: str + + +# FlyMachineConfig.guest + + +class FlyMachineConfigGuest(BaseModel): + """Model for FlyMachineConfig.guest""" + + cpu_kind: str + cpus: 
class FlyMachineRequestConfigServicesPort(BaseModel):
    """Model for FlyMachineConfig.services.port"""

    port: int
    handlers: list[str] = []

    @validator("port")
    def validate_port(cls, port: int) -> int:
        """Ensure ``port`` is a valid port number (0-65535 inclusive).

        Raises:
            ValueError: if ``port`` is outside that range. pydantic surfaces
                it to callers as a ``ValidationError``.
        """
        # Explicit raise rather than ``assert``: asserts are stripped under
        # ``python -O``, which would silently disable the check.
        if not 0 <= port <= 65535:
            raise ValueError(f"port must be between 0 and 65535, got {port}")
        return port

    @validator("handlers")
    def validate_handlers(cls, handlers: list[str]) -> list[str]:
        """Lower-case the handler names and reject unsupported ones.

        Returns:
            The handlers converted with ``str.casefold``. An empty list is
            accepted as-is.

        Raises:
            ValueError: if any handler is not one of ``http``, ``tcp``,
                ``tls`` or ``udp``.
        """
        logging.debug(handlers)
        normalized = [handler.casefold() for handler in handlers]
        unsupported = [
            handler
            for handler in normalized
            if handler not in ("http", "tcp", "tls", "udp")
        ]
        if unsupported:
            raise ValueError(f"unsupported handlers: {unsupported}")
        return normalized


# FlyMachineConfig.services


class FlyMachineConfigServices(BaseModel):
    """Model for FlyMachineConfig.services"""

    ports: list[FlyMachineRequestConfigServicesPort] = []
    protocol: str
    internal_port: int

    @validator("internal_port")
    def validate_internal_port(cls, internal_port: int) -> int:
        """Ensure ``internal_port`` is a valid port number (0-65535 inclusive).

        Raises:
            ValueError: if ``internal_port`` is outside that range.
        """
        # 65535 is the largest 16-bit port number; 65536 is not a valid port.
        if not 0 <= internal_port <= 65535:
            raise ValueError(
                f"internal_port must be between 0 and 65535, got {internal_port}"
            )
        return internal_port

    @validator("protocol")
    def validate_protocol(cls, protocol: str) -> str:
        """Ensure ``protocol`` is one of ``http``, ``tcp`` or ``udp``.

        The comparison is case-sensitive and the value is returned unchanged.

        Raises:
            ValueError: if ``protocol`` is anything else.
        """
        if protocol not in ("http", "tcp", "udp"):
            raise ValueError(f"unsupported protocol: {protocol!r}")
        return protocol
@pytest.fixture
async def cluster(config):
    """Yield a ``FlyMachineCluster`` opened in asynchronous mode.

    The requesting test is skipped when no Fly.io API token is configured.
    The cluster is closed by the ``async with`` block once the test finishes.

    No ``pytest.mark`` is applied here: marks on fixture functions have no
    effect and pytest deprecates them. The tests that request this fixture
    carry ``@pytest.mark.external`` themselves.
    """
    await skip_without_credentials(config)
    # Distinct local name so the fixture function is not shadowed.
    async with FlyMachineCluster(asynchronous=True) as fly_cluster:
        yield fly_cluster
async def async_socket_open(address, port):
    """Return ``True`` if a TCP connection to ``address:port`` can be opened.

    Parameters
    ----------
    address
        Host name or IP address to probe. It is converted with ``str()``, so
        ``ipaddress`` objects are accepted as well as strings.
    port
        Port number to probe. It is converted with ``int()``, so numeric
        strings are accepted.

    Returns
    -------
    bool
        ``True`` when the connection was established, ``False`` when it could
        not be (refused, unreachable, unresolvable host, invalid port, ...).
    """
    try:
        # ``loop.sock_connect`` expects ``(sock, address)`` and returns
        # ``None``, so it cannot be called with a host and port. The streams
        # API creates the socket and handles name resolution for us.
        _reader, writer = await asyncio.open_connection(str(address), int(port))
    except Exception:
        # Best-effort probe, like ``is_socket_open``: any failure to connect
        # means "not open". Cancellation is a ``BaseException`` and still
        # propagates.
        return False

    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        # The peer may reset the connection while we close it. The port was
        # still reachable, which is all this probe reports.
        pass
    return True
autoclass:: FlyMachineCluster + :members: diff --git a/doc/source/index.rst b/doc/source/index.rst index a87aaf94..e414623e 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -97,6 +97,7 @@ It can be used on any cluster that has workers running on Azure VMs, not just on aws.rst digitalocean.rst + fly.rst gcp.rst azure.rst hetzner.rst diff --git a/doc/source/installation.rst b/doc/source/installation.rst index c28a2be0..cff6e014 100644 --- a/doc/source/installation.rst +++ b/doc/source/installation.rst @@ -16,6 +16,7 @@ You can also restrict your install to just a specific cloud provider by giving t $ pip install dask-cloudprovider[azure]  # or $ pip install dask-cloudprovider[azureml]  # or $ pip install dask-cloudprovider[digitalocean]  # or + $ pip install dask-cloudprovider[fly]  # or $ pip install dask-cloudprovider[gcp]  # or $ pip install dask-cloudprovider[ibm]  # or $ pip install dask-cloudprovider[openstack] @@ -25,4 +26,4 @@ Conda .. code-block:: console - $ conda install -c conda-forge dask-cloudprovider \ No newline at end of file + $ conda install -c conda-forge dask-cloudprovider diff --git a/setup.py b/setup.py index c00dbcdb..37e2853e 100644 --- a/setup.py +++ b/setup.py @@ -15,6 +15,7 @@ "digitalocean": ["python-digitalocean>=1.15.0"], "gcp": ["google-api-python-client>=1.12.5", "google-auth>=1.23.0"], "hetzner": ["hcloud>=1.10.0"], + "fly": ["httpx>=0.24.0", "pydantic>=1.10.7"], "ibm": ["ibm_code_engine_sdk>=3.1.0"], "openstack": ["openstacksdk>=3.3.0"], }