Skip to content

Commit

Permalink
feat!: add windows support to worker fixture (#115)
Browse files Browse the repository at this point in the history
Signed-off-by: Charles Moore <[email protected]>
  • Loading branch information
moorec-aws authored Jun 17, 2024
1 parent 91fb6da commit ef7f133
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 43 deletions.
2 changes: 2 additions & 0 deletions src/deadline_test_fixtures/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
PosixSessionUser,
S3Object,
ServiceModel,
OperatingSystem,
)
from ._version import __version__ as version # noqa

Expand All @@ -60,6 +61,7 @@
"PosixSessionUser",
"S3Object",
"ServiceModel",
"OperatingSystem",
"Queue",
"QueueFleetAssociation",
"TaskStatus",
Expand Down
198 changes: 159 additions & 39 deletions src/deadline_test_fixtures/deadline/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from ..models import (
PipInstall,
PosixSessionUser,
OperatingSystem,
)
from ..util import call_api, wait_for

Expand All @@ -31,10 +32,11 @@
DOCKER_CONTEXT_DIR = os.path.join(os.path.dirname(__file__), "..", "containers", "worker")


def configure_worker_command(*, config: DeadlineWorkerConfiguration) -> str: # pragma: no cover
def linux_worker_command(config: DeadlineWorkerConfiguration) -> str: # pragma: no cover
"""Get the command to configure the Worker. This must be run as root."""

cmds = [
config.worker_agent_install.install_command,
config.worker_agent_install.install_command_for_linux,
*(config.pre_install_commands or []),
# fmt: off
(
Expand Down Expand Up @@ -67,6 +69,54 @@ def configure_worker_command(*, config: DeadlineWorkerConfiguration) -> str: #
return " && ".join(cmds)


def windows_worker_command(config: DeadlineWorkerConfiguration) -> str:  # pragma: no cover
    """Build the PowerShell command string that installs and starts the Worker agent on Windows.

    The individual steps are joined with "; " so they run sequentially in one
    PowerShell invocation.

    Args:
        config: Worker configuration; supplies the install command, optional
            pre-install commands, farm/fleet/region/user values, the
            allow-shutdown flag and the optional service model path.

    Returns:
        A single string containing every step separated by "; ".
    """
    # Arguments of the installer invocation, in the order they appear on the command line.
    installer_args = [
        "install-deadline-worker",
        "-y",
        f"--farm-id {config.farm_id}",
        f"--fleet-id {config.fleet_id}",
        f"--region {config.region}",
        f"--user {config.user}",
    ]
    if config.allow_shutdown:
        installer_args.append("--allow-shutdown")
    installer_args.append("--start")

    steps = [config.worker_agent_install.install_command_for_windows]
    steps.extend(config.pre_install_commands or [])
    steps.append(" ".join(installer_args))

    if config.service_model_path:
        # Register the service model, then copy the .aws contents into the
        # profile of each account listed below.
        profile_owners = ("Administrator", config.user, "jobuser")
        copy_steps = "; ".join(
            f"Copy-Item -Path ~\\.aws\\* -Destination C:\\Users\\{owner}\\.aws\\models -Recurse"
            for owner in profile_owners
        )
        steps.append(
            f"aws configure add-model --service-model file://{config.service_model_path} --service-name deadline; "
            + copy_steps
        )

    endpoint_url = os.environ.get("AWS_ENDPOINT_URL_DEADLINE")
    if endpoint_url:
        LOG.info(f"Using AWS_ENDPOINT_URL_DEADLINE: {endpoint_url}")
        # Must run first: persist the endpoint override machine-wide, then
        # load it into the current session for the steps that follow.
        steps.insert(
            0,
            f"[System.Environment]::SetEnvironmentVariable('AWS_ENDPOINT_URL_DEADLINE', '{endpoint_url}', [System.EnvironmentVariableTarget]::Machine); "
            "$env:AWS_ENDPOINT_URL_DEADLINE = [System.Environment]::GetEnvironmentVariable('AWS_ENDPOINT_URL_DEADLINE','Machine')",
        )

    return "; ".join(steps)


def configure_worker_command(*, config: DeadlineWorkerConfiguration) -> str:  # pragma: no cover
    """Return the Worker configuration command for the configured operating system.

    An operating system named "AL2023" gets the Linux command; every other
    value gets the Windows command.
    """
    is_linux = config.operating_system.name == "AL2023"
    build_command = linux_worker_command if is_linux else windows_worker_command
    return build_command(config)


class DeadlineWorker(abc.ABC):
@abc.abstractmethod
def start(self) -> None:
Expand Down Expand Up @@ -121,6 +171,7 @@ def __str__(self) -> str:

@dataclass(frozen=True)
class DeadlineWorkerConfiguration:
operating_system: OperatingSystem
farm_id: str
fleet_id: str
region: str
Expand All @@ -143,6 +194,7 @@ class DeadlineWorkerConfiguration:
@dataclass
class EC2InstanceWorker(DeadlineWorker):
AL2023_AMI_NAME: ClassVar[str] = "al2023-ami-kernel-6.1-x86_64"
WIN2022_AMI_NAME: ClassVar[str] = "Windows_Server-2022-English-Full-Base"

subnet_id: str
security_group_id: str
Expand Down Expand Up @@ -188,17 +240,23 @@ def send_command(self, command: str) -> CommandResult:
#
# If we send an SSM command then we will get an InvalidInstanceId error
# if the instance isn't in that state.
NUM_RETRIES = 10
SLEEP_INTERVAL_S = 5
NUM_RETRIES = 20
SLEEP_INTERVAL_S = 10
for i in range(0, NUM_RETRIES):
LOG.info(f"Sending SSM command to instance {self.instance_id}")
try:
send_command_response = self.ssm_client.send_command(
InstanceIds=[self.instance_id],
DocumentName="AWS-RunShellScript",
Parameters={"commands": [command]},
)
# Successfully sent. Bail out of the loop.
if self.configuration.operating_system.name == "AL2023":
send_command_response = self.ssm_client.send_command(
InstanceIds=[self.instance_id],
DocumentName="AWS-RunShellScript",
Parameters={"commands": [command]},
)
else:
send_command_response = self.ssm_client.send_command(
InstanceIds=[self.instance_id],
DocumentName="AWS-RunPowerShellScript",
Parameters={"commands": [command]},
)
break
except botocore.exceptions.ClientError as error:
error_code = error.response["Error"]["Code"]
Expand Down Expand Up @@ -274,21 +332,17 @@ def _stage_s3_bucket(self) -> list[tuple[str, str]] | None:

return list(s3_to_dst_mapping.items())

def _launch_instance(self, *, s3_files: list[tuple[str, str]] | None = None) -> None:
assert (
not self.instance_id
), "Attempted to launch EC2 instance when one was already launched"

def linux_userdata(self, s3_files) -> str:
copy_s3_command = ""
job_users_cmds = []

if s3_files:
copy_s3_command = " && ".join(
[
f"aws s3 cp {s3_uri} {dst} && chown {self.configuration.user} {dst}"
for s3_uri, dst in s3_files
]
)

job_users_cmds = []
for job_user in self.configuration.job_users:
job_users_cmds.append(f"groupadd {job_user.group}")
job_users_cmds.append(
Expand All @@ -308,6 +362,50 @@ def _launch_instance(self, *, s3_files: list[tuple[str, str]] | None = None) ->

configure_job_users = "\n".join(job_users_cmds)

userdata = f"""#!/bin/bash
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
set -x
groupadd --system {self.configuration.group}
useradd --create-home --system --shell=/bin/bash --groups={self.configuration.group} {self.configuration.user}
{configure_job_users}
{copy_s3_command}
runuser --login {self.configuration.user} --command 'python3 -m venv $HOME/.venv && echo ". $HOME/.venv/bin/activate" >> $HOME/.bashrc'
"""

return userdata

def windows_userdata(self, s3_files) -> str:
    """Build the EC2 UserData PowerShell script for a Windows worker instance.

    The script downloads and installs Python 3.11.9 and the AWS CLI v2,
    refreshes PATH from the machine environment, reads a password from the
    "WindowsPasswordSecret" secret, creates a local "jobuser" account with it,
    starts a process as that user with -LoadUserProfile, and finally copies
    the staged S3 files.

    Args:
        s3_files: Optional iterable of (s3_uri, destination_path) pairs to
            copy onto the instance with "aws s3 cp". Falsy means no copies.

    Returns:
        The UserData script wrapped in <powershell> tags.
    """
    copy_s3_command = ""
    if s3_files:
        copy_s3_command = " ; ".join([f"aws s3 cp {s3_uri} {dst}" for s3_uri, dst in s3_files])

    # Backslashes in the Windows paths are escaped explicitly: "\p" and "\A"
    # are invalid escape sequences in a non-raw (f-)string and trigger
    # DeprecationWarning/SyntaxWarning. The rendered script is unchanged.
    userdata = f"""<powershell>
Invoke-WebRequest -Uri "https://www.python.org/ftp/python/3.11.9/python-3.11.9-amd64.exe" -OutFile "C:\\python-3.11.9-amd64.exe"
Start-Process -FilePath "C:\\python-3.11.9-amd64.exe" -ArgumentList "/quiet InstallAllUsers=1 PrependPath=1 AppendPath=1" -Wait
Invoke-WebRequest -Uri "https://awscli.amazonaws.com/AWSCLIV2.msi" -Outfile "C:\\AWSCLIV2.msi"
Start-Process msiexec.exe -ArgumentList "/i C:\\AWSCLIV2.msi /quiet" -Wait
$env:Path = [System.Environment]::GetEnvironmentVariable("Path","Machine")
$secret = aws secretsmanager get-secret-value --secret-id WindowsPasswordSecret --query SecretString --output text | ConvertFrom-Json
$password = ConvertTo-SecureString -String $($secret.password) -AsPlainText -Force
New-LocalUser -Name "jobuser" -Password $password -FullName "jobuser" -Description "job user"
$Cred = New-Object System.Management.Automation.PSCredential "jobuser", $password
Start-Process cmd.exe -Credential $Cred -ArgumentList "/C" -LoadUserProfile -NoNewWindow
{copy_s3_command}
</powershell>"""

    return userdata

def _launch_instance(self, *, s3_files: list[tuple[str, str]] | None = None) -> None:
assert (
not self.instance_id
), "Attempted to launch EC2 instance when one was already launched"

if self.configuration.operating_system.name == "AL2023":
userdata = self.linux_userdata(s3_files)
else:
userdata = self.windows_userdata(s3_files)

LOG.info("Launching EC2 instance")
run_instance_response = self.ec2_client.run_instances(
MinCount=1,
Expand All @@ -329,16 +427,7 @@ def _launch_instance(self, *, s3_files: list[tuple[str, str]] | None = None) ->
],
}
],
UserData=f"""#!/bin/bash
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
set -x
groupadd --system {self.configuration.group}
useradd --create-home --system --shell=/bin/bash --groups={self.configuration.group} {self.configuration.user}
{configure_job_users}
{copy_s3_command}
runuser --login {self.configuration.user} --command 'python3 -m venv $HOME/.venv && echo ". $HOME/.venv/bin/activate" >> $HOME/.bashrc'
""",
UserData=userdata,
)

self.instance_id = run_instance_response["Instances"][0]["InstanceId"]
Expand All @@ -352,10 +441,7 @@ def _launch_instance(self, *, s3_files: list[tuple[str, str]] | None = None) ->
)
LOG.info(f"EC2 instance {self.instance_id} status is OK")

def _start_worker_agent(self) -> None: # pragma: no cover
assert self.instance_id

LOG.info(f"Sending SSM command to configure Worker agent on instance {self.instance_id}")
def start_linux_worker(self) -> None:
cmd_result = self.send_command(
f"cd /home/{self.configuration.user}; . .venv/bin/activate; AWS_DEFAULT_REGION={self.configuration.region} {configure_worker_command(config=self.configuration)}"
)
Expand All @@ -378,6 +464,33 @@ def _start_worker_agent(self) -> None: # pragma: no cover
assert cmd_result.exit_code == 0, f"Failed to start Worker agent: {cmd_result}"
LOG.info("Successfully started Worker agent")

def start_windows_worker(self) -> None:
    """Configure the Worker agent on a Windows instance and verify it is running.

    Sends the configuration command built by configure_worker_command, then
    sends a second command that waits 20 seconds and checks for a
    "pythonservice" process.

    Raises:
        AssertionError: If either command reports a non-zero exit code.
    """
    cmd_result = self.send_command(configure_worker_command(config=self.configuration))
    # Check the configuration step before reporting success; otherwise a
    # failed install only surfaces later as a less specific start failure.
    assert cmd_result.exit_code == 0, f"Failed to configure Worker agent: {cmd_result}"
    LOG.info("Successfully configured Worker agent")

    LOG.info("Sending SSM Command to check if Worker Agent is running")
    cmd_result = self.send_command(
        " ; ".join(
            [
                "echo Waiting 20s for the agent service to get started",
                "sleep 20",
                "echo 'Running running Get-Process to check if the agent is running'",
                # Exit non-zero when the process is missing so the assertion below fails.
                "IF(Get-Process pythonservice){echo 'service is running'}ELSE{exit 1}",
            ]
        ),
    )
    assert cmd_result.exit_code == 0, f"Failed to start Worker agent: {cmd_result}"
    LOG.info("Successfully started Worker agent")

def _start_worker_agent(self) -> None:  # pragma: no cover
    """Start the Worker agent using the routine that matches the configured OS.

    Requires that an instance has already been launched (instance_id is set).
    An operating system named "AL2023" uses the Linux routine; anything else
    uses the Windows routine.
    """
    assert self.instance_id

    LOG.info(f"Sending SSM command to configure Worker agent on instance {self.instance_id}")

    on_linux = self.configuration.operating_system.name == "AL2023"
    start_worker = self.start_linux_worker if on_linux else self.start_windows_worker
    start_worker()

@property
def worker_id(self) -> str:
cmd_result = self.send_command("cat /var/lib/deadline/worker.json | jq -r '.worker_id'")
Expand All @@ -392,13 +505,21 @@ def worker_id(self) -> str:
@property
def ami_id(self) -> str:
if not hasattr(self, "_ami_id"):
# Grab the latest AL2023 AMI
# https://aws.amazon.com/blogs/compute/query-for-the-latest-amazon-linux-ami-ids-using-aws-systems-manager-parameter-store/
ssm_param_name = (
f"/aws/service/ami-amazon-linux-latest/{EC2InstanceWorker.AL2023_AMI_NAME}"
)
if self.configuration.operating_system.name == "AL2023":
# Grab the latest AL2023 AMI
# https://aws.amazon.com/blogs/compute/query-for-the-latest-amazon-linux-ami-ids-using-aws-systems-manager-parameter-store/
ssm_param_name = (
f"/aws/service/ami-amazon-linux-latest/{EC2InstanceWorker.AL2023_AMI_NAME}"
)
else:
# Grab the latest Windows Server 2022 AMI
# https://aws.amazon.com/blogs/mt/query-for-the-latest-windows-ami-using-systems-manager-parameter-store/
ssm_param_name = (
f"/aws/service/ami-windows-latest/{EC2InstanceWorker.WIN2022_AMI_NAME}"
)

response = call_api(
description=f"Getting latest AL2023 AMI ID from SSM parameter {ssm_param_name}",
description=f"Getting latest {self.configuration.operating_system.name} AMI ID from SSM parameter {ssm_param_name}",
fn=lambda: self.ssm_client.get_parameters(Names=[ssm_param_name]),
)

Expand All @@ -407,7 +528,7 @@ def ami_id(self) -> str:
len(parameters) == 1
), f"Received incorrect number of SSM parameters. Expected 1, got response: {response}"
self._ami_id = parameters[0]["Value"]
LOG.info(f"Using latest AL2023 AMI {self._ami_id}")
LOG.info(f"Using latest {self.configuration.operating_system.name} AMI {self._ami_id}")

return self._ami_id

Expand All @@ -425,7 +546,6 @@ def __post_init__(self) -> None:
def start(self) -> None:
self._tmpdir = pathlib.Path(tempfile.mkdtemp())

# TODO: Support multiple job users on Docker
assert (
len(self.configuration.job_users) == 1
), f"Multiple job users not supported on Docker worker: {self.configuration.job_users}"
Expand Down
23 changes: 21 additions & 2 deletions src/deadline_test_fixtures/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
PosixSessionUser,
ServiceModel,
S3Object,
OperatingSystem,
)
from .cloudformation import WorkerBootstrapStack
from .job_attachment_manager import JobAttachmentManager
Expand Down Expand Up @@ -382,6 +383,7 @@ def worker_config(
codeartifact: CodeArtifactRepositoryInfo,
service_model: ServiceModel,
region: str,
operating_system: OperatingSystem,
) -> Generator[DeadlineWorkerConfiguration, None, None]:
"""
Builds the configuration for a DeadlineWorker.
Expand Down Expand Up @@ -418,7 +420,12 @@ def worker_config(
), f"Expected exactly one Worker agent whl path, but got {resolved_whl_paths} (from pattern {worker_agent_whl_path})"
resolved_whl_path = resolved_whl_paths[0]

dest_path = posixpath.join("/tmp", os.path.basename(resolved_whl_path))
if operating_system == "AL2023":
dest_path = posixpath.join("/tmp", os.path.basename(resolved_whl_path))
else:
dest_path = posixpath.join(
"%USERPROFILE%\\AppData\\Local\\Temp", os.path.basename(resolved_whl_path)
)
file_mappings = [(resolved_whl_path, dest_path)]

LOG.info(f"The whl file will be copied to {dest_path} on the Worker environment")
Expand All @@ -438,7 +445,10 @@ def worker_config(
with src_path.open(mode="w") as f:
json.dump(service_model.model, f)

dst_path = posixpath.join("/tmp", src_path.name)
if operating_system == "AL2023":
dst_path = posixpath.join("/tmp", src_path.name)
else:
dst_path = posixpath.join("%USERPROFILE%\\AppData\\Local\\Temp", src_path.name)
LOG.info(f"The service model will be copied to {dst_path} on the Worker environment")
file_mappings.append((str(src_path), dst_path))

Expand All @@ -455,6 +465,7 @@ def worker_config(
),
service_model_path=dst_path,
file_mappings=file_mappings or None,
operating_system=operating_system,
)


Expand Down Expand Up @@ -581,3 +592,11 @@ def _find_latest_service_model_file(service_name: str) -> str:
f"Expected exactly one file to match glob '{service_model_path}.*, but got: {service_model_files}"
)
return service_model_files[0]


@pytest.fixture(scope="session")
def operating_system(request) -> OperatingSystem:
    """Session-scoped fixture mapping the fixture parameter to an OperatingSystem.

    A parameter of "linux" yields the "AL2023" operating system; any other
    parameter value yields "WIN2022".
    """
    os_name = "AL2023" if request.param == "linux" else "WIN2022"
    return OperatingSystem(name=os_name)
Loading

0 comments on commit ef7f133

Please sign in to comment.