From c96af8e3fe57a1414f0eaa78ec44e01f74833ffc Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Fri, 12 Jun 2020 01:17:43 +0100 Subject: [PATCH] Further validation that only task commands are run by executors (#9240) (cherry-picked from 99c534e9faf) --- .../contrib/executors/kubernetes_executor.py | 6 ++---- .../executors/test_kubernetes_executor.py | 3 ++- tests/executors/test_dask_executor.py | 6 +++--- tests/executors/test_local_executor.py | 20 ++++++++++++++----- 4 files changed, 22 insertions(+), 13 deletions(-) diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 415d698d12e03..f7b82d8bc5f5b 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -508,11 +508,9 @@ def run_next(self, next_job): self.log.info('Kubernetes job is %s', str(next_job)) key, command, kube_executor_config = next_job dag_id, task_id, execution_date, try_number = key - if isinstance(command, str): - command = [command] + if command[0:2] != ["airflow", "run"]: + raise ValueError('The command must start with ["airflow", "run"].') - if command[0] != "airflow": - raise ValueError('The first element of command must be equal to "airflow".') self.log.debug("Kubernetes running for command %s", command) self.log.debug("Kubernetes launching image %s", self.kube_config.kube_image) pod = self.worker_configuration.make_pod( diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py index 7d2fe39494597..2700ccb1d0f46 100644 --- a/tests/contrib/executors/test_kubernetes_executor.py +++ b/tests/contrib/executors/test_kubernetes_executor.py @@ -1004,7 +1004,8 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc # Execute a task while the Api Throws errors try_number = 1 kubernetesExecutor.execute_async(key=('dag', 'task', datetime.utcnow(), try_number), - command='command', executor_config={}) + command=['airflow', 'run', 'true', 'some_parameter'], + executor_config={}) kubernetesExecutor.sync() kubernetesExecutor.sync() diff --git a/tests/executors/test_dask_executor.py b/tests/executors/test_dask_executor.py index 0654494362631..59e86cd5144f1 100644 --- a/tests/executors/test_dask_executor.py +++ b/tests/executors/test_dask_executor.py @@ -50,12 +50,12 @@ class BaseDaskTest(unittest.TestCase): def assert_tasks_on_executor(self, executor): + + success_command = ['airflow', 'run', '--help'] + fail_command = ['airflow', 'run', 'false'] # start the executor executor.start() - success_command = ['true', 'some_parameter'] - fail_command = ['false', 'some_parameter'] - executor.execute_async(key='success', command=success_command) executor.execute_async(key='fail', command=fail_command) diff --git a/tests/executors/test_local_executor.py b/tests/executors/test_local_executor.py index 3aebbe270400e..67f0b287c7e1b 100644 --- a/tests/executors/test_local_executor.py +++ b/tests/executors/test_local_executor.py @@ -17,6 +17,7 @@ # specific language governing permissions and limitations # under the License. +import subprocess import unittest from tests.compat import mock @@ -28,13 +29,22 @@ class LocalExecutorTest(unittest.TestCase): TEST_SUCCESS_COMMANDS = 5 - def execution_parallelism(self, parallelism=0): + @mock.patch('airflow.executors.local_executor.subprocess.check_call') + def execution_parallelism(self, mock_check_call, parallelism=0): + success_command = ['airflow', 'run', 'true', 'some_parameter'] + fail_command = ['airflow', 'run', 'false'] + + def fake_execute_command(command, close_fds=True): # pylint: disable=unused-argument + if command != success_command: + raise subprocess.CalledProcessError(returncode=1, cmd=command) + else: + return 0 + + mock_check_call.side_effect = fake_execute_command executor = LocalExecutor(parallelism=parallelism) executor.start() success_key = 'success {}' - success_command = ['true', 'some_parameter'] - fail_command = ['false', 'some_parameter'] self.assertTrue(executor.result_queue.empty()) for i in range(self.TEST_SUCCESS_COMMANDS): @@ -58,11 +68,11 @@ def execution_parallelism(self, parallelism=0): self.assertEqual(executor.workers_used, expected) def test_execution_unlimited_parallelism(self): - self.execution_parallelism(parallelism=0) + self.execution_parallelism(parallelism=0) # pylint: disable=no-value-for-parameter def test_execution_limited_parallelism(self): test_parallelism = 2 - self.execution_parallelism(parallelism=test_parallelism) + self.execution_parallelism(parallelism=test_parallelism) # pylint: disable=no-value-for-parameter @mock.patch('airflow.executors.local_executor.LocalExecutor.sync') @mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks')