From ce4ec529d78edbdc24f9d7c7331aae0c846e5eab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fully=2Eis=28=ED=92=80=EB=A6=AC=29?= Date: Thu, 11 Jul 2024 17:07:04 +0900 Subject: [PATCH 01/11] add eks properties overrride --- airflow/providers/amazon/aws/operators/batch.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/airflow/providers/amazon/aws/operators/batch.py b/airflow/providers/amazon/aws/operators/batch.py index 20c2e9dced6485..ca4ba8bfad8c87 100644 --- a/airflow/providers/amazon/aws/operators/batch.py +++ b/airflow/providers/amazon/aws/operators/batch.py @@ -68,6 +68,7 @@ class BatchOperator(BaseOperator): :param overrides: DEPRECATED, use container_overrides instead with the same value. :param container_overrides: the `containerOverrides` parameter for boto3 (templated) :param ecs_properties_override: the `ecsPropertiesOverride` parameter for boto3 (templated) + :param eks_properties_override: the `eksPropertiesOverride` parameter for boto3 (templated) :param node_overrides: the `nodeOverrides` parameter for boto3 (templated) :param share_identifier: The share identifier for the job. Don't specify this parameter if the job queue doesn't have a scheduling policy. @@ -116,6 +117,7 @@ class BatchOperator(BaseOperator): "container_overrides", "array_properties", "ecs_properties_override", + "eks_properties_override", "node_overrides", "parameters", "retry_strategy", @@ -129,6 +131,7 @@ class BatchOperator(BaseOperator): "container_overrides": "json", "parameters": "json", "ecs_properties_override": "json", + "eks_properties_override": "json", "node_overrides": "json", "retry_strategy": "json", } @@ -166,6 +169,7 @@ def __init__( container_overrides: dict | None = None, array_properties: dict | None = None, ecs_properties_override: dict | None = None, + eks_properties_override: dict | None = None, node_overrides: dict | None = None, share_identifier: str | None = None, scheduling_priority_override: int | None = None, @@ -208,6 +212,7 @@ def __init__( ) self.ecs_properties_override = ecs_properties_override + self.eks_properties_override = eks_properties_override self.node_overrides = node_overrides self.share_identifier = share_identifier self.scheduling_priority_override = scheduling_priority_override @@ -307,6 +312,8 @@ def submit_job(self, context: Context): self.log.info("AWS Batch job - array properties: %s", self.array_properties) if self.ecs_properties_override: self.log.info("AWS Batch job - ECS properties: %s", self.ecs_properties_override) + if self.eks_properties_override: + self.log.info("AWS Batch job - EKS properties: %s", self.eks_properties_override) if self.node_overrides: self.log.info("AWS Batch job - node properties: %s", self.node_overrides) @@ -319,6 +326,7 @@ def submit_job(self, context: Context): "tags": self.tags, "containerOverrides": self.container_overrides, "ecsPropertiesOverride": self.ecs_properties_override, + "eksPropertiesOverride": self.eks_properties_override, "nodeOverrides": self.node_overrides, "retryStrategy": self.retry_strategy, "shareIdentifier": self.share_identifier, From b89ca4572d4762482a7a792afc6141144d75d19c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fully=2Eis=28=ED=92=80=EB=A6=AC=29?= Date: Thu, 11 Jul 2024 17:12:16 +0900 Subject: [PATCH 02/11] add eks properties override --- airflow/providers/amazon/aws/hooks/batch_client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airflow/providers/amazon/aws/hooks/batch_client.py b/airflow/providers/amazon/aws/hooks/batch_client.py index f024134560d842..299c949e1e68bb 100644 --- a/airflow/providers/amazon/aws/hooks/batch_client.py +++ b/airflow/providers/amazon/aws/hooks/batch_client.py @@ -103,6 +103,7 @@ def submit_job( parameters: dict, containerOverrides: dict, ecsPropertiesOverride: dict, + eksPropertiesOverride: dict, tags: dict, ) -> dict: """ @@ -122,6 +123,8 @@ def submit_job( :param ecsPropertiesOverride: the same parameter that boto3 will receive + :param eksPropertiesOverride: the same parameter that boto3 will receive + :param tags: the same parameter that boto3 will receive :return: an API response From 68602869276ca3de2a8af4811454d06b98de21f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fully=2Eis=28=ED=92=80=EB=A6=AC=29?= Date: Thu, 11 Jul 2024 17:20:53 +0900 Subject: [PATCH 03/11] add eks --- .../amazon/aws/operators/test_batch.py | 119 +++++++++++++++++- 1 file changed, 118 insertions(+), 1 deletion(-) diff --git a/tests/providers/amazon/aws/operators/test_batch.py b/tests/providers/amazon/aws/operators/test_batch.py index 137044a212539b..c04097b7fcc96f 100644 --- a/tests/providers/amazon/aws/operators/test_batch.py +++ b/tests/providers/amazon/aws/operators/test_batch.py @@ -133,6 +133,7 @@ def test_init_defaults(self): assert batch_job.container_overrides is None assert batch_job.array_properties is None assert batch_job.ecs_properties_override is None + assert batch_job.eks_properties_override is None assert batch_job.node_overrides is None assert batch_job.share_identifier is None assert batch_job.scheduling_priority_override is None @@ -151,6 +152,7 @@ def test_template_fields_overrides(self): "container_overrides", "array_properties", "ecs_properties_override", + "eks_properties_override", "node_overrides", "parameters", "retry_strategy", @@ -262,6 +264,120 @@ def test_execute_with_ecs_overrides(self, check_mock, wait_mock, job_description tags={}, ) + + @mock.patch.object(BatchClientHook, "get_job_description") + @mock.patch.object(BatchClientHook, "wait_for_job") + @mock.patch.object(BatchClientHook, "check_job_success") + def test_execute_with_eks_overrides(self, check_mock, wait_mock, job_description_mock): + self.batch.container_overrides = None + self.batch.eks_properties_override = { + "podProperties": [ + { + "containers": [ + { + "image": "string", + "command": [ + "string", + ], + "args": [ + "string", + ], + "env": [ + {"name": "string", "value": "string"}, + ], + "resources": [ + {"limits": {"string": "string"}, + "requests": {"string": "string"} + } + ], + }, + ], + "initContainers": [ + { + "image": "string", + "command": [ + "string", + ], + "args": [ + "string", + ], + "env": [ + {"name": "string", "value": "string"}, + ], + "resources": [ + { + "limits": {"string": "string"}, + "requests": {"string": "string"}, + }, + ] + }, + ] + "metadata": { + "labels": { + "string": "string" + }, + }, + }, + ] + } + self.batch.execute(self.mock_context) + + self.client_mock.submit_job.assert_called_once_with( + jobQueue="queue", + jobName=JOB_NAME, + jobDefinition="hello-world", + eksPropertiesOverride={ + "podProperties": [ + { + "containers": [ + { + "image": "string", + "command": [ + "string", + ], + "args": [ + "string", + ], + "env": [ + {"name": "string", "value": "string"}, + ], + "resources": [ + {"limits": {"string": "string"}, + "requests": {"string": "string"} + } + ], + }, + ], + "initContainers": [ + { + "image": "string", + "command": [ + "string", + ], + "args": [ + "string", + ], + "env": [ + {"name": "string", "value": "string"}, + ], + "resources": [ + { + "limits": {"string": "string"}, + "requests": {"string": "string"}, + }, + ] + }, + ] + "metadata": { + "labels": { + "string": "string" + }, + }, + }, + ] + } + ) + @mock.patch.object(BatchClientHook, "check_job_success") def test_wait_job_complete_using_waiters(self, check_mock): mock_waiters = mock.Mock() @@ -296,7 +412,7 @@ def test_kill_job(self): self.batch.on_kill() self.client_mock.terminate_job.assert_called_once_with(jobId=JOB_ID, reason="Task killed by the user") - @pytest.mark.parametrize("override", ["overrides", "node_overrides", "ecs_properties_override"]) + @pytest.mark.parametrize("override", ["overrides", "node_overrides", "ecs_properties_override", "eks_properties_override"]) @patch( "airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook.client", new_callable=mock.PropertyMock, @@ -348,6 +464,7 @@ def test_override_not_sent_if_not_set(self, client_mock, override): "overrides": "containerOverrides", "node_overrides": "nodeOverrides", "ecs_properties_override": "ecsPropertiesOverride", + "eks_properties_override": "eksPropertiesOverride", } expected_args[py2api[override]] = {"a": "a"} From 7c98328ea32b6c66a9a186b31a609593686fae4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fully=2Eis=28=ED=92=80=EB=A6=AC=29?= Date: Fri, 12 Jul 2024 08:13:33 +0900 Subject: [PATCH 04/11] fix --- tests/providers/amazon/aws/operators/test_batch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_batch.py b/tests/providers/amazon/aws/operators/test_batch.py index c04097b7fcc96f..6bfc3673615135 100644 --- a/tests/providers/amazon/aws/operators/test_batch.py +++ b/tests/providers/amazon/aws/operators/test_batch.py @@ -311,7 +311,7 @@ def test_execute_with_eks_overrides(self, check_mock, wait_mock, job_description }, ] }, - ] + ], "metadata": { "labels": { "string": "string" @@ -367,7 +367,7 @@ def test_execute_with_eks_overrides(self, check_mock, wait_mock, job_description }, ] }, - ] + ], "metadata": { "labels": { "string": "string" From e90d42a96f16cfbd411f5a96607fc6de2e3ba696 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fully=2Eis=28=ED=92=80=EB=A6=AC=29?= Date: Fri, 12 Jul 2024 16:55:59 +0900 Subject: [PATCH 05/11] fix test --- tests/providers/amazon/aws/operators/test_batch.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_batch.py b/tests/providers/amazon/aws/operators/test_batch.py index 6bfc3673615135..6376f507256ac7 100644 --- a/tests/providers/amazon/aws/operators/test_batch.py +++ b/tests/providers/amazon/aws/operators/test_batch.py @@ -372,10 +372,15 @@ def test_execute_with_eks_overrides(self, check_mock, wait_mock, job_description "labels": { "string": "string" }, - }, + }, }, ] - } + }, + "parameters": {}, + "retryStrategy": { + "attempts": 1, + }, + "tags": {}, ) @mock.patch.object(BatchClientHook, "check_job_success") From bb77d5aa5a9cfe26e95484209ae051a60f4f4192 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fully=2Eis=28=ED=92=80=EB=A6=AC=29?= Date: Mon, 15 Jul 2024 16:56:52 +0900 Subject: [PATCH 06/11] fix --- tests/providers/amazon/aws/operators/test_batch.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_batch.py b/tests/providers/amazon/aws/operators/test_batch.py index 6376f507256ac7..129471c26bf8db 100644 --- a/tests/providers/amazon/aws/operators/test_batch.py +++ b/tests/providers/amazon/aws/operators/test_batch.py @@ -376,11 +376,11 @@ def test_execute_with_eks_overrides(self, check_mock, wait_mock, job_description }, ] }, - "parameters": {}, - "retryStrategy": { + parameters: {}, + retryStrategy: { "attempts": 1, }, - "tags": {}, + tags: {}, ) @mock.patch.object(BatchClientHook, "check_job_success") From bb5068875020e2367ae35a062756085e7a6a7068 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fully=2Eis=28=ED=92=80=EB=A6=AC=29?= Date: Tue, 16 Jul 2024 08:18:00 +0900 Subject: [PATCH 07/11] fix --- tests/providers/amazon/aws/operators/test_batch.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_batch.py b/tests/providers/amazon/aws/operators/test_batch.py index 129471c26bf8db..09048b21d148d5 100644 --- a/tests/providers/amazon/aws/operators/test_batch.py +++ b/tests/providers/amazon/aws/operators/test_batch.py @@ -372,15 +372,13 @@ def test_execute_with_eks_overrides(self, check_mock, wait_mock, job_description "labels": { "string": "string" }, - }, + }, }, ] }, - parameters: {}, - retryStrategy: { - "attempts": 1, - }, - tags: {}, + parameters={}, + retryStrategy={"attempts": 1}, + tags={}, ) @mock.patch.object(BatchClientHook, "check_job_success") From 2240139674b6ee4aa429793f57aa8164e715346e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fully=2Eis=28=ED=92=80=EB=A6=AC=29?= Date: Tue, 16 Jul 2024 20:47:25 +0900 Subject: [PATCH 08/11] fix --- .../amazon/aws/operators/test_batch.py | 34 ++++++------------- 1 file changed, 11 insertions(+), 23 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_batch.py b/tests/providers/amazon/aws/operators/test_batch.py index 09048b21d148d5..a35590032894d3 100644 --- a/tests/providers/amazon/aws/operators/test_batch.py +++ b/tests/providers/amazon/aws/operators/test_batch.py @@ -286,9 +286,7 @@ def test_execute_with_eks_overrides(self, check_mock, wait_mock, job_description {"name": "string", "value": "string"}, ], "resources": [ - {"limits": {"string": "string"}, - "requests": {"string": "string"} - } + {"limits": {"string": "string"}, "requests": {"string": "string"}} ], }, ], @@ -305,17 +303,12 @@ def test_execute_with_eks_overrides(self, check_mock, wait_mock, job_description {"name": "string", "value": "string"}, ], "resources": [ - { - "limits": {"string": "string"}, - "requests": {"string": "string"}, - }, - ] + {"limits": {"string": "string"}, "requests": {"string": "string"}} + ], }, ], "metadata": { - "labels": { - "string": "string" - }, + "labels": {"string": "string"}, }, }, ] @@ -342,9 +335,7 @@ def test_execute_with_eks_overrides(self, check_mock, wait_mock, job_description {"name": "string", "value": "string"}, ], "resources": [ - {"limits": {"string": "string"}, - "requests": {"string": "string"} - } + {"limits": {"string": "string"}, "requests": {"string": "string"}} ], }, ], @@ -361,17 +352,12 @@ def test_execute_with_eks_overrides(self, check_mock, wait_mock, job_description {"name": "string", "value": "string"}, ], "resources": [ - { - "limits": {"string": "string"}, - "requests": {"string": "string"}, - }, - ] + {"limits": {"string": "string"}, "requests": {"string": "string"}} + ], }, ], "metadata": { - "labels": { - "string": "string" - }, + "labels": {"string": "string"}, }, }, ] @@ -415,7 +401,9 @@ def test_kill_job(self): self.batch.on_kill() self.client_mock.terminate_job.assert_called_once_with(jobId=JOB_ID, reason="Task killed by the user") - @pytest.mark.parametrize("override", ["overrides", "node_overrides", "ecs_properties_override", "eks_properties_override"]) + @pytest.mark.parametrize( + "override", ["overrides", "node_overrides", "ecs_properties_override", "eks_properties_override"] + ) @patch( "airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook.client", new_callable=mock.PropertyMock, From e77149f82a18f494e23d3b22240f0d691d57eee2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fully=2Eis=28=ED=92=80=EB=A6=AC=29?= Date: Tue, 16 Jul 2024 20:48:39 +0900 Subject: [PATCH 09/11] fix --- airflow/providers/amazon/aws/hooks/batch_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/hooks/batch_client.py b/airflow/providers/amazon/aws/hooks/batch_client.py index 299c949e1e68bb..53c1b5b7e6e3eb 100644 --- a/airflow/providers/amazon/aws/hooks/batch_client.py +++ b/airflow/providers/amazon/aws/hooks/batch_client.py @@ -124,7 +124,7 @@ def submit_job( :param ecsPropertiesOverride: the same parameter that boto3 will receive :param eksPropertiesOverride: the same parameter that boto3 will receive - + :param tags: the same parameter that boto3 will receive :return: an API response From fced1e4796b23054e4a45cc993be909d3a2d299b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fully=2Eis=28=ED=92=80=EB=A6=AC=29?= Date: Tue, 16 Jul 2024 20:49:52 +0900 Subject: [PATCH 10/11] fix --- tests/providers/amazon/aws/operators/test_batch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/providers/amazon/aws/operators/test_batch.py b/tests/providers/amazon/aws/operators/test_batch.py index a35590032894d3..4e0b224a7ebce8 100644 --- a/tests/providers/amazon/aws/operators/test_batch.py +++ b/tests/providers/amazon/aws/operators/test_batch.py @@ -264,7 +264,7 @@ def test_execute_with_ecs_overrides(self, check_mock, wait_mock, job_description tags={}, ) - + @mock.patch.object(BatchClientHook, "get_job_description") @mock.patch.object(BatchClientHook, "wait_for_job") @mock.patch.object(BatchClientHook, "check_job_success") From af6ebf658d27d8306146182edb58dc33d66de2c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fully=2Eis=28=ED=92=80=EB=A6=AC=29?= Date: Wed, 17 Jul 2024 00:08:08 +0900 Subject: [PATCH 11/11] fix --- tests/providers/amazon/aws/operators/test_batch.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_batch.py b/tests/providers/amazon/aws/operators/test_batch.py index 4e0b224a7ebce8..7d9f27a6f4c5ae 100644 --- a/tests/providers/amazon/aws/operators/test_batch.py +++ b/tests/providers/amazon/aws/operators/test_batch.py @@ -264,7 +264,6 @@ def test_execute_with_ecs_overrides(self, check_mock, wait_mock, job_description tags={}, ) - @mock.patch.object(BatchClientHook, "get_job_description") @mock.patch.object(BatchClientHook, "wait_for_job") @mock.patch.object(BatchClientHook, "check_job_success") @@ -285,9 +284,7 @@ def test_execute_with_eks_overrides(self, check_mock, wait_mock, job_description "env": [ {"name": "string", "value": "string"}, ], - "resources": [ - {"limits": {"string": "string"}, "requests": {"string": "string"}} - ], + "resources": [{"limits": {"string": "string"}, "requests": {"string": "string"}}], }, ], "initContainers": [ @@ -302,9 +299,7 @@ def test_execute_with_eks_overrides(self, check_mock, wait_mock, job_description "env": [ {"name": "string", "value": "string"}, ], - "resources": [ - {"limits": {"string": "string"}, "requests": {"string": "string"}} - ], + "resources": [{"limits": {"string": "string"}, "requests": {"string": "string"}}], }, ], "metadata": {