Skip to content

Commit

Permalink
feat: create gpu job sample (#11893)
Browse files Browse the repository at this point in the history
* feat: create gpu job sample

* Correct param dependency

* fix: Changed approach to add accelerators

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
BigBlackWolf and gcf-owl-bot[bot] authored Jun 27, 2024
1 parent a257ea3 commit 9a23fe1
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 8 deletions.
100 changes: 100 additions & 0 deletions batch/create/create_gpu_with_script_no_mounting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START batch_create_gpu_job]
from google.cloud import batch_v1


def create_gpu_job(project_id: str, region: str, job_name: str) -> batch_v1.Job:
    """
    This method shows how to create a sample Batch Job that will run
    a simple command on Compute Engine instances on GPU machines.

    Args:
        project_id: project ID or project number of the Cloud project you want to use.
        region: name of the region you want to use to run the job. Regions that are
            available for Batch are listed on: https://cloud.google.com/batch/docs/get-started#locations
        job_name: the name of the job that will be created.
            It needs to be unique for each project and region pair.

    Returns:
        A job object representing the job created.
    """
    client = batch_v1.BatchServiceClient()

    # Define what will be done as part of the job.
    task = batch_v1.TaskSpec()
    runnable = batch_v1.Runnable()
    runnable.script = batch_v1.Runnable.Script()
    runnable.script.text = "echo Hello world! This is task ${BATCH_TASK_INDEX}. This job has a total of ${BATCH_TASK_COUNT} tasks."
    # You can also run a script from a file. Just remember, that needs to be a script that's
    # already on the VM that will be running the job. Using runnable.script.text and runnable.script.path is mutually
    # exclusive.
    # runnable.script.path = '/tmp/test.sh'
    task.runnables = [runnable]

    # We can specify what resources are requested by each task.
    resources = batch_v1.ComputeResource()
    resources.cpu_milli = 2000  # in milliseconds per cpu-second. This means the task requires 2 whole CPUs.
    # NOTE(review): 16 MiB is extremely low for a GPU workload; this mirrors the
    # other samples in this directory, but confirm it is intentional.
    resources.memory_mib = 16  # in MiB
    task.compute_resource = resources

    task.max_retry_count = 2
    task.max_run_duration = "3600s"

    # Tasks are grouped inside a job using TaskGroups.
    # Currently, it's possible to have only one task group.
    group = batch_v1.TaskGroup()
    group.task_count = 4
    group.task_spec = task

    # Policies are used to define on what kind of virtual machines the tasks will run on.
    # In this case, we tell the system to use "g2-standard-4" machine type.
    # Read more about machine types here: https://cloud.google.com/compute/docs/machine-types
    policy = batch_v1.AllocationPolicy.InstancePolicy()
    policy.machine_type = "g2-standard-4"

    instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
    instances.policy = policy
    # Ask Batch to install the GPU drivers on the VMs for us.
    instances.install_gpu_drivers = True
    allocation_policy = batch_v1.AllocationPolicy()
    allocation_policy.instances = [instances]

    job = batch_v1.Job()
    job.task_groups = [group]
    job.allocation_policy = allocation_policy
    # Fix: this job runs a script runnable, not a container, so label it "script"
    # (the previous "container" label was copied from the container sample).
    job.labels = {"env": "testing", "type": "script"}
    # We use Cloud Logging as it's an out of the box available option
    job.logs_policy = batch_v1.LogsPolicy()
    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

    create_request = batch_v1.CreateJobRequest()
    create_request.job = job
    create_request.job_id = job_name
    # The job's parent is the region in which the job will run.
    create_request.parent = f"projects/{project_id}/locations/{region}"

    return client.create_job(create_request)


# [END batch_create_gpu_job]


if __name__ == "__main__":
    import google.auth

    # Resolve the active project from Application Default Credentials;
    # google.auth.default() returns (credentials, project_id).
    _, project = google.auth.default()
    created_job = create_gpu_job(project, "us-east1", "gpu-job-batch")
    print(created_job)
115 changes: 115 additions & 0 deletions batch/create/create_with_gpu_no_mounting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START batch_create_gpu_job]
from google.cloud import batch_v1


def create_gpu_job(
    project_id: str, region: str, zone: str, job_name: str
) -> batch_v1.Job:
    """
    This method shows how to create a sample Batch Job that will run
    a simple command on Compute Engine instances on GPU machines.

    Args:
        project_id: project ID or project number of the Cloud project you want to use.
        region: name of the region you want to use to run the job. Regions that are
            available for Batch are listed on: https://cloud.google.com/batch/docs/get-started#locations
        zone: name of the zone you want to use to run the job. Important in regard to GPUs availability.
            GPUs availability can be found here: https://cloud.google.com/compute/docs/gpus/gpu-regions-zones
        job_name: the name of the job that will be created.
            It needs to be unique for each project and region pair.

    Returns:
        A job object representing the job created.
    """
    client = batch_v1.BatchServiceClient()

    # Define what will be done as part of the job.
    task = batch_v1.TaskSpec()
    runnable = batch_v1.Runnable()
    runnable.script = batch_v1.Runnable.Script()
    runnable.script.text = "echo Hello world! This is task ${BATCH_TASK_INDEX}. This job has a total of ${BATCH_TASK_COUNT} tasks."
    # You can also run a script from a file. Just remember, that needs to be a script that's
    # already on the VM that will be running the job. Using runnable.script.text and runnable.script.path is mutually
    # exclusive.
    # runnable.script.path = '/tmp/test.sh'
    task.runnables = [runnable]

    # We can specify what resources are requested by each task.
    resources = batch_v1.ComputeResource()
    resources.cpu_milli = 2000  # in milliseconds per cpu-second. This means the task requires 2 whole CPUs.
    # NOTE(review): 16 MiB is extremely low for a GPU workload; this mirrors the
    # other samples in this directory, but confirm it is intentional.
    resources.memory_mib = 16  # in MiB
    task.compute_resource = resources

    task.max_retry_count = 2
    task.max_run_duration = "3600s"

    # Tasks are grouped inside a job using TaskGroups.
    # Currently, it's possible to have only one task group.
    group = batch_v1.TaskGroup()
    group.task_count = 4
    group.task_spec = task

    # Policies are used to define on what kind of virtual machines the tasks will run on.
    # Read more about machine types here: https://cloud.google.com/compute/docs/machine-types
    policy = batch_v1.AllocationPolicy.InstancePolicy()
    policy.machine_type = "n1-standard-16"

    accelerator = batch_v1.AllocationPolicy.Accelerator()
    # Note: not every accelerator is compatible with instance type
    # Read more here: https://cloud.google.com/compute/docs/gpus#t4-gpus
    accelerator.type_ = "nvidia-tesla-t4"
    accelerator.count = 1

    policy.accelerators = [accelerator]
    instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
    instances.policy = policy
    # Ask Batch to install the GPU drivers on the VMs for us.
    instances.install_gpu_drivers = True
    allocation_policy = batch_v1.AllocationPolicy()
    allocation_policy.instances = [instances]

    # Restrict the job to the caller-supplied zone so the requested GPU model
    # is actually available where the VMs are created.
    # Fix: this was previously hard-coded to "zones/us-central1-b", silently
    # ignoring the `zone` argument.
    location = batch_v1.AllocationPolicy.LocationPolicy()
    location.allowed_locations = [f"zones/{zone}"]
    allocation_policy.location = location

    job = batch_v1.Job()
    job.task_groups = [group]
    job.allocation_policy = allocation_policy
    # Fix: this job runs a script runnable, not a container, so label it "script"
    # (the previous "container" label was copied from the container sample).
    job.labels = {"env": "testing", "type": "script"}
    # We use Cloud Logging as it's an out of the box available option
    job.logs_policy = batch_v1.LogsPolicy()
    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

    create_request = batch_v1.CreateJobRequest()
    create_request.job = job
    create_request.job_id = job_name
    # The job's parent is the region in which the job will run.
    create_request.parent = f"projects/{project_id}/locations/{region}"

    return client.create_job(create_request)


# [END batch_create_gpu_job]


if __name__ == "__main__":
    import google.auth

    # Resolve the active project from Application Default Credentials;
    # google.auth.default() returns (credentials, project_id).
    _, project = google.auth.default()
    created_job = create_gpu_job(
        project, "europe-central2", "europe-central2-b", "gpu-job-batch"
    )
    print(created_job)
20 changes: 14 additions & 6 deletions batch/tests/test_basics.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import pytest

from ..create.create_with_container_no_mounting import create_container_job
from ..create.create_with_gpu_no_mounting import create_gpu_job
from ..create.create_with_script_no_mounting import create_script_job

from ..delete.delete_job import delete_job
Expand All @@ -34,7 +35,8 @@
from ..logs.read_job_logs import print_job_logs

PROJECT = google.auth.default()[1]
REGION = "europe-north1"
REGION = "europe-central2"
ZONE = "europe-central2-b"

TIMEOUT = 600 # 10 minutes

Expand All @@ -52,20 +54,20 @@ def job_name():
return f"test-job-{uuid.uuid4().hex[:10]}"


def _test_body(test_job: batch_v1.Job, additional_test: Callable = None):
def _test_body(test_job: batch_v1.Job, additional_test: Callable = None, region=REGION):
start_time = time.time()
try:
while test_job.status.state in WAIT_STATES:
if time.time() - start_time > TIMEOUT:
pytest.fail("Timed out while waiting for job to complete!")
test_job = get_job(
PROJECT, REGION, test_job.name.rsplit("/", maxsplit=1)[1]
PROJECT, region, test_job.name.rsplit("/", maxsplit=1)[1]
)
time.sleep(5)

assert test_job.status.state == batch_v1.JobStatus.State.SUCCEEDED

for job in list_jobs(PROJECT, REGION):
for job in list_jobs(PROJECT, region):
if test_job.uid == job.uid:
break
else:
Expand All @@ -74,9 +76,9 @@ def _test_body(test_job: batch_v1.Job, additional_test: Callable = None):
if additional_test:
additional_test()
finally:
delete_job(PROJECT, REGION, test_job.name.rsplit("/", maxsplit=1)[1]).result()
delete_job(PROJECT, region, test_job.name.rsplit("/", maxsplit=1)[1]).result()

for job in list_jobs(PROJECT, REGION):
for job in list_jobs(PROJECT, region):
if job.uid == test_job.uid:
pytest.fail("The test job should be deleted at this point!")

Expand Down Expand Up @@ -110,3 +112,9 @@ def test_script_job(job_name, capsys):
def test_container_job(job_name):
job = create_container_job(PROJECT, REGION, job_name)
_test_body(job, additional_test=lambda: _check_tasks(job_name))


@flaky(max_runs=3, min_passes=1)
def test_create_gpu_job(job_name):
    job = create_gpu_job(PROJECT, REGION, ZONE, job_name)
    # Fix: the lambda previously returned the _check_tasks function object
    # without calling it, so the per-task verification never ran. Call it
    # with job_name, matching test_container_job above.
    _test_body(job, additional_test=lambda: _check_tasks(job_name))
2 changes: 1 addition & 1 deletion batch/tests/test_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,4 @@ def _test_bucket_content(test_bucket):
@flaky(max_runs=3, min_passes=1)
def test_bucket_job(job_name, test_bucket):
job = create_script_job_with_bucket(PROJECT, REGION, job_name, test_bucket)
_test_body(job, lambda: _test_bucket_content(test_bucket))
_test_body(job, lambda: _test_bucket_content(test_bucket), REGION)
2 changes: 1 addition & 1 deletion batch/tests/test_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,4 @@ def test_template_job(job_name, instance_template):
job = create_script_job_with_template(
PROJECT, REGION, job_name, instance_template.self_link
)
_test_body(job)
_test_body(job, region=REGION)

0 comments on commit 9a23fe1

Please sign in to comment.