From 941b309bfa5bc962b546d501805f11b57aafaac2 Mon Sep 17 00:00:00 2001 From: Guy Driesen <19373791+guydriesen@users.noreply.github.com> Date: Tue, 7 May 2024 17:10:03 +0200 Subject: [PATCH 1/9] Add args to docker service ContainerSpec --- .../docker/operators/docker_swarm.py | 23 +++++++++++++ .../docker/operators/test_docker_swarm.py | 33 +++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py index b9fc6c89a77f2..2de995a45e1da 100644 --- a/airflow/providers/docker/operators/docker_swarm.py +++ b/airflow/providers/docker/operators/docker_swarm.py @@ -18,7 +18,9 @@ from __future__ import annotations +import ast import re +import shlex from datetime import datetime from time import sleep from typing import TYPE_CHECKING @@ -58,6 +60,7 @@ class DockerSwarmOperator(DockerOperator): container's process exits. The default is False. :param command: Command to be run in the container. (templated) + :param args: Arguments to the command. :param docker_url: URL of the host running the docker daemon. Default is the value of the ``DOCKER_HOST`` environment variable or unix://var/run/docker.sock if it is unset. @@ -106,6 +109,7 @@ def __init__( self, *, image: str, + args: str | list[str] | None = None, enable_logging: bool = True, configs: list[types.ConfigReference] | None = None, secrets: list[types.SecretReference] | None = None, @@ -116,6 +120,7 @@ def __init__( **kwargs, ) -> None: super().__init__(image=image, **kwargs) + self.args = args self.enable_logging = enable_logging self.service = None self.configs = configs @@ -136,6 +141,7 @@ def _run_service(self) -> None: container_spec=types.ContainerSpec( image=self.image, command=self.format_command(self.command), + args=self.format_args(self.args), mounts=self.mounts, env=self.environment, user=self.user, @@ -225,6 +231,23 @@ def stream_new_logs(last_line_logged, since=0): sleep(2) last_line_logged, last_timestamp = stream_new_logs(last_line_logged, since=last_timestamp) + @staticmethod + def format_args(args: list[str] | str | None) -> list[str] | None: + """Retrieve args. + + The args string is parsed to a list. If it starts with ``[``, + the string is treated as a Python literal and parsed into a list. + + :param args: args to the docker service + + :return: the args as list + """ + if isinstance(args, str) and args.strip().startswith("["): + args = ast.literal_eval(args) + else: + args = shlex.split(args) + return args + def on_kill(self) -> None: if self.hook.client_created and self.service is not None: self.log.info("Removing docker service: %s", self.service["ID"]) diff --git a/tests/providers/docker/operators/test_docker_swarm.py b/tests/providers/docker/operators/test_docker_swarm.py index 5576eec0837c9..aca48891592c2 100644 --- a/tests/providers/docker/operators/test_docker_swarm.py +++ b/tests/providers/docker/operators/test_docker_swarm.py @@ -254,3 +254,36 @@ def test_container_resources(self, types_mock, docker_api_client_patcher): placement=None, ) types_mock.Resources.assert_not_called() + + @mock.patch("airflow.providers.docker.operators.docker_swarm.types") + def test_service_args(self, types_mock, docker_api_client_patcher): + mock_obj = mock.Mock() + + client_mock = mock.Mock(spec=APIClient) + client_mock.create_service.return_value = {"ID": "some_id"} + client_mock.images.return_value = [] + client_mock.pull.return_value = [b'{"status":"pull log"}'] + client_mock.tasks.return_value = [{"Status": {"State": "complete"}}] + types_mock.TaskTemplate.return_value = mock_obj + types_mock.ContainerSpec.return_value = mock_obj + types_mock.RestartPolicy.return_value = mock_obj + types_mock.Resources.return_value = mock_obj + + docker_api_client_patcher.return_value = client_mock + + operator = DockerSwarmOperator( + image="ubuntu:latest", + command="env", + args="--show", + task_id="unittest", + auto_remove="success", + enable_logging=False, + ) + operator.execute(None) + + types_mock.ContainerSpec.assert_called_once_with( + image="ubuntu:latest", + command="env", + args="--show", + user="unittest", + ) From 547266899b80836fd8a54d3b3c2aba85df837ada Mon Sep 17 00:00:00 2001 From: Guy Driesen <19373791+guydriesen@users.noreply.github.com> Date: Tue, 7 May 2024 17:36:28 +0200 Subject: [PATCH 2/9] args is a list in ContainerSpec --- tests/providers/docker/operators/test_docker_swarm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/providers/docker/operators/test_docker_swarm.py b/tests/providers/docker/operators/test_docker_swarm.py index aca48891592c2..e5a99866f2864 100644 --- a/tests/providers/docker/operators/test_docker_swarm.py +++ b/tests/providers/docker/operators/test_docker_swarm.py @@ -284,6 +284,6 @@ def test_service_args(self, types_mock, docker_api_client_patcher): types_mock.ContainerSpec.assert_called_once_with( image="ubuntu:latest", command="env", - args="--show", + args=["--show"], user="unittest", ) From 45499b7b6755abb13e263f939d9cc16ccd6177a5 Mon Sep 17 00:00:00 2001 From: Guy Driesen <19373791+guydriesen@users.noreply.github.com> Date: Tue, 7 May 2024 17:56:31 +0200 Subject: [PATCH 3/9] fix ContainerSpec assertion --- tests/providers/docker/operators/test_docker_swarm.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/providers/docker/operators/test_docker_swarm.py b/tests/providers/docker/operators/test_docker_swarm.py index e5a99866f2864..9ed012fb9e73c 100644 --- a/tests/providers/docker/operators/test_docker_swarm.py +++ b/tests/providers/docker/operators/test_docker_swarm.py @@ -285,5 +285,10 @@ def test_service_args(self, types_mock, docker_api_client_patcher): image="ubuntu:latest", command="env", args=["--show"], - user="unittest", + user= None, + mounts= [], + tty= False, + env={"AIRFLOW_TMP_DIR": "/tmp/airflow"}, + configs= None, + secrets= None, ) From b3964ce9902402b3188e8df0a26f53910f05988d Mon Sep 17 00:00:00 2001 From: Guy Driesen <19373791+guydriesen@users.noreply.github.com> Date: Tue, 7 May 2024 18:05:44 +0200 Subject: [PATCH 4/9] fix args formatter --- airflow/providers/docker/operators/docker_swarm.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py index 2de995a45e1da..9074969b4b8f5 100644 --- a/airflow/providers/docker/operators/docker_swarm.py +++ b/airflow/providers/docker/operators/docker_swarm.py @@ -242,10 +242,11 @@ def format_args(args: list[str] | str | None) -> list[str] | None: :return: the args as list """ - if isinstance(args, str) and args.strip().startswith("["): - args = ast.literal_eval(args) - else: - args = shlex.split(args) + if isinstance(args, str): + if args.strip().startswith("["): + return ast.literal_eval(args) + else: + return shlex.split(args) return args def on_kill(self) -> None: From d743085c9bfb7c229dc90e8a8be359e098472802 Mon Sep 17 00:00:00 2001 From: Guy Driesen <19373791+guydriesen@users.noreply.github.com> Date: Tue, 7 May 2024 19:04:27 +0200 Subject: [PATCH 5/9] fix ContainerSpec assert --- tests/providers/docker/operators/test_docker_swarm.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/providers/docker/operators/test_docker_swarm.py b/tests/providers/docker/operators/test_docker_swarm.py index 9ed012fb9e73c..cc8cd2bd46d87 100644 --- a/tests/providers/docker/operators/test_docker_swarm.py +++ b/tests/providers/docker/operators/test_docker_swarm.py @@ -84,6 +84,7 @@ def _client_service_logs_effect(): types_mock.ContainerSpec.assert_called_once_with( image="ubuntu:latest", command="env", + args=None, user="unittest", mounts=[types.Mount(source="/host/path", target="/container/path", type="bind")], tty=True, From 54bdb5a57db975a4462c9499a5f55852c3388f1a Mon Sep 17 00:00:00 2001 From: Guy Driesen <19373791+guydriesen@users.noreply.github.com> Date: Tue, 7 May 2024 19:05:55 +0200 Subject: [PATCH 6/9] remove some spaces --- tests/providers/docker/operators/test_docker_swarm.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/providers/docker/operators/test_docker_swarm.py b/tests/providers/docker/operators/test_docker_swarm.py index cc8cd2bd46d87..d39b313cc81e8 100644 --- a/tests/providers/docker/operators/test_docker_swarm.py +++ b/tests/providers/docker/operators/test_docker_swarm.py @@ -286,10 +286,10 @@ def test_service_args(self, types_mock, docker_api_client_patcher): image="ubuntu:latest", command="env", args=["--show"], - user= None, - mounts= [], - tty= False, + user=None, + mounts=[], + tty=False, env={"AIRFLOW_TMP_DIR": "/tmp/airflow"}, - configs= None, - secrets= None, + configs=None, + secrets=None, ) From a159f5889e59b7d270236f176e6bc8b307cff247 Mon Sep 17 00:00:00 2001 From: Guy Driesen Date: Wed, 8 May 2024 08:29:45 +0200 Subject: [PATCH 7/9] add docker service args list test case --- .../docker/operators/test_docker_swarm.py | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/tests/providers/docker/operators/test_docker_swarm.py b/tests/providers/docker/operators/test_docker_swarm.py index d39b313cc81e8..29661123d518b 100644 --- a/tests/providers/docker/operators/test_docker_swarm.py +++ b/tests/providers/docker/operators/test_docker_swarm.py @@ -257,7 +257,7 @@ def test_container_resources(self, types_mock, docker_api_client_patcher): types_mock.Resources.assert_not_called() @mock.patch("airflow.providers.docker.operators.docker_swarm.types") - def test_service_args(self, types_mock, docker_api_client_patcher): + def test_service_args_str(self, types_mock, docker_api_client_patcher): mock_obj = mock.Mock() client_mock = mock.Mock(spec=APIClient) @@ -293,3 +293,41 @@ def test_service_args(self, types_mock, docker_api_client_patcher): configs=None, secrets=None, ) + + @mock.patch("airflow.providers.docker.operators.docker_swarm.types") + def test_service_args_list(self, types_mock, docker_api_client_patcher): + mock_obj = mock.Mock() + + client_mock = mock.Mock(spec=APIClient) + client_mock.create_service.return_value = {"ID": "some_id"} + client_mock.images.return_value = [] + client_mock.pull.return_value = [b'{"status":"pull log"}'] + client_mock.tasks.return_value = [{"Status": {"State": "complete"}}] + types_mock.TaskTemplate.return_value = mock_obj + types_mock.ContainerSpec.return_value = mock_obj + types_mock.RestartPolicy.return_value = mock_obj + types_mock.Resources.return_value = mock_obj + + docker_api_client_patcher.return_value = client_mock + + operator = DockerSwarmOperator( + image="ubuntu:latest", + command="env", + args=["--show"], + task_id="unittest", + auto_remove="success", + enable_logging=False, + ) + operator.execute(None) + + types_mock.ContainerSpec.assert_called_once_with( + image="ubuntu:latest", + command="env", + args=["--show"], + user=None, + mounts=[], + tty=False, + env={"AIRFLOW_TMP_DIR": "/tmp/airflow"}, + configs=None, + secrets=None, + ) From d9508c3d6935f296db9d5ad1964f5dfb82ae1065 Mon Sep 17 00:00:00 2001 From: Guy Driesen Date: Wed, 8 May 2024 12:03:10 +0200 Subject: [PATCH 8/9] replace ast.literal_eval with json.loads --- airflow/providers/docker/operators/docker_swarm.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py index 9074969b4b8f5..bc9108614a048 100644 --- a/airflow/providers/docker/operators/docker_swarm.py +++ b/airflow/providers/docker/operators/docker_swarm.py @@ -18,7 +18,7 @@ from __future__ import annotations -import ast +import json import re import shlex from datetime import datetime @@ -244,7 +244,7 @@ def format_args(args: list[str] | str | None) -> list[str] | None: """ if isinstance(args, str): if args.strip().startswith("["): - return ast.literal_eval(args) + return json.loads(args) else: return shlex.split(args) return args From fef1d571b6f9b533643151e98e7eaa1eb3b94b56 Mon Sep 17 00:00:00 2001 From: Guy Driesen Date: Wed, 8 May 2024 14:47:27 +0200 Subject: [PATCH 9/9] remove json string representation --- airflow/providers/docker/operators/docker_swarm.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py index bc9108614a048..a05bfdc897864 100644 --- a/airflow/providers/docker/operators/docker_swarm.py +++ b/airflow/providers/docker/operators/docker_swarm.py @@ -18,7 +18,6 @@ from __future__ import annotations -import json import re import shlex from datetime import datetime @@ -235,18 +234,14 @@ def stream_new_logs(last_line_logged, since=0): def format_args(args: list[str] | str | None) -> list[str] | None: """Retrieve args. - The args string is parsed to a list. If it starts with ``[``, - the string is treated as a Python literal and parsed into a list. + The args string is parsed to a list. :param args: args to the docker service :return: the args as list """ if isinstance(args, str): - if args.strip().startswith("["): - return json.loads(args) - else: - return shlex.split(args) + return shlex.split(args) return args def on_kill(self) -> None: