Skip to content

Commit

Permalink
Fix subtle bug in mocking processor_agent in our tests (#35221)
Browse files Browse the repository at this point in the history
Some of the scheduler tests tried to prevent DAG processor processing DAGs
from "tests/dags" directory by setting processor_agent to Mock object:

```python
   self.job_runner.processor_agent = mock.MagicMock()
```

This, in connection with scheduler job cleaning all the tables and
approach similar to:

```python
        dag = self.dagbag.get_dag("test_retry_handling_job")
        dag_task1 = dag.get_task("test_retry_handling_op")
        dag.clear()
        dag.sync_to_db()
```

Allowed the test to run in isolated space where only one or few
DAGS were present in the DB.

This probably worked perfectly in the past, but after some changes
in how DAGFileProcessor works this did not prevent DAGFileProcessor
from running when _execute method in scheduler_job_runner has been
executed, and standalone dag processor was not running, the
processor_agent has been overwritten by a new DagFileProcessor
in the `_execute` method of scheduler_job_runner.

```python
        if not self._standalone_dag_processor:
            self.processor_agent = DagFileProcessorAgent(
                dag_directory=Path(self.subdir),
                max_runs=self.num_times_parse_dags,
                processor_timeout=processor_timeout,
                dag_ids=[],
                pickle_dags=pickle_dags,
                async_mode=async_mode,
            )
```

This led to a very subtle race condition which was more likely on
machines with multiple cores/faster disk (so for example it
led to #35204 which appeared on self-hosted (8 core) runners and
did not appear on Public (2-core runners) or it could appear on
an 8 core ARM Mac but not appear on 6 core Intel Mac (only on
sqlite)

If the DAGFileProcessor managed to start and spawn some
parsing processes and grab the DB write access for sqlite and those
processes managed to parse some of the DAG files from tests/dags/
folder, those DAGs could have polutted the DAGs in the DB - leading
to undesired effects (for example with test hanging while the
scheduler job run attempted to process an unwanted subdag and
got deadlocked in case of #35204.

The solution to that is to only set the processor_agent if not
set already. This can only happen in unit tests when the
`processor_agent` sets it to Mock object. For "production" the
agent is only set once in the `_execute` methods so there is no
risk involved in checking if it is not set already.

Fixes: #35204
(cherry picked from commit 6f3d294)
  • Loading branch information
potiuk authored and ephraimbuddy committed Oct 30, 2023
1 parent 719de69 commit 49ecb5a
Showing 1 changed file with 1 addition and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ def _execute(self) -> int | None:

processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout")
processor_timeout = timedelta(seconds=processor_timeout_seconds)
if not self._standalone_dag_processor:
if not self._standalone_dag_processor and not self.processor_agent:
self.processor_agent = DagFileProcessorAgent(
dag_directory=Path(self.subdir),
max_runs=self.num_times_parse_dags,
Expand Down

0 comments on commit 49ecb5a

Please sign in to comment.