debug and fully expand tests

CodyCBakerPhD committed Aug 5, 2024
1 parent adcf16e commit 720bb91
Showing 2 changed files with 159 additions and 42 deletions.
86 changes: 55 additions & 31 deletions src/neuroconv/tools/aws/_submit_aws_batch_job.py
@@ -58,8 +58,7 @@ def submit_aws_batch_job(
The name of the job queue to use for the job.
job_definition_name : str, optional
The name of the job definition to use for the job.
- Defaults to f"neuroconv_batch_{ name of docker image }",
- but replaces any colons from tags in the docker image name with underscores.
+ If unspecified, a name starting with 'neuroconv_batch_' will be generated.
minimum_worker_ram_in_gib : int, default: 4
The minimum amount of base worker memory required to run this job.
Determines the EC2 instance type selected by the automatic 'best fit' selector.
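As a usage sketch (a minimal example with illustrative values; the structure of the returned dictionary follows the tests further down in this commit):

info = submit_aws_batch_job(
    job_name="example_job",
    docker_image="ubuntu:latest",
    command="echo 'Hello from AWS Batch.'",
    minimum_worker_ram_in_gib=4,
    minimum_worker_cpus=4,
)
job_id = info["job_submission_info"]["jobId"]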
@@ -135,7 +134,14 @@ def submit_aws_batch_job(
# Ensure all job submission requirements are met
_ensure_compute_environment_exists(compute_environment_name=compute_environment_name, batch_client=batch_client)
_ensure_job_queue_exists(job_queue_name=job_queue_name, batch_client=batch_client)

job_definition_name = job_definition_name or _generate_job_definition_name(
docker_image=docker_image,
minimum_worker_ram_in_gib=minimum_worker_ram_in_gib,
minimum_worker_cpus=minimum_worker_cpus,
)
_ensure_job_definition_exists(
job_definition_name=job_definition_name,
docker_image=docker_image,
minimum_worker_ram_in_gib=minimum_worker_ram_in_gib,
minimum_worker_cpus=minimum_worker_cpus,
@@ -187,11 +193,6 @@ def _attempt_to_load_region_from_config() -> Optional[str]:
Returns `None` if any issue is encountered.
- Parameters
- ----------
- file_path : str
- The path to the configuration file.
Returns
-------
region : str or None
@@ -207,7 +208,7 @@ def _attempt_to_load_region_from_config() -> Optional[str]:
if not default_config_file_path.exists():
return None

- with open(file=file_path, mode="r") as io:
+ with open(file=default_config_file_path, mode="r") as io:
all_lines = io.readlines()
lines_with_region = [line for line in all_lines if "region" == line[:6]]

@@ -219,15 +220,10 @@ def _attempt_to_load_region_from_config() -> Optional[str]:
return region
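For reference, the prefix scan above targets the standard AWS CLI configuration layout, e.g. a ~/.aws/config file of the form (illustrative value):

[default]
region = us-east-2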


- def _attempt_to_load_aws_credentials_from_config(file_path: str) -> Tuple[Optional[str], Optional[str]]:
+ def _attempt_to_load_aws_credentials_from_config() -> Tuple[Optional[str], Optional[str]]:
"""
Attempts to load the AWS credentials from a configuration file.
- Parameters
- ----------
- file_path : str
- The path to the configuration file.
Returns
-------
aws_access_key_id : str or None
@@ -238,15 +234,15 @@ def _attempt_to_load_aws_credentials_from_config(file_path: str) -> Tuple[Optional[str], Optional[str]]:
- No line starts with either key.
- Multiple lines start with either key.
"""
- default_config_file_path = Path.home() / ".aws" / "config"
+ default_credentials_file_path = Path.home() / ".aws" / "credentials"

- if not default_config_file_path.exists():
+ if not default_credentials_file_path.exists():
return (None, None)

- with open(file=file_path, mode="r") as io:
+ with open(file=default_credentials_file_path, mode="r") as io:
all_lines = io.readlines()
- lines_with_access_key_id = [line for line in all_lines if "aws_access_key_id" == line[:16]]
- lines_with_secret_access_key = [line for line in all_lines if "aws_secret_access_key" == line[:20]]
+ lines_with_access_key_id = [line for line in all_lines if "aws_access_key_id" == line[:17]]
+ lines_with_secret_access_key = [line for line in all_lines if "aws_secret_access_key" == line[:21]]

if len(lines_with_access_key_id) != 1 or len(lines_with_secret_access_key) != 1:
return (None, None)
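Similarly, this loader expects the standard ~/.aws/credentials layout, e.g. (placeholder values, not real keys):

[default]
aws_access_key_id = AKIAXXXXXXXXXXXXXXXX
aws_secret_access_key = XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX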
@@ -389,19 +385,16 @@ def _ensure_job_queue_exists(*, job_queue_name: str, batch_client: "boto3.client.Batch") -> None:
return None


- def _ensure_job_definition_exists(
+ def _generate_job_definition_name(
*,
docker_image: str,
minimum_worker_ram_in_gib: int,
minimum_worker_cpus: int,
- role_info: dict,
- batch_client: "boto3.client.Batch",
- ) -> None:
+ ) -> str:
"""
- Ensure that the job definition exists in AWS Batch.
+ Generate a job definition name for the AWS Batch job.
Automatically generates a job definition name using the docker image, its tags, and worker configuration.
The creation date is also appended if the docker image was not tagged or was tagged as 'latest'.
Note that Docker images don't strictly require a tag to be pulled or used - 'latest' is always used by default.
Parameters
----------
@@ -414,12 +407,7 @@ def _ensure_job_definition_exists(
minimum_worker_cpus : int
The minimum number of CPUs required to run this job.
A minimum of 4 is required, even if only one will be used in the actual process.
- role_info : dict
- The IAM role information for the job.
- batch_client : boto3.client.Batch
- The AWS Batch client to use for the job.
"""
- # Images don't strictly need tags - 'latest' is always used by default
docker_tags = docker_image.split(":")[1:]
docker_tag = docker_tags[0] if len(docker_tags) > 0 else None
parsed_docker_image_name = docker_image.replace(":", "-") # AWS Batch does not allow colons in job definition names
@@ -432,6 +420,42 @@ def _ensure_job_definition_exists(
date = datetime.now().strftime("%Y-%m-%d")
job_definition_name += f"_created-on-{date}"

return job_definition_name
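Given the parsing above and the job definition names asserted by the tests below, a call such as _generate_job_definition_name(docker_image="ubuntu:latest", minimum_worker_ram_in_gib=4, minimum_worker_cpus=4) would produce a name of the form (date illustrative):

neuroconv_batch_ubuntu-latest-image_4-GiB-RAM_4-CPU_created-on-2024-08-05

with the date suffix appended because the 'latest' tag takes the branch above.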


def _ensure_job_definition_exists(
*,
job_definition_name: str,
docker_image: str,
minimum_worker_ram_in_gib: int,
minimum_worker_cpus: int,
role_info: dict,
batch_client: "boto3.client.Batch",
) -> None:
"""
Ensure that the job definition exists in AWS Batch.
The job definition name is expected to have been generated by `_generate_job_definition_name` above, from the docker image, its tags, and the worker configuration.
The creation date is appended to that name if the docker image was untagged or was tagged as 'latest'.
Parameters
----------
job_definition_name : str
The name of the job definition to use for the job.
docker_image : str
The name of the Docker image to use for the job.
minimum_worker_ram_in_gib : int
The minimum amount of base worker memory required to run this job.
Determines the EC2 instance type selected by the automatic 'best fit' selector.
Recommended to be several GiB to allow comfortable buffer space for data chunk iterators.
minimum_worker_cpus : int
The minimum number of CPUs required to run this job.
A minimum of 4 is required, even if only one will be used in the actual process.
role_info : dict
The IAM role information for the job.
batch_client : boto3.client.Batch
The AWS Batch client to use for the job.
"""
current_job_definitions = [
definition["jobDefinitionName"] for definition in batch_client.describe_job_definitions()["jobDefinitions"]
]
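The function then registers the definition only when the name is absent; a rough sketch of that flow under assumed arguments (the actual registration call sits in the truncated remainder of this hunk, and the containerProperties shown here are assumptions, not this commit's code):

if job_definition_name not in current_job_definitions:
    batch_client.register_job_definition(
        jobDefinitionName=job_definition_name,
        type="container",
        containerProperties={
            "image": docker_image,
            "resourceRequirements": [
                {"type": "VCPU", "value": str(minimum_worker_cpus)},
                {"type": "MEMORY", "value": str(minimum_worker_ram_in_gib * 1024)},  # MiB
            ],
        },
    )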
115 changes: 104 additions & 11 deletions tests/test_minimal/test_tools/aws_tools.py
@@ -1,45 +1,138 @@
import os
import time

import boto3

from neuroconv.tools.aws import submit_aws_batch_job


def test_submit_aws_batch_job():
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None)

dynamodb_resource = boto3.resource(
service_name="dynamodb",
region_name="us-east-2",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)
batch_client = boto3.client(
service_name="batch",
region_name="us-east-2",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)

job_name = "test_submit_aws_batch_job"
docker_image = "ubuntu:latest"
command = "echo 'Testing NeuroConv AWS Batch submission."

- submit_aws_batch_job(
+ info = submit_aws_batch_job(
job_name=job_name,
docker_image=docker_image,
command=command,
)

- # TODO: check job ID from boto3
- # TODO: check status on table, should be 'Job submitted...'
# Wait for AWS to process the job
time.sleep(60)

job_id = info["job_submission_info"]["jobId"]

all_jobs_response = batch_client.describe_jobs(jobs=[job_id])
assert all_jobs_response["ResponseMetadata"]["HTTPStatusCode"] == 200

jobs = all_jobs_response["jobs"]
assert len(jobs) == 1

job = jobs[0]
assert job["jobName"] == job_name
assert "neuroconv_batch_queue" in job["jobQueue"]
assert "neuroconv_batch_ubuntu-latest-image_4-GiB-RAM_4-CPU" in job["jobDefinition"]
assert job["status"] == "SUCCEEDED"

status_tracker_table_name = "neuroconv_batch_status_tracker"
table = dynamodb_resource.Table(name=status_tracker_table_name)
submission_id = info["table_submission_info"]["submission_id"]

table_item_response = table.get_item(Key={"id": submission_id})
assert table_item_response["ResponseMetadata"]["HTTPStatusCode"] == 200

table_item = table_item_response["Item"]
assert table_item["job_name"] == job_name
assert table_item["status"] == "Job submitted..."

table.update_item(
Key={"id": submission_id}, AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed."}}
)
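A fixed sleep can be flaky when AWS is slow to schedule; a polling loop is one alternative (a sketch, not part of this commit; 'SUCCEEDED' and 'FAILED' are the standard AWS Batch terminal states):

for _ in range(60):
    job = batch_client.describe_jobs(jobs=[job_id])["jobs"][0]
    if job["status"] in ("SUCCEEDED", "FAILED"):
        break
    time.sleep(10)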


def test_submit_aws_batch_job_with_dependencies():
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None)

dynamodb_resource = boto3.resource(
service_name="dynamodb",
region_name="us-east-2",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)
batch_client = boto3.client(
service_name="batch",
region_name="us-east-2",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)

job_name_1 = "test_submit_aws_batch_job_with_dependencies_1"
docker_image = "ubuntu:latest"
command_1 = "echo 'Testing NeuroConv AWS Batch submission.'"

- info = submit_aws_batch_job(
+ job_info_1 = submit_aws_batch_job(
job_name=job_name_1,
docker_image=docker_image,
command=command_1,
)
job_submission_info = job_info_1["job_submission_info"]

- # TODO: check job ID from boto3
- # TODO: check status on table, should be 'Job submitted...'

job_name_2 = "test_submit_aws_batch_job_with_dependencies_2"
command_2 = "echo 'Testing NeuroConv AWS Batch submission with dependencies.'"
job_dependencies = [{"jobId": job_submission_info["jobId"], "type": "SEQUENTIAL"}]
- submit_aws_batch_job(
+ job_info_2 = submit_aws_batch_job(
job_name=job_name_2,
docker_image=docker_image,
command=command_2,
job_dependencies=job_dependencies,
)

- # TODO: check job ID from boto3
- # TODO: check status on table, should be 'Job submitted...'
# Wait for AWS to process the jobs
time.sleep(120)

job_id_1 = job_info_1["job_submission_info"]["jobId"]
job_id_2 = job_info_2["job_submission_info"]["jobId"]

all_jobs_response = batch_client.describe_jobs(jobs=[job_id_1, job_id_2])
assert all_jobs_response["ResponseMetadata"]["HTTPStatusCode"] == 200

jobs_by_id = {job["jobId"]: job for job in all_jobs_response["jobs"]}
assert len(jobs_by_id) == 2

job_1 = jobs_by_id[job_id_1]
assert job_1["jobName"] == job_name_1
assert "neuroconv_batch_queue" in job_1["jobQueue"]
assert "neuroconv_batch_ubuntu-latest-image_4-GiB-RAM_4-CPU" in job_1["jobDefinition"]
assert job_1["status"] == "SUCCEEDED"

job_2 = jobs_by_id[job_id_2]
assert job_2["jobName"] == job_name_2
assert "neuroconv_batch_queue" in job_2["jobQueue"]
assert "neuroconv_batch_ubuntu-latest-image_4-GiB-RAM_4-CPU" in job_2["jobDefinition"]
assert job_2["status"] == "SUCCEEDED"

status_tracker_table_name = "neuroconv_batch_status_tracker"
table = dynamodb_resource.Table(name=status_tracker_table_name)

submission_id_1 = job_info_1["table_submission_info"]["submission_id"]
table_item_response_1 = table.get_item(Key={"id": submission_id_1})
assert table_item_response_1["ResponseMetadata"]["HTTPStatusCode"] == 200

table_item_1 = table_item_response_1["Item"]
assert table_item_1["job_name"] == job_name
assert table_item_1["status"] == "Job submitted..."

submission_id_2 = job_info_2["table_submission_info"]["submission_id"]
table_item_response_2 = table.get_item(Key={"id": submission_id_2})
assert table_item_response_2["ResponseMetadata"]["HTTPStatusCode"] == 200

table_item_2 = table_item_response_2["Item"]
assert table_item_2["job_name"] == job_name
assert table_item_2["status"] == "Job submitted..."

table.update_item(
Key={"id": submission_id_1}, AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed."}}
)
table.update_item(
Key={"id": submission_id_2}, AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed."}}
)
