diff --git a/src/deadline_test_fixtures/__init__.py b/src/deadline_test_fixtures/__init__.py index b4bb230..01ad83f 100644 --- a/src/deadline_test_fixtures/__init__.py +++ b/src/deadline_test_fixtures/__init__.py @@ -36,6 +36,7 @@ PosixSessionUser, S3Object, ServiceModel, + OperatingSystem, ) from ._version import __version__ as version # noqa @@ -60,6 +61,7 @@ "PosixSessionUser", "S3Object", "ServiceModel", + "OperatingSystem", "Queue", "QueueFleetAssociation", "TaskStatus", diff --git a/src/deadline_test_fixtures/deadline/worker.py b/src/deadline_test_fixtures/deadline/worker.py index 1da5dee..524f03f 100644 --- a/src/deadline_test_fixtures/deadline/worker.py +++ b/src/deadline_test_fixtures/deadline/worker.py @@ -21,6 +21,7 @@ from ..models import ( PipInstall, PosixSessionUser, + OperatingSystem, ) from ..util import call_api, wait_for @@ -31,10 +32,11 @@ DOCKER_CONTEXT_DIR = os.path.join(os.path.dirname(__file__), "..", "containers", "worker") -def configure_worker_command(*, config: DeadlineWorkerConfiguration) -> str: # pragma: no cover +def linux_worker_command(config: DeadlineWorkerConfiguration) -> str: # pragma: no cover """Get the command to configure the Worker. This must be run as root.""" + cmds = [ - config.worker_agent_install.install_command, + config.worker_agent_install.install_command_for_linux, *(config.pre_install_commands or []), # fmt: off ( @@ -67,6 +69,54 @@ def configure_worker_command(*, config: DeadlineWorkerConfiguration) -> str: # return " && ".join(cmds) +def windows_worker_command(config: DeadlineWorkerConfiguration) -> str: # pragma: no cover + """Get the command to configure the Worker. This must be run as root.""" + + cmds = [ + config.worker_agent_install.install_command_for_windows, + *(config.pre_install_commands or []), + # fmt: off + ( + "install-deadline-worker " + + "-y " + + f"--farm-id {config.farm_id} " + + f"--fleet-id {config.fleet_id} " + + f"--region {config.region} " + + f"--user {config.user} " + + f"{'--allow-shutdown ' if config.allow_shutdown else ''}" + + "--start" + ), + # fmt: on + ] + + if config.service_model_path: + cmds.append( + f"aws configure add-model --service-model file://{config.service_model_path} --service-name deadline; " + f"Copy-Item -Path ~\\.aws\\* -Destination C:\\Users\\Administrator\\.aws\\models -Recurse; " + f"Copy-Item -Path ~\\.aws\\* -Destination C:\\Users\\{config.user}\\.aws\\models -Recurse; " + f"Copy-Item -Path ~\\.aws\\* -Destination C:\\Users\\jobuser\\.aws\\models -Recurse" + ) + + if os.environ.get("AWS_ENDPOINT_URL_DEADLINE"): + LOG.info(f"Using AWS_ENDPOINT_URL_DEADLINE: {os.environ.get('AWS_ENDPOINT_URL_DEADLINE')}") + cmds.insert( + 0, + f"[System.Environment]::SetEnvironmentVariable('AWS_ENDPOINT_URL_DEADLINE', '{os.environ.get('AWS_ENDPOINT_URL_DEADLINE')}', [System.EnvironmentVariableTarget]::Machine); " + "$env:AWS_ENDPOINT_URL_DEADLINE = [System.Environment]::GetEnvironmentVariable('AWS_ENDPOINT_URL_DEADLINE','Machine')", + ) + + return "; ".join(cmds) + + +def configure_worker_command(*, config: DeadlineWorkerConfiguration) -> str: # pragma: no cover + """Get the command to configure the Worker. This must be run as root.""" + + if config.operating_system.name == "AL2023": + return linux_worker_command(config) + else: + return windows_worker_command(config) + + class DeadlineWorker(abc.ABC): @abc.abstractmethod def start(self) -> None: @@ -121,6 +171,7 @@ def __str__(self) -> str: @dataclass(frozen=True) class DeadlineWorkerConfiguration: + operating_system: OperatingSystem farm_id: str fleet_id: str region: str @@ -143,6 +194,7 @@ class DeadlineWorkerConfiguration: @dataclass class EC2InstanceWorker(DeadlineWorker): AL2023_AMI_NAME: ClassVar[str] = "al2023-ami-kernel-6.1-x86_64" + WIN2022_AMI_NAME: ClassVar[str] = "Windows_Server-2022-English-Full-Base" subnet_id: str security_group_id: str @@ -188,17 +240,23 @@ def send_command(self, command: str) -> CommandResult: # # If we send an SSM command then we will get an InvalidInstanceId error # if the instance isn't in that state. - NUM_RETRIES = 10 - SLEEP_INTERVAL_S = 5 + NUM_RETRIES = 20 + SLEEP_INTERVAL_S = 10 for i in range(0, NUM_RETRIES): LOG.info(f"Sending SSM command to instance {self.instance_id}") try: - send_command_response = self.ssm_client.send_command( - InstanceIds=[self.instance_id], - DocumentName="AWS-RunShellScript", - Parameters={"commands": [command]}, - ) - # Successfully sent. Bail out of the loop. + if self.configuration.operating_system.name == "AL2023": + send_command_response = self.ssm_client.send_command( + InstanceIds=[self.instance_id], + DocumentName="AWS-RunShellScript", + Parameters={"commands": [command]}, + ) + else: + send_command_response = self.ssm_client.send_command( + InstanceIds=[self.instance_id], + DocumentName="AWS-RunPowerShellScript", + Parameters={"commands": [command]}, + ) break except botocore.exceptions.ClientError as error: error_code = error.response["Error"]["Code"] @@ -274,12 +332,10 @@ def _stage_s3_bucket(self) -> list[tuple[str, str]] | None: return list(s3_to_dst_mapping.items()) - def _launch_instance(self, *, s3_files: list[tuple[str, str]] | None = None) -> None: - assert ( - not self.instance_id - ), "Attempted to launch EC2 instance when one was already launched" - + def linux_userdata(self, s3_files) -> str: copy_s3_command = "" + job_users_cmds = [] + if s3_files: copy_s3_command = " && ".join( [ @@ -287,8 +343,6 @@ def _launch_instance(self, *, s3_files: list[tuple[str, str]] | None = None) -> for s3_uri, dst in s3_files ] ) - - job_users_cmds = [] for job_user in self.configuration.job_users: job_users_cmds.append(f"groupadd {job_user.group}") job_users_cmds.append( @@ -308,6 +362,50 @@ def _launch_instance(self, *, s3_files: list[tuple[str, str]] | None = None) -> configure_job_users = "\n".join(job_users_cmds) + userdata = f"""#!/bin/bash + # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + set -x + groupadd --system {self.configuration.group} + useradd --create-home --system --shell=/bin/bash --groups={self.configuration.group} {self.configuration.user} + {configure_job_users} + {copy_s3_command} + + runuser --login {self.configuration.user} --command 'python3 -m venv $HOME/.venv && echo ". $HOME/.venv/bin/activate" >> $HOME/.bashrc' + """ + + return userdata + + def windows_userdata(self, s3_files) -> str: + copy_s3_command = "" + if s3_files: + copy_s3_command = " ; ".join([f"aws s3 cp {s3_uri} {dst}" for s3_uri, dst in s3_files]) + + userdata = f""" + Invoke-WebRequest -Uri "https://www.python.org/ftp/python/3.11.9/python-3.11.9-amd64.exe" -OutFile "C:\python-3.11.9-amd64.exe" + Start-Process -FilePath "C:\python-3.11.9-amd64.exe" -ArgumentList "/quiet InstallAllUsers=1 PrependPath=1 AppendPath=1" -Wait + Invoke-WebRequest -Uri "https://awscli.amazonaws.com/AWSCLIV2.msi" -Outfile "C:\AWSCLIV2.msi" + Start-Process msiexec.exe -ArgumentList "/i C:\AWSCLIV2.msi /quiet" -Wait + $env:Path = [System.Environment]::GetEnvironmentVariable("Path","Machine") + $secret = aws secretsmanager get-secret-value --secret-id WindowsPasswordSecret --query SecretString --output text | ConvertFrom-Json + $password = ConvertTo-SecureString -String $($secret.password) -AsPlainText -Force + New-LocalUser -Name "jobuser" -Password $password -FullName "jobuser" -Description "job user" + $Cred = New-Object System.Management.Automation.PSCredential "jobuser", $password + Start-Process cmd.exe -Credential $Cred -ArgumentList "/C" -LoadUserProfile -NoNewWindow + {copy_s3_command} + """ + + return userdata + + def _launch_instance(self, *, s3_files: list[tuple[str, str]] | None = None) -> None: + assert ( + not self.instance_id + ), "Attempted to launch EC2 instance when one was already launched" + + if self.configuration.operating_system.name == "AL2023": + userdata = self.linux_userdata(s3_files) + else: + userdata = self.windows_userdata(s3_files) + LOG.info("Launching EC2 instance") run_instance_response = self.ec2_client.run_instances( MinCount=1, @@ -329,16 +427,7 @@ def _launch_instance(self, *, s3_files: list[tuple[str, str]] | None = None) -> ], } ], - UserData=f"""#!/bin/bash -# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -set -x -groupadd --system {self.configuration.group} -useradd --create-home --system --shell=/bin/bash --groups={self.configuration.group} {self.configuration.user} -{configure_job_users} -{copy_s3_command} - -runuser --login {self.configuration.user} --command 'python3 -m venv $HOME/.venv && echo ". $HOME/.venv/bin/activate" >> $HOME/.bashrc' -""", + UserData=userdata, ) self.instance_id = run_instance_response["Instances"][0]["InstanceId"] @@ -352,10 +441,7 @@ def _launch_instance(self, *, s3_files: list[tuple[str, str]] | None = None) -> ) LOG.info(f"EC2 instance {self.instance_id} status is OK") - def _start_worker_agent(self) -> None: # pragma: no cover - assert self.instance_id - - LOG.info(f"Sending SSM command to configure Worker agent on instance {self.instance_id}") + def start_linux_worker(self) -> None: cmd_result = self.send_command( f"cd /home/{self.configuration.user}; . .venv/bin/activate; AWS_DEFAULT_REGION={self.configuration.region} {configure_worker_command(config=self.configuration)}" ) @@ -378,6 +464,33 @@ def _start_worker_agent(self) -> None: # pragma: no cover assert cmd_result.exit_code == 0, f"Failed to start Worker agent: {cmd_result}" LOG.info("Successfully started Worker agent") + def start_windows_worker(self) -> None: + cmd_result = self.send_command(f"{configure_worker_command(config=self.configuration)}") + LOG.info("Successfully configured Worker agent") + LOG.info("Sending SSM Command to check if Worker Agent is running") + cmd_result = self.send_command( + " ; ".join( + [ + "echo Waiting 20s for the agent service to get started", + "sleep 20", + "echo 'Running running Get-Process to check if the agent is running'", + "IF(Get-Process pythonservice){echo 'service is running'}ELSE{exit 1}", + ] + ), + ) + assert cmd_result.exit_code == 0, f"Failed to start Worker agent: {cmd_result}" + LOG.info("Successfully started Worker agent") + + def _start_worker_agent(self) -> None: # pragma: no cover + assert self.instance_id + + LOG.info(f"Sending SSM command to configure Worker agent on instance {self.instance_id}") + + if self.configuration.operating_system.name == "AL2023": + self.start_linux_worker() + else: + self.start_windows_worker() + @property def worker_id(self) -> str: cmd_result = self.send_command("cat /var/lib/deadline/worker.json | jq -r '.worker_id'") @@ -392,13 +505,21 @@ def worker_id(self) -> str: @property def ami_id(self) -> str: if not hasattr(self, "_ami_id"): - # Grab the latest AL2023 AMI - # https://aws.amazon.com/blogs/compute/query-for-the-latest-amazon-linux-ami-ids-using-aws-systems-manager-parameter-store/ - ssm_param_name = ( - f"/aws/service/ami-amazon-linux-latest/{EC2InstanceWorker.AL2023_AMI_NAME}" - ) + if self.configuration.operating_system.name == "AL2023": + # Grab the latest AL2023 AMI + # https://aws.amazon.com/blogs/compute/query-for-the-latest-amazon-linux-ami-ids-using-aws-systems-manager-parameter-store/ + ssm_param_name = ( + f"/aws/service/ami-amazon-linux-latest/{EC2InstanceWorker.AL2023_AMI_NAME}" + ) + else: + # Grab the latest Windows Server 2022 AMI + # https://aws.amazon.com/blogs/mt/query-for-the-latest-windows-ami-using-systems-manager-parameter-store/ + ssm_param_name = ( + f"/aws/service/ami-windows-latest/{EC2InstanceWorker.WIN2022_AMI_NAME}" + ) + response = call_api( - description=f"Getting latest AL2023 AMI ID from SSM parameter {ssm_param_name}", + description=f"Getting latest {self.configuration.operating_system.name} AMI ID from SSM parameter {ssm_param_name}", fn=lambda: self.ssm_client.get_parameters(Names=[ssm_param_name]), ) @@ -407,7 +528,7 @@ def ami_id(self) -> str: len(parameters) == 1 ), f"Received incorrect number of SSM parameters. Expected 1, got response: {response}" self._ami_id = parameters[0]["Value"] - LOG.info(f"Using latest AL2023 AMI {self._ami_id}") + LOG.info(f"Using latest {self.configuration.operating_system.name} AMI {self._ami_id}") return self._ami_id @@ -425,7 +546,6 @@ def __post_init__(self) -> None: def start(self) -> None: self._tmpdir = pathlib.Path(tempfile.mkdtemp()) - # TODO: Support multiple job users on Docker assert ( len(self.configuration.job_users) == 1 ), f"Multiple job users not supported on Docker worker: {self.configuration.job_users}" diff --git a/src/deadline_test_fixtures/fixtures.py b/src/deadline_test_fixtures/fixtures.py index e183f38..56dceb6 100644 --- a/src/deadline_test_fixtures/fixtures.py +++ b/src/deadline_test_fixtures/fixtures.py @@ -38,6 +38,7 @@ PosixSessionUser, ServiceModel, S3Object, + OperatingSystem, ) from .cloudformation import WorkerBootstrapStack from .job_attachment_manager import JobAttachmentManager @@ -382,6 +383,7 @@ def worker_config( codeartifact: CodeArtifactRepositoryInfo, service_model: ServiceModel, region: str, + operating_system: OperatingSystem, ) -> Generator[DeadlineWorkerConfiguration, None, None]: """ Builds the configuration for a DeadlineWorker. @@ -418,7 +420,12 @@ def worker_config( ), f"Expected exactly one Worker agent whl path, but got {resolved_whl_paths} (from pattern {worker_agent_whl_path})" resolved_whl_path = resolved_whl_paths[0] - dest_path = posixpath.join("/tmp", os.path.basename(resolved_whl_path)) + if operating_system == "AL2023": + dest_path = posixpath.join("/tmp", os.path.basename(resolved_whl_path)) + else: + dest_path = posixpath.join( + "%USERPROFILE%\\AppData\\Local\\Temp", os.path.basename(resolved_whl_path) + ) file_mappings = [(resolved_whl_path, dest_path)] LOG.info(f"The whl file will be copied to {dest_path} on the Worker environment") @@ -438,7 +445,10 @@ def worker_config( with src_path.open(mode="w") as f: json.dump(service_model.model, f) - dst_path = posixpath.join("/tmp", src_path.name) + if operating_system == "AL2023": + dst_path = posixpath.join("/tmp", src_path.name) + else: + dst_path = posixpath.join("%USERPROFILE%\\AppData\\Local\\Temp", src_path.name) LOG.info(f"The service model will be copied to {dst_path} on the Worker environment") file_mappings.append((str(src_path), dst_path)) @@ -455,6 +465,7 @@ def worker_config( ), service_model_path=dst_path, file_mappings=file_mappings or None, + operating_system=operating_system, ) @@ -581,3 +592,11 @@ def _find_latest_service_model_file(service_name: str) -> str: f"Expected exactly one file to match glob '{service_model_path}.*, but got: {service_model_files}" ) return service_model_files[0] + + +@pytest.fixture(scope="session") +def operating_system(request) -> OperatingSystem: + if request.param == "linux": + return OperatingSystem(name="AL2023") + else: + return OperatingSystem(name="WIN2022") diff --git a/src/deadline_test_fixtures/models.py b/src/deadline_test_fixtures/models.py index c3e8e05..f775ce0 100644 --- a/src/deadline_test_fixtures/models.py +++ b/src/deadline_test_fixtures/models.py @@ -37,6 +37,11 @@ class JobRunAsUser: runAs: Literal["QUEUE_CONFIGURED_USER", "WORKER_AGENT_USER"] +@dataclass(frozen=True) +class OperatingSystem: + name: Literal["AL2023", "WIN2022"] + + @dataclass(frozen=True) class CodeArtifactRepositoryInfo: region: str @@ -176,7 +181,35 @@ def install_args(self) -> list[str]: return args @property - def install_command(self) -> str: + def install_command_for_linux(self) -> str: + cmds = [] + + if self.codeartifact: + cmds.append( + "aws codeartifact login --tool pip " + + f"--domain {self.codeartifact.domain} " + + f"--domain-owner {self.codeartifact.domain_owner} " + + f"--repository {self.codeartifact.repository} " + ) + + if self.upgrade_pip: + cmds.append("pip install --upgrade pip") + + cmds.append( + " ".join( + [ + "pip", + "install", + *self.install_args, + *self.requirement_specifiers, + ] + ) + ) + + return "&& ".join(cmds) + + @property + def install_command_for_windows(self) -> str: cmds = [] if self.codeartifact: @@ -201,4 +234,4 @@ def install_command(self) -> str: ) ) - return " && ".join(cmds) + return "; ".join(cmds) diff --git a/test/unit/deadline/test_worker.py b/test/unit/deadline/test_worker.py index 07c47fb..5ab0a4c 100644 --- a/test/unit/deadline/test_worker.py +++ b/test/unit/deadline/test_worker.py @@ -20,6 +20,7 @@ DockerContainerWorker, EC2InstanceWorker, PipInstall, + OperatingSystem, S3Object, ) @@ -81,6 +82,7 @@ def worker_config(region: str) -> DeadlineWorkerConfiguration: ("/aws/models/deadline.json", "/tmp/deadline.json"), ], service_model_path="/path/to/service-2.json", + operating_system=OperatingSystem(name="AL2023"), )