From bd18fd2a0a816cd98ecf2767f9db85ca28e71d04 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Mon, 8 Apr 2024 19:08:59 +0545 Subject: [PATCH 1/5] Add spot provision in runpod --- src/dstack/_internal/cli/utils/run.py | 2 +- .../core/backends/runpod/api_client.py | 130 ++++++++++++++++++ 2 files changed, 131 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/cli/utils/run.py b/src/dstack/_internal/cli/utils/run.py index ae1f321e8..0c2113312 100644 --- a/src/dstack/_internal/cli/utils/run.py +++ b/src/dstack/_internal/cli/utils/run.py @@ -69,7 +69,7 @@ def th(s: str) -> str: offers.add_column("PRICE") offers.add_column() - job_plan.offers = job_plan.offers[:offers_limit] + # job_plan.offers = job_plan.offers[:offers_limit] for i, offer in enumerate(job_plan.offers, start=1): r = offer.instance.resources diff --git a/src/dstack/_internal/core/backends/runpod/api_client.py b/src/dstack/_internal/core/backends/runpod/api_client.py index d24309629..14fbeafdc 100644 --- a/src/dstack/_internal/core/backends/runpod/api_client.py +++ b/src/dstack/_internal/core/backends/runpod/api_client.py @@ -48,6 +48,7 @@ def create_pod( template_id: Optional[str] = None, network_volume_id: Optional[str] = None, allowed_cuda_versions: Optional[list] = None, + bid_per_gpu: Optional[float] = None, ) -> Dict: resp = self._make_request( { @@ -73,6 +74,30 @@ def create_pod( network_volume_id, allowed_cuda_versions, ) + if bid_per_gpu is None + else generate_pod_rent_interruptable_mutation( + bid_per_gpu, + name, + image_name, + gpu_type_id, + cloud_type, + support_public_ip, + start_ssh, + data_center_id, + country_code, + gpu_count, + volume_in_gb, + container_disk_in_gb, + min_vcpu_count, + min_memory_in_gb, + docker_args, + ports, + volume_mount_path, + env, + template_id, + network_volume_id, + allowed_cuda_versions, + ) } ) data = resp.json() @@ -284,6 +309,111 @@ def generate_pod_deployment_mutation( """ +def generate_pod_rent_interruptable_mutation( + bid_per_gpu: float, + name: str, + image_name: str, + gpu_type_id: str, + cloud_type: str = "ALL", + support_public_ip: bool = True, + start_ssh: bool = True, + data_center_id=None, + country_code=None, + gpu_count=None, + volume_in_gb=None, + container_disk_in_gb=None, + min_vcpu_count=None, + min_memory_in_gb=None, + docker_args=None, + ports=None, + volume_mount_path=None, + env: dict = None, + template_id=None, + network_volume_id=None, + allowed_cuda_versions: Optional[List[str]] = None, +) -> str: + """ + Generates a mutation to deploy a pod on demand. + """ + input_fields = [] + + # ------------------------------ Required Fields ----------------------------- # + input_fields.append(f"bidPerGpu: {bid_per_gpu}") + input_fields.append(f'name: "{name}"') + input_fields.append(f'imageName: "{image_name}"') + input_fields.append(f'gpuTypeId: "{gpu_type_id}"') + + # ------------------------------ Default Fields ------------------------------ # + input_fields.append(f"cloudType: {cloud_type}") + + if start_ssh: + input_fields.append("startSsh: true") + + if support_public_ip: + input_fields.append("supportPublicIp: true") + else: + input_fields.append("supportPublicIp: false") + + # ------------------------------ Optional Fields ----------------------------- # + if data_center_id is not None: + input_fields.append(f'dataCenterId: "{data_center_id}"') + if country_code is not None: + input_fields.append(f'countryCode: "{country_code}"') + if gpu_count is not None: + input_fields.append(f"gpuCount: {gpu_count}") + if volume_in_gb is not None: + input_fields.append(f"volumeInGb: {volume_in_gb}") + if container_disk_in_gb is not None: + input_fields.append(f"containerDiskInGb: {container_disk_in_gb}") + if min_vcpu_count is not None: + input_fields.append(f"minVcpuCount: {min_vcpu_count}") + if min_memory_in_gb is not None: + input_fields.append(f"minMemoryInGb: {min_memory_in_gb}") + if docker_args is not None: + input_fields.append(f'dockerArgs: "{docker_args}"') + if ports is not None: + ports = ports.replace(" ", "") + input_fields.append(f'ports: "{ports}"') + if volume_mount_path is not None: + input_fields.append(f'volumeMountPath: "{volume_mount_path}"') + if env is not None: + env_string = ", ".join( + [f'{{ key: "{key}", value: "{value}" }}' for key, value in env.items()] + ) + input_fields.append(f"env: [{env_string}]") + if template_id is not None: + input_fields.append(f'templateId: "{template_id}"') + + if network_volume_id is not None: + input_fields.append(f'networkVolumeId: "{network_volume_id}"') + + if allowed_cuda_versions is not None: + allowed_cuda_versions_string = ", ".join( + [f'"{version}"' for version in allowed_cuda_versions] + ) + input_fields.append(f"allowedCudaVersions: [{allowed_cuda_versions_string}]") + + # Format input fields + input_string = ", ".join(input_fields) + print(f"{input_string}") + return f""" + mutation {{ + podRentInterruptable( + input: {{ + {input_string} + }} + ) {{ + id + lastStatusChange + imageName + machine {{ + podHostId + }} + }} + }} + """ + + def generate_pod_terminate_mutation(pod_id: str) -> str: """ Generates a mutation to terminate a pod. From 3ab6f2b12d747877d10ac1922bf076c29787a13c Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Thu, 11 Apr 2024 20:33:53 +0545 Subject: [PATCH 2/5] Add Spot support in Runpod --- src/dstack/_internal/cli/utils/run.py | 2 +- src/dstack/_internal/core/backends/runpod/api_client.py | 5 ++--- src/dstack/_internal/core/backends/runpod/compute.py | 1 + 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/dstack/_internal/cli/utils/run.py b/src/dstack/_internal/cli/utils/run.py index 0c2113312..ae1f321e8 100644 --- a/src/dstack/_internal/cli/utils/run.py +++ b/src/dstack/_internal/cli/utils/run.py @@ -69,7 +69,7 @@ def th(s: str) -> str: offers.add_column("PRICE") offers.add_column() - # job_plan.offers = job_plan.offers[:offers_limit] + job_plan.offers = job_plan.offers[:offers_limit] for i, offer in enumerate(job_plan.offers, start=1): r = offer.instance.resources diff --git a/src/dstack/_internal/core/backends/runpod/api_client.py b/src/dstack/_internal/core/backends/runpod/api_client.py index 14fbeafdc..bd1c782bf 100644 --- a/src/dstack/_internal/core/backends/runpod/api_client.py +++ b/src/dstack/_internal/core/backends/runpod/api_client.py @@ -100,8 +100,8 @@ def create_pod( ) } ) - data = resp.json() - return data["data"]["podFindAndDeployOnDemand"] + data = resp.json()["data"] + return data["podRentInterruptable"] if bid_per_gpu else data["podFindAndDeployOnDemand"] def get_pod(self, pod_id: str) -> Dict: resp = self._make_request({"query": generate_pod_query(pod_id)}) @@ -395,7 +395,6 @@ def generate_pod_rent_interruptable_mutation( # Format input fields input_string = ", ".join(input_fields) - print(f"{input_string}") return f""" mutation {{ podRentInterruptable( diff --git a/src/dstack/_internal/core/backends/runpod/compute.py b/src/dstack/_internal/core/backends/runpod/compute.py index 56143d8c9..047f8e5c2 100644 --- a/src/dstack/_internal/core/backends/runpod/compute.py +++ b/src/dstack/_internal/core/backends/runpod/compute.py @@ -77,6 +77,7 @@ def run_job( support_public_ip=True, docker_args=get_docker_args(authorized_keys), ports="10022/tcp", + bid_per_gpu=instance_offer.price if instance_offer.instance.resources.spot else None, ) instance_id = resp["id"] return JobProvisioningData( From 3ffb6b44b4eda96314458435a3473fd6d0e21129 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Sun, 28 Apr 2024 12:14:31 +0545 Subject: [PATCH 3/5] Fix Cudo Create VM response error --- src/dstack/_internal/core/backends/cudo/compute.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dstack/_internal/core/backends/cudo/compute.py b/src/dstack/_internal/core/backends/cudo/compute.py index d6064b6e3..1b6df307b 100644 --- a/src/dstack/_internal/core/backends/cudo/compute.py +++ b/src/dstack/_internal/core/backends/cudo/compute.py @@ -121,7 +121,7 @@ def create_instance( instance_id=resp_data["id"], hostname=None, internal_ip=None, - region=resp_data["vm"]["regionId"], + region=instance_offer.region, price=instance_offer.price, ssh_port=22, username="root", From 9ebedb50622d1508764dfcfcc29656893aa514f3 Mon Sep 17 00:00:00 2001 From: Sergey Mezentsev Date: Thu, 2 May 2024 18:58:32 +0300 Subject: [PATCH 4/5] Improve type hints in runpod --- .../_internal/core/backends/runpod/api_client.py | 10 +++++----- src/dstack/_internal/core/backends/runpod/compute.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/dstack/_internal/core/backends/runpod/api_client.py b/src/dstack/_internal/core/backends/runpod/api_client.py index bd1c782bf..fb8fb0145 100644 --- a/src/dstack/_internal/core/backends/runpod/api_client.py +++ b/src/dstack/_internal/core/backends/runpod/api_client.py @@ -44,10 +44,10 @@ def create_pod( docker_args: str = "", ports: Optional[str] = None, volume_mount_path: str = "/runpod-volume", - env: Optional[dict] = None, + env: Optional[Dict[str, Any]] = None, template_id: Optional[str] = None, network_volume_id: Optional[str] = None, - allowed_cuda_versions: Optional[list] = None, + allowed_cuda_versions: Optional[List[str]] = None, bid_per_gpu: Optional[float] = None, ) -> Dict: resp = self._make_request( @@ -159,7 +159,7 @@ def wait_for_instance(self, instance_id) -> Optional[Dict]: """ -def generate_pod_query(pod_id) -> str: +def generate_pod_query(pod_id: str) -> str: """ Generate a query for a specific GPU type """ @@ -221,7 +221,7 @@ def generate_pod_deployment_mutation( docker_args=None, ports=None, volume_mount_path=None, - env: dict = None, + env: Optional[Dict[str, Any]] = None, template_id=None, network_volume_id=None, allowed_cuda_versions: Optional[List[str]] = None, @@ -327,7 +327,7 @@ def generate_pod_rent_interruptable_mutation( docker_args=None, ports=None, volume_mount_path=None, - env: dict = None, + env: Optional[Dict[str, Any]] = None, template_id=None, network_volume_id=None, allowed_cuda_versions: Optional[List[str]] = None, diff --git a/src/dstack/_internal/core/backends/runpod/compute.py b/src/dstack/_internal/core/backends/runpod/compute.py index 047f8e5c2..20a0383d8 100644 --- a/src/dstack/_internal/core/backends/runpod/compute.py +++ b/src/dstack/_internal/core/backends/runpod/compute.py @@ -123,7 +123,7 @@ def update_provisioning_data( provisioning_data.ssh_port = port["publicPort"] -def get_docker_args(authorized_keys): +def get_docker_args(authorized_keys: List[str]) -> str: commands = get_docker_commands(authorized_keys, False) command = " && ".join(commands) command_escaped = command.replace('"', '\\"') From 61a0c3de7577fda99f3af9352258714d196828d9 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Fri, 3 May 2024 16:04:10 +0545 Subject: [PATCH 5/5] Combine on-demand and spot mutations --- .../core/backends/runpod/api_client.py | 138 +----------------- 1 file changed, 6 insertions(+), 132 deletions(-) diff --git a/src/dstack/_internal/core/backends/runpod/api_client.py b/src/dstack/_internal/core/backends/runpod/api_client.py index fb8fb0145..5bb0b325a 100644 --- a/src/dstack/_internal/core/backends/runpod/api_client.py +++ b/src/dstack/_internal/core/backends/runpod/api_client.py @@ -73,30 +73,7 @@ def create_pod( template_id, network_volume_id, allowed_cuda_versions, - ) - if bid_per_gpu is None - else generate_pod_rent_interruptable_mutation( bid_per_gpu, - name, - image_name, - gpu_type_id, - cloud_type, - support_public_ip, - start_ssh, - data_center_id, - country_code, - gpu_count, - volume_in_gb, - container_disk_in_gb, - min_vcpu_count, - min_memory_in_gb, - docker_args, - ports, - volume_mount_path, - env, - template_id, - network_volume_id, - allowed_cuda_versions, ) } ) @@ -225,120 +202,14 @@ def generate_pod_deployment_mutation( template_id=None, network_volume_id=None, allowed_cuda_versions: Optional[List[str]] = None, + bid_per_gpu: Optional[float] = None, ) -> str: """ - Generates a mutation to deploy a pod on demand. - """ - input_fields = [] - - # ------------------------------ Required Fields ----------------------------- # - input_fields.append(f'name: "{name}"') - input_fields.append(f'imageName: "{image_name}"') - input_fields.append(f'gpuTypeId: "{gpu_type_id}"') - - # ------------------------------ Default Fields ------------------------------ # - input_fields.append(f"cloudType: {cloud_type}") - - if start_ssh: - input_fields.append("startSsh: true") - - if support_public_ip: - input_fields.append("supportPublicIp: true") - else: - input_fields.append("supportPublicIp: false") - - # ------------------------------ Optional Fields ----------------------------- # - if data_center_id is not None: - input_fields.append(f'dataCenterId: "{data_center_id}"') - if country_code is not None: - input_fields.append(f'countryCode: "{country_code}"') - if gpu_count is not None: - input_fields.append(f"gpuCount: {gpu_count}") - if volume_in_gb is not None: - input_fields.append(f"volumeInGb: {volume_in_gb}") - if container_disk_in_gb is not None: - input_fields.append(f"containerDiskInGb: {container_disk_in_gb}") - if min_vcpu_count is not None: - input_fields.append(f"minVcpuCount: {min_vcpu_count}") - if min_memory_in_gb is not None: - input_fields.append(f"minMemoryInGb: {min_memory_in_gb}") - if docker_args is not None: - input_fields.append(f'dockerArgs: "{docker_args}"') - if ports is not None: - ports = ports.replace(" ", "") - input_fields.append(f'ports: "{ports}"') - if volume_mount_path is not None: - input_fields.append(f'volumeMountPath: "{volume_mount_path}"') - if env is not None: - env_string = ", ".join( - [f'{{ key: "{key}", value: "{value}" }}' for key, value in env.items()] - ) - input_fields.append(f"env: [{env_string}]") - if template_id is not None: - input_fields.append(f'templateId: "{template_id}"') - - if network_volume_id is not None: - input_fields.append(f'networkVolumeId: "{network_volume_id}"') - - if allowed_cuda_versions is not None: - allowed_cuda_versions_string = ", ".join( - [f'"{version}"' for version in allowed_cuda_versions] - ) - input_fields.append(f"allowedCudaVersions: [{allowed_cuda_versions_string}]") - - # Format input fields - input_string = ", ".join(input_fields) - - return f""" - mutation {{ - podFindAndDeployOnDemand( - input: {{ - {input_string} - }} - ) {{ - id - desiredStatus - imageName - env - machineId - machine {{ - podHostId - }} - }} - }} - """ - - -def generate_pod_rent_interruptable_mutation( - bid_per_gpu: float, - name: str, - image_name: str, - gpu_type_id: str, - cloud_type: str = "ALL", - support_public_ip: bool = True, - start_ssh: bool = True, - data_center_id=None, - country_code=None, - gpu_count=None, - volume_in_gb=None, - container_disk_in_gb=None, - min_vcpu_count=None, - min_memory_in_gb=None, - docker_args=None, - ports=None, - volume_mount_path=None, - env: Optional[Dict[str, Any]] = None, - template_id=None, - network_volume_id=None, - allowed_cuda_versions: Optional[List[str]] = None, -) -> str: - """ - Generates a mutation to deploy a pod on demand. + Generates a mutation to deploy pod. """ input_fields = [] # ------------------------------ Required Fields ----------------------------- # - input_fields.append(f"bidPerGpu: {bid_per_gpu}") input_fields.append(f'name: "{name}"') input_fields.append(f'imageName: "{image_name}"') input_fields.append(f'gpuTypeId: "{gpu_type_id}"') @@ -355,6 +226,8 @@ def generate_pod_rent_interruptable_mutation( input_fields.append("supportPublicIp: false") # ------------------------------ Optional Fields ----------------------------- # + if bid_per_gpu is not None: + input_fields.append(f"bidPerGpu: {bid_per_gpu}") if data_center_id is not None: input_fields.append(f'dataCenterId: "{data_center_id}"') if country_code is not None: @@ -393,11 +266,12 @@ def generate_pod_rent_interruptable_mutation( ) input_fields.append(f"allowedCudaVersions: [{allowed_cuda_versions_string}]") + pod_deploy = "podFindAndDeployOnDemand" if bid_per_gpu is None else "podRentInterruptable" # Format input fields input_string = ", ".join(input_fields) return f""" mutation {{ - podRentInterruptable( + {pod_deploy}( input: {{ {input_string} }}