Skip to content

Commit

Permalink
[AIRFLOW-2930] Fix celery excecutor scheduler crash (apache#3784)
Browse files Browse the repository at this point in the history
Caused by an update in PR apache#3740.
execute_command.apply_async(args=command, ...)
-command is a list of short unicode strings and the above code pass multiple
arguments to a function defined as taking only one argument.
-command = ["airflow", "run", "dag323",...]
-args = command = ["airflow", "run", "dag323", ...]
-execute_command("airflow","run","dag3s3", ...) will be error and exit.
  • Loading branch information
YingboWang authored and Alice Berard committed Jan 3, 2019
1 parent e826e1e commit f87de81
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 7 deletions.
2 changes: 1 addition & 1 deletion airflow/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def execute_async(self, key, command,
self.log.info("[celery] queuing {key} through celery, "
"queue={queue}".format(**locals()))
self.tasks[key] = execute_command.apply_async(
args=command, queue=queue)
args=[command], queue=queue)
self.last_state[key] = celery_states.PENDING

def sync(self):
Expand Down
4 changes: 2 additions & 2 deletions tests/executors/dask_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ def assert_tasks_on_executor(self, executor):
# start the executor
executor.start()

success_command = ['true', ]
fail_command = ['false', ]
success_command = ['true', 'some_parameter']
fail_command = ['false', 'some_parameter']

executor.execute_async(key='success', command=success_command)
executor.execute_async(key='fail', command=fail_command)
Expand Down
4 changes: 2 additions & 2 deletions tests/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ def test_celery_integration(self):
executor.start()
with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):

success_command = ['true', ]
fail_command = ['false', ]
success_command = ['true', 'some_parameter']
fail_command = ['false', 'some_parameter']

executor.execute_async(key='success', command=success_command)
# errors are propagated for some reason
Expand Down
4 changes: 2 additions & 2 deletions tests/executors/test_local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def execution_parallelism(self, parallelism=0):
executor.start()

success_key = 'success {}'
success_command = ['true', ]
fail_command = ['false', ]
success_command = ['true', 'some_parameter']
fail_command = ['false', 'some_parameter']

for i in range(self.TEST_SUCCESS_COMMANDS):
key, command = success_key.format(i), success_command
Expand Down

0 comments on commit f87de81

Please sign in to comment.