
remove lazy load from TaskInstance.DagModel #6

Merged
merged 2 commits into patch-2.4.0 from fix-dag-zombie
Apr 2, 2024

Conversation

shinny-taojiachun
Collaborator

@shinny-taojiachun shinny-taojiachun commented Mar 27, 2024

Background

When a single DAG contains multiple tasks and several of them fail and become zombies, the scheduler crashes.
Related upstream issue:

Approach

Cherry-pick the upstream change and fix the SQL query problem.
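The core of the fix is switching the relationship's loader strategy. A minimal SQLAlchemy sketch (hypothetical stand-ins for Airflow's DagModel/TaskInstance, not the real models) shows why lazy="immediate" avoids the DetachedInstanceError seen in the log below:

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()

class DagModel(Base):
    __tablename__ = "dag"
    dag_id = Column(String, primary_key=True)
    processor_subdir = Column(String)

class TaskInstance(Base):
    __tablename__ = "task_instance"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String, ForeignKey("dag.dag_id"))
    # lazy="immediate": the related DagModel is loaded right after the
    # TaskInstance row is fetched, while the session is still open.
    dag_model = relationship(DagModel, lazy="immediate")

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(DagModel(dag_id="d1", processor_subdir="/var/lib/airflow/dags"))
    session.add(TaskInstance(id=1, dag_id="d1"))
    session.commit()

with Session(engine) as session:
    ti = session.get(TaskInstance, 1)

# The session is now closed, so ti is detached.  With the default
# lazy="select" this access would raise DetachedInstanceError; with
# lazy="immediate" the attribute was already populated during the load.
print(ti.dag_model.processor_subdir)
```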

Log

Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]: [2024-03-27 20:27:03,752] {scheduler_job.py:1503} WARNING - Failing (2) jobs without heartbeat after 2024-03-27 12:22:03.749656+00:00
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]: [2024-03-27 20:27:03,755] {scheduler_job.py:1514} ERROR - Detected zombie job: {'full_filepath': '/var/lib/ef/airflow/dags/ins-serve-ins-serve-i>
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]: [2024-03-27 20:27:03,758] {scheduler_job.py:763} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]: Traceback (most recent call last):
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 746, in _execute
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     self._run_scheduler_loop()
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 873, in _run_scheduler_loop
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     next_event = timers.run(blocking=False)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/lib/python3.9/sched.py", line 151, in run
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     action(*argument, **kwargs)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/utils/event_scheduler.py", line 37, in repeat
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     action(*args, **kwargs)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/utils/session.py", line 75, in wrapper
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     return func(*args, session=session, **kwargs)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     self._run_scheduler_loop()
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 873, in _run_scheduler_loop
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     next_event = timers.run(blocking=False)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/lib/python3.9/sched.py", line 151, in run
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     action(*argument, **kwargs)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/utils/event_scheduler.py", line 37, in repeat
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     action(*args, **kwargs)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/utils/session.py", line 75, in wrapper
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     return func(*args, session=session, **kwargs)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 1510, in _find_zombies
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     processor_subdir=ti.dag_model.processor_subdir,
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     return self.impl.get(state, dict_)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/attributes.py", line 926, in get
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     value = self._fire_loader_callables(state, key, passive)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/attributes.py", line 962, in _fire_loader_callables
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     return self.callable_(state, passive)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/strategies.py", line 861, in _load_for_state
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     raise orm_exc.DetachedInstanceError(
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]: sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7fc88e0d90a0> is not bound to a Session; lazy load operation of att>
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]: [2024-03-27 20:27:03,760] {local_executor.py:428} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you >
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]: [2024-03-27 20:27:03,839] {scheduler_job.py:775} INFO - Exited execute loop
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724892]: [2024-03-27 20:27:03 +0800] [3724892] [INFO] Handling signal: term
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724894]: [2024-03-27 20:27:03 +0800] [3724894] [INFO] Worker exiting (pid: 3724894)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]: Traceback (most recent call last):
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/bin/airflow", line 8, in <module>
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     sys.exit(main())
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/__main__.py", line 39, in main
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     args.func(args)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/cli/cli_parser.py", line 52, in command
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     return func(*args, **kwargs)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/utils/cli.py", line 99, in wrapper
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     return f(*args, **kwargs)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/cli/commands/scheduler_command.py", line 85, in scheduler
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     _run_scheduler_job(args=args)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/cli/commands/scheduler_command.py", line 50, in _run_scheduler_job
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     job.run()
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/base_job.py", line 247, in run
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     self._execute()
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 746, in _execute
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     self._run_scheduler_loop()
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 873, in _run_scheduler_loop
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     next_event = timers.run(blocking=False)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/lib/python3.9/sched.py", line 151, in run
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     action(*argument, **kwargs)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/utils/event_scheduler.py", line 37, in repeat
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     action(*args, **kwargs)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/utils/session.py", line 75, in wrapper
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     return func(*args, session=session, **kwargs)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 1510, in _find_zombies
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     processor_subdir=ti.dag_model.processor_subdir,
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     return self.impl.get(state, dict_)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/attributes.py", line 926, in get
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     value = self._fire_loader_callables(state, key, passive)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/attributes.py", line 962, in _fire_loader_callables
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     return self.callable_(state, passive)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:   File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/strategies.py", line 861, in _load_for_state
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]:     raise orm_exc.DetachedInstanceError(
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724886]: sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7fc88e0d90a0> is not bound to a Session; lazy load operation of att>
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724961]: [2024-03-27 20:27:03 +0800] [3724961] [INFO] Worker exiting (pid: 3724961)
Mar 27 20:27:03 airflow-service-ali-all-in-one airflow[3724892]: [2024-03-27 20:27:03 +0800] [3724892] [INFO] Shutting down: Master


coderabbitai bot commented Mar 27, 2024

Walkthrough

This change resolves an issue where multiple tasks from the same DAG entering the zombie state could prevent the second task from lazy-loading its DAG object.

Changes

File	Change summary
airflow/models/taskinstance.py	Added the lazy="immediate" parameter to the dag_model relationship on the TaskInstance class.
airflow/jobs/scheduler_job.py	Fixed the _find_zombies method of the scheduler job.
tests/jobs/test_scheduler_job.py	Refactored test methods to match the adjusted session handling and zombie-detection logic.

🐰

In the maze of code, the rabbit lightly leaps,

spotting the zombies and dodging to keep them away.

"Immediate" loading, problem solved, the rabbit cheers:

DAGs are clear, tasks run smoothly, the rabbit's victory!

🌟🥕🌟




@coderabbitai coderabbitai bot left a comment


Review Status

Actionable comments generated: 0

Configuration used: CodeRabbit UI

Commits: files that changed from the base of the PR, between f763058 and 80f3f21.
Files selected for processing (1)
  • airflow/models/taskinstance.py (1 hunks)
Additional comments: 1
airflow/models/taskinstance.py (1)
  • 507-507: The dag_model relationship on the TaskInstance class now takes a lazy="immediate" parameter. This change addresses the scheduler crash that occurs when multiple tasks of one DAG become zombies: with "immediate", the DAG object is loaded as soon as the TaskInstance is fetched, avoiding the lazy-load operation that fails in the scenario described. This is a targeted fix for a specific problem; it should be tested thoroughly in a real environment to ensure it does not introduce new performance issues or other unexpected behavior.

BobDu and others added 2 commits March 29, 2024 16:24
…ss (apache#28198)

```
[2022-12-06T14:20:21.622+0000] {base_job.py:229} DEBUG - [heartbeat]
[2022-12-06T14:20:21.623+0000] {scheduler_job.py:1495} DEBUG - Finding 'running' jobs without a recent heartbeat
[2022-12-06T14:20:21.637+0000] {scheduler_job.py:1515} WARNING - Failing (2) jobs without heartbeat after 2022-12-06 14:15:21.623199+00:00
[2022-12-06T14:20:21.641+0000] {scheduler_job.py:1526} ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/xxx_dag.py', 'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'xxx', 'Task Id': 'xxx', 'Run Id': 'scheduled__2022-12-05T00:15:00+00:00', 'Hostname': 'airflow-worker-0.airflow-worker.airflow2.svc.cluster.local', 'External Executor Id': '9520cb9f-3245-497a-8e17-e9dec29d4549'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f1cd4de4130>, 'is_failure_callback': True}
[2022-12-06T14:20:21.645+0000] {scheduler_job.py:763} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 746, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 878, in _run_scheduler_loop
    next_event = timers.run(blocking=False)
  File "/usr/local/lib/python3.10/sched.py", line 151, in run
    action(*argument, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/event_scheduler.py", line 37, in repeat
    action(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1522, in _find_zombies
    processor_subdir=ti.dag_model.processor_subdir,
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
    return self.impl.get(state, dict_)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 926, in get
    value = self._fire_loader_callables(state, key, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 962, in _fire_loader_callables
    return self.callable_(state, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py", line 861, in _load_for_state
    raise orm_exc.DetachedInstanceError(
sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7f1ccc3e8520> is not bound to a Session; lazy load operation of attribute 'dag_model' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)
[2022-12-06T14:20:21.647+0000] {celery_executor.py:443} DEBUG - Inquiring about 5 celery task(s)
[2022-12-06T14:20:21.669+0000] {celery_executor.py:602} DEBUG - Fetched 5 state(s) for 5 task(s)
[2022-12-06T14:20:21.669+0000] {celery_executor.py:446} DEBUG - Inquiries completed.
[2022-12-06T14:20:21.669+0000] {scheduler_job.py:775} INFO - Exited execute loop
[2022-12-06T14:20:21.674+0000] {cli_action_loggers.py:83} DEBUG - Calling callbacks: []
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/__main__.py", line 39, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/cli_parser.py", line 52, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/cli.py", line 103, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 85, in scheduler
    _run_scheduler_job(args=args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 50, in _run_scheduler_job
    job.run()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/base_job.py", line 247, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 746, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 878, in _run_scheduler_loop
    next_event = timers.run(blocking=False)
  File "/usr/local/lib/python3.10/sched.py", line 151, in run
    action(*argument, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/event_scheduler.py", line 37, in repeat
    action(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1522, in _find_zombies
    processor_subdir=ti.dag_model.processor_subdir,
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
    return self.impl.get(state, dict_)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 926, in get
    value = self._fire_loader_callables(state, key, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 962, in _fire_loader_callables
    return self.callable_(state, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py", line 861, in _load_for_state
    raise orm_exc.DetachedInstanceError(
sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7f1ccc3e8520> is not bound to a Session; lazy load operation of attribute 'dag_model' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)
```

When running in standalone dag processor mode, the scheduler uses `DatabaseCallbackSink`.

The `_find_zombies` func calls `self.executor.send_callback(request)`, but does not propagate the ORM `session`; `provide_session` opens a new session inside `send` again.

```
class DatabaseCallbackSink(BaseCallbackSink):
    """Sends callbacks to database."""

    @provide_session
    def send(self, callback: CallbackRequest, session: Session = NEW_SESSION) -> None:
        """Sends callback for execution."""
        db_callback = DbCallbackRequest(callback=callback, priority_weight=10)
        session.add(db_callback)
```
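The pitfall can be sketched in plain Python with a hypothetical stand-in for the decorator (not Airflow's actual provide_session): when the caller does not pass its session along, the decorated function runs against a brand-new session, so objects bound to the caller's session behave as detached inside it.

```python
import functools

def provide_session(func):
    """Stand-in for a provide_session-style decorator: reuses a passed
    session, otherwise opens a fresh one for the call."""
    @functools.wraps(func)
    def wrapper(*args, session=None, **kwargs):
        if session is not None:           # caller propagated its session
            return func(*args, session=session, **kwargs)
        new_session = object()            # stand-in for create_session()
        return func(*args, session=new_session, **kwargs)
    return wrapper

@provide_session
def send(callback, session=None):
    # Returns the session it actually ran with, so we can inspect it.
    return session

caller_session = object()
# Without propagation, send() runs in a different session than the caller's:
print(send("cb") is caller_session)                      # a new session
# Propagating the caller's session keeps both in the same transaction:
print(send("cb", session=caller_session) is caller_session)
```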

Signed-off-by: BobDu <[email protected]>

(cherry picked from commit 4b340b7)
…e#28544)

apache#28198 accidentally dropped a join in a query, leading to this:

    airflow/jobs/scheduler_job.py:1547 SAWarning: SELECT statement has a
    cartesian product between FROM element(s) "dag_run_1", "task_instance",
    "job" and FROM element "dag". Apply join condition(s) between each element to resolve.

(cherry picked from commit a24d18a)
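The dropped-join problem can be reproduced with a small SQLAlchemy Core sketch (hypothetical table definitions, not Airflow's real schema): without an explicit join condition, the two FROM elements form a cartesian product, which SQLAlchemy 1.4 flags with the SAWarning quoted above.

```python
from sqlalchemy import Column, ForeignKey, Integer, MetaData, String, Table, select

metadata = MetaData()
dag = Table(
    "dag", metadata,
    Column("dag_id", String, primary_key=True),
    Column("processor_subdir", String),
)
task_instance = Table(
    "task_instance", metadata,
    Column("id", Integer, primary_key=True),
    Column("dag_id", String, ForeignKey("dag.dag_id")),
)

# Dropping the join leaves two unrelated FROM elements (cartesian product):
bad = select(task_instance.c.id, dag.c.processor_subdir)

# Restoring the explicit join condition resolves the warning:
good = select(task_instance.c.id, dag.c.processor_subdir).join_from(
    task_instance, dag, task_instance.c.dag_id == dag.c.dag_id
)

print(str(bad))   # ... FROM task_instance, dag
print(str(good))  # ... FROM task_instance JOIN dag ON ...
```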

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Review Status

Configuration used: CodeRabbit UI

Commits: files that changed from the base of the PR, between f763058 and a9e9b3f.
Files selected for processing (2)
  • airflow/jobs/scheduler_job.py (2 hunks)
  • tests/jobs/test_scheduler_job.py (4 hunks)
Additional Context Used
Additional comments not posted (9)
airflow/jobs/scheduler_job.py (1)

1482-1512: > 📝 NOTE

This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [1474-1509]

In the _find_zombies method, you removed the @provide_session decorator and switched to create_session() to manage the session. This is good practice: it gives finer-grained control and ensures the session is properly closed after use. You also restructured the query and updated the variable assignments, which improves readability and maintainability. Overall, these changes make the method more efficient and reliable.
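The create_session() pattern described here can be sketched with a hypothetical stand-in session object (not Airflow's real create_session): a context manager that commits on success, rolls back on error, and always closes, giving the caller explicit control over the session's lifetime.

```python
from contextlib import contextmanager

class FakeSession:
    """Stand-in session that records its lifecycle calls."""
    def __init__(self):
        self.committed = self.rolled_back = self.closed = False
    def commit(self):
        self.committed = True
    def rollback(self):
        self.rolled_back = True
    def close(self):
        self.closed = True

@contextmanager
def create_session():
    session = FakeSession()
    try:
        yield session
        session.commit()       # commit only if the block succeeded
    except Exception:
        session.rollback()     # undo partial work on error
        raise
    finally:
        session.close()        # always release the connection

with create_session() as s:
    pass  # queries against `s` would go here

print(s.committed, s.closed)
```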

tests/jobs/test_scheduler_job.py (8)

4054-4074: > 📝 NOTE

This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [4008-4071]

The test_find_zombies test verifies that zombie task instances are correctly identified and handled. It simulates a scenario with two task instances marked as running, one of which is considered a zombie because of a heartbeat timeout, and asserts that only the timed-out task instance is sent to the executor's callback sink, which is the expected behavior.


4054-4074: > 📝 NOTE

This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [4071-4145]

The test_zombie_message test checks that the generated zombie message matches the expected format. It varies different attributes of the task instance and verifies that the zombie message correctly reflects those changes, helping ensure that accurate error information is recorded and reported when a task instance becomes a zombie.


4143-4153: > 📝 NOTE

This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [4146-4178]

The test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_processor test ensures that failure callbacks are correctly passed to the DAG processor when zombies are found. It simulates a task instance that becomes a zombie due to a timeout and verifies that the corresponding failure callback is sent to the DAG file processor. This matters because it guarantees that failure-handling logic is triggered when a task fails.


4176-4182: > 📝 NOTE

This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [4179-4208]

The test_cleanup_stale_dags test checks that the logic for cleaning up stale DAGs works as expected. It first creates a DAG and marks it as stale, then runs the cleanup and verifies that the stale DAG is removed from the database. This helps keep the database free of obsolete DAG records.


4005-4017: > 📝 NOTE

This review was outside the diff hunks, and no overlapping diff hunk was found. Original lines [4209-4271]

The test_should_mark_empty_task_as_success test verifies that empty tasks (tasks with no command to execute) are correctly marked as successful. It creates a DAG containing empty tasks, runs the scheduler over them, and checks the task instance states to ensure empty tasks are marked successful even though they perform no work. This matters because it confirms that even empty tasks pass correctly through the scheduler's processing flow.


4005-4017: > 📝 NOTE

This review was outside the diff hunks, and no overlapping diff hunk was found. Original lines [4272-4372]

The test_catchup_works_correctly test verifies that the scheduler's catchup logic works as expected. It creates a DAG that needs to catch up and runs the scheduler over past execution dates, checking that DAG runs are created and that the scheduler correctly updates the next scheduled run time. This helps ensure the scheduler handles catchup DAG runs correctly without missing any planned executions.


4005-4017: > 📝 NOTE

This review was outside the diff hunks, and no overlapping diff hunk was found. Original lines [4373-4445]

The test_schedule_dag_run_with_upstream_skip test verifies that the scheduler handles skipped upstream tasks correctly. It creates a DAG with dependent tasks, marks one upstream task as skipped, and checks the state of the downstream tasks that depend on it, ensuring the scheduler also marks them as skipped. This matters because it confirms that task dependencies are handled correctly during scheduling.


4005-4017: > 📝 NOTE

This review was outside the diff hunks, and no overlapping diff hunk was found. Original lines [4446-4545]

The TestSchedulerJobQueriesCount class contains a series of tests that detect changes in the number of database queries the scheduler issues while processing different DAG files. By simulating varying numbers of DAGs and tasks with different schedule intervals and structures, these tests evaluate scheduler performance and are useful for promptly catching code changes that could degrade it.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Review Status

Configuration used: CodeRabbit UI

Commits: files that changed from the base of the PR, between a9e9b3f and a46302b.
Files selected for processing (1)
  • airflow/dag_processing/manager.py (1 hunks)
Additional Context Used
Additional comments not posted (1)
airflow/dag_processing/manager.py (1)

541-551: In the _run_parsing_loop method, the conditional assignment based on _async_mode was removed and poll_time is now set to None directly. This means a blocking wait is used regardless of whether async mode is enabled. The change may affect the scheduler's performance and responsiveness, especially when processing a large number of DAG files. It would be worth evaluating the performance impact in detail and considering whether the original problem could be solved another way while preserving async processing.

@shinny-taojiachun shinny-taojiachun merged commit 720d2d0 into patch-2.4.0 Apr 2, 2024
2 checks passed
@shinny-taojiachun shinny-taojiachun deleted the fix-dag-zombie branch April 2, 2024 09:49