From f87de8107443be7d543dd26d5e8292f0c53e7326 Mon Sep 17 00:00:00 2001 From: Yingbo Wang Date: Tue, 28 Aug 2018 05:33:22 -0700 Subject: [PATCH] [AIRFLOW-2930] Fix celery excecutor scheduler crash (#3784) Caused by an update in PR #3740. execute_command.apply_async(args=command, ...) -command is a list of short unicode strings and the above code pass multiple arguments to a function defined as taking only one argument. -command = ["airflow", "run", "dag323",...] -args = command = ["airflow", "run", "dag323", ...] -execute_command("airflow","run","dag3s3", ...) will be error and exit. --- airflow/executors/celery_executor.py | 2 +- tests/executors/dask_executor.py | 4 ++-- tests/executors/test_celery_executor.py | 4 ++-- tests/executors/test_local_executor.py | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 2128ae7b09228..61bbc667160d0 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -82,7 +82,7 @@ def execute_async(self, key, command, self.log.info("[celery] queuing {key} through celery, " "queue={queue}".format(**locals())) self.tasks[key] = execute_command.apply_async( - args=command, queue=queue) + args=[command], queue=queue) self.last_state[key] = celery_states.PENDING def sync(self): diff --git a/tests/executors/dask_executor.py b/tests/executors/dask_executor.py index 9bf051f5805d2..4f0009e1ccb21 100644 --- a/tests/executors/dask_executor.py +++ b/tests/executors/dask_executor.py @@ -55,8 +55,8 @@ def assert_tasks_on_executor(self, executor): # start the executor executor.start() - success_command = ['true', ] - fail_command = ['false', ] + success_command = ['true', 'some_parameter'] + fail_command = ['false', 'some_parameter'] executor.execute_async(key='success', command=success_command) executor.execute_async(key='fail', command=fail_command) diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index 69f9fbfe9fea8..95ad58f6a2d96 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -34,8 +34,8 @@ def test_celery_integration(self): executor.start() with start_worker(app=app, logfile=sys.stdout, loglevel='debug'): - success_command = ['true', ] - fail_command = ['false', ] + success_command = ['true', 'some_parameter'] + fail_command = ['false', 'some_parameter'] executor.execute_async(key='success', command=success_command) # errors are propagated for some reason diff --git a/tests/executors/test_local_executor.py b/tests/executors/test_local_executor.py index 846e1325618ac..59cb09c74e6b0 100644 --- a/tests/executors/test_local_executor.py +++ b/tests/executors/test_local_executor.py @@ -33,8 +33,8 @@ def execution_parallelism(self, parallelism=0): executor.start() success_key = 'success {}' - success_command = ['true', ] - fail_command = ['false', ] + success_command = ['true', 'some_parameter'] + fail_command = ['false', 'some_parameter'] for i in range(self.TEST_SUCCESS_COMMANDS): key, command = success_key.format(i), success_command