
DAG task execution and API fails if dag_run.conf is provided with an array or string (instead of dict) #15023

Closed
jscheffl1 opened this issue Mar 25, 2021 · 6 comments · Fixed by #15057
Labels
affected_version:2.0 Issues Reported for 2.0 area:core kind:bug This is a clearly a bug priority:high High priority bug that should be patched quickly but does not require immediate new release

Comments

@jscheffl1
Contributor

jscheffl1 commented Mar 25, 2021

Apache Airflow version: 2.0.1

Kubernetes version (if you are using kubernetes) (use kubectl version): Tried both pip install and k8s image

Environment: Dev workstation or K8s execution; both behave the same

  • OS (e.g. from /etc/os-release): Ubuntu 20.04 LTS
  • Others: Python 3.6

What happened:

We use Airflow 1.10.14 in production and currently have a couple of DAGs defined that process a batch call. We implemented the batch (for now) so that the jobs are passed in dag_run.conf as an array of dicts, e.g. "[ { "job": "1" }, { "job": "2" } ]".
When trying to upgrade to Airflow 2.0.1, we see that such calls can still be submitted, but all further actions fail:

  • Querying the status via the REST API is not possible; it returns HTTP 500.
  • The DAG starts, but all tasks fail.
  • Logs cannot be displayed (in fact, none are written to the file system).
  • Error logging is a bit complex: the Celery worker neither prints meaningful logs to the console nor produces log files. Running the scheduler with the SequentialExecutor at least reveals one meaningful stack trace, shown below.
  • Probably some other internal logic fails as well.
  • Note that the submitted dag_run.conf is visible in the Browse > DAG Runs menu, so it is received correctly.

As a regression check, the same DAG works when passing dag_run.conf = "{ "batch": [ { "job": "1" }, { "job": "2" } ] }" or "{}".
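Since a dict-valued conf works, one possible workaround is to wrap the batch list under a key. A minimal sketch in plain Python, assuming the key name "batch" from the example above; the plain `params` dict only stands in for Airflow's task params, which the traceback in this issue shows being merged via `params.update(dag_run.conf)`.

```python
import json

# The batch payload that was passed directly as dag_run.conf in 1.10.14.
jobs = [{"job": "1"}, {"job": "2"}]

# Wrap the list under a key so that dag_run.conf is a dict (a JSON object).
conf = {"batch": jobs}

# JSON text to use as the conf when triggering the DAG.
payload = json.dumps(conf)

# Stand-in for Airflow's task params: merging a dict conf succeeds.
params = {}
params.update(json.loads(payload))

# The jobs can be read back from the merged params.
for job in params["batch"]:
    print(job["job"])
```

Inside a task, the list would then typically be read from the conf, for example with `{{ dag_run.conf["batch"] }}` in a templated field; treat this as a sketch to adapt rather than an official migration path.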

Example (simple) DAG to reproduce:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

dag = DAG(
    'test1',
    description='My first DAG',
    default_args={
        'owner': 'jscheffl',
        'email': ['***@***.de'],
        'email_on_failure': True,
        'email_on_retry': True,
        'retries': 5,
        'retry_delay': timedelta(minutes=5),
    },
    start_date=days_ago(2)
)

hello_world = BashOperator(
    task_id='hello_world',
    bash_command='echo hello world',
    dag=dag,
)

Stack trace from SequentialExecutor:

Traceback (most recent call last):
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 225, in task_run
    ti.init_run_context(raw=args.raw)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1987, in init_run_context
    self._set_context(self)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py", line 54, in _set_context
    set_context(self.log, context)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py", line 174, in set_context
    handler.set_context(value)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 56, in set_context
    local_loc = self._init_file(ti)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 245, in _init_file
    relative_path = self._render_filename(ti, ti.try_number)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 77, in _render_filename
    jinja_context = ti.get_template_context()
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1606, in get_template_context
    self.overwrite_params_with_dag_run_conf(params=params, dag_run=dag_run)
  File "/home/jscheffl/Programmieren/Python/Airflow/syncignore/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1743, in overwrite_params_with_dag_run_conf
    params.update(dag_run.conf)
ValueError: dictionary update sequence element #0 has length 4; 2 is required
{sequential_executor.py:66} ERROR - Failed to execute task Command '['airflow', 'tasks', 'run', 'test1', 'hello_world', '2021-03-25T22:22:36.732899+00:00', '--local', '--pool', 'default_pool', '--subdir', '/home/jscheffl/Programmieren/Python/Airflow/airflow-home/dags/test1.py']' returned non-zero exit status 1..
[2021-03-25 23:42:47,209] {scheduler_job.py:1199} INFO - Executor reports execution of test1.hello_world execution_date=2021-03-25 22:22:36.732899+00:00 exited with status failed for try_number 5
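The last frames show why the run breaks: `overwrite_params_with_dag_run_conf` calls `params.update(dag_run.conf)`, and Python's `dict.update` treats a non-mapping argument as a sequence of key/value pairs. The snippet below reproduces this with plain dicts; `merge_conf` is a hypothetical helper that only mirrors the failing line, not Airflow code.

```python
def merge_conf(params, conf):
    """Hypothetical stand-in for the failing line: params.update(dag_run.conf)."""
    merged = dict(params)
    merged.update(conf)
    return merged


# A dict conf merges as expected.
print(merge_conf({"a": 1}, {"b": 2}))

# A list conf is read as (key, value) pairs. "test" has four characters,
# not two, hence "element #0 has length 4; 2 is required".
try:
    merge_conf({}, ["test"])
except ValueError as exc:
    print(exc)

# The original batch payload fails the same way: each dict is iterated
# over its keys, and {"job": "1"} has only one key.
try:
    merge_conf({}, [{"job": "1"}, {"job": "2"}])
except ValueError as exc:
    print(exc)

# A list of two-character strings does not fail at all: each string is
# silently split into a key and a value.
print(merge_conf({}, ["ab"]))
```

This also illustrates why validating the type up front is safer than relying on the error: some non-dict values merge without any exception.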

What you expected to happen:

  • EITHER submitting arrays as dag_run.conf is supported, as in 1.10.14
  • OR, if array values are not supported by Airflow, the submission is validated and rejected (arrays at least seemed to work in 1.10)
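For the second option, a check at submission time could reject anything that is not a JSON object before the run is created. A minimal sketch, assuming a hypothetical `validate_dag_run_conf` helper; it is not the code from the fixing PR (#15057), and where such a check would live in Airflow (UI, CLI, REST API) is left open.

```python
import json
from collections.abc import Mapping
from typing import Any, Optional


def validate_dag_run_conf(conf: Any) -> Optional[dict]:
    """Hypothetical check: accept no conf or a JSON object, reject the rest."""
    if conf is None:
        return None
    if not isinstance(conf, Mapping):
        raise ValueError(
            "dag_run.conf must be a JSON object (dict), "
            f"got {type(conf).__name__}"
        )
    return dict(conf)


# A dict conf passes through unchanged.
print(validate_dag_run_conf(json.loads('{"batch": [{"job": "1"}]}')))

# An array or a string is rejected up front instead of failing later in every task.
for raw in ('[ "test" ]', '"test"'):
    try:
        validate_dag_run_conf(json.loads(raw))
    except ValueError as exc:
        print(exc)
```

Rejecting at submission gives the caller an immediate, explicit error instead of a run whose tasks all fail without logs.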

How to reproduce it: See the DAG code above and reproduce the error, e.g. by triggering it with "[ "test" ]" as dag_run.conf.

Anything else we need to know: I assume not :-)

@jscheffl1 jscheffl1 added the kind:bug This is a clearly a bug label Mar 25, 2021
@boring-cyborg

boring-cyborg bot commented Mar 25, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@vikramkoka vikramkoka added area:core affected_version:2.0 Issues Reported for 2.0 labels Mar 25, 2021
@mik-laj
Member

mik-laj commented Mar 26, 2021

This is expected behavior. We only support a dict as dag_run.conf. See discussion: https://lists.apache.org/thread.html/r88f5f57b4586f75eda382ce2c7309bd32b58cd9cfa1b01f681b6c8d1%40%3Cdev.airflow.apache.org%3E

@mik-laj mik-laj closed this as completed Mar 26, 2021
@mik-laj mik-laj reopened this Mar 26, 2021
@mik-laj
Member

mik-laj commented Mar 26, 2021

Are you willing to submit a PR with validation?

@jscheffl1
Contributor Author

Hi mik-laj,
I'll try to propose a PR for option #2 over the weekend; please give me a bit of time :-)
Jens

@jscheffl1
Contributor Author

Hi @mik-laj, I made my first PR (after struggling a bit with Git and GitHub processes). I needed to open a second one, since I could not figure out how to update the first PR after it hit pylint errors. Could you help get it reviewed?

@jscheffl1 jscheffl1 changed the title DAG task execution and API fails if dag_run.conf is provided with an array (instead of dict) DAG task execution and API fails if dag_run.conf is provided with an array or string (instead of dict) Mar 28, 2021
@ashb ashb added the priority:high High priority bug that should be patched quickly but does not require immediate new release label Mar 30, 2021
@ashb ashb added this to the Airflow 2.0.2 milestone Mar 30, 2021
@ashb ashb modified the milestones: Airflow 2.0.2, Airflow 2.0.3 Apr 22, 2021
@jscheffl1
Contributor Author

Hi @ashb, I saw that my proposed bugfix/PR was moved from Airflow 2.0.2 to 2.0.3. Is there any reason why it is not merged yet? A lack of quality? Do I need to push something (harder)? Or is it just a matter of missing capacity?
Please let me know if I can help in any way to get this onto master; otherwise my first contribution was a waste, and you probably lose someone who would be motivated to contribute :-(

@ashb ashb modified the milestones: Airflow 2.0.3, Airflow 2.1.1 May 7, 2021

4 participants