Skip to content

Commit

Permalink
Revert "Fix: DAGs are not marked as stale if the dags folder change (#…
Browse files Browse the repository at this point in the history
…41433)" (#42197)

This reverts commit 9f30a41.
  • Loading branch information
utkarsharma2 authored Sep 12, 2024
1 parent a094f91 commit 3bc2d3a
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 77 deletions.
10 changes: 2 additions & 8 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,20 +526,14 @@ def deactivate_stale_dags(
dags_parsed = session.execute(query)

for dag in dags_parsed:
# When the DAG processor runs as part of the scheduler, and the user changes the DAGs folder,
# DAGs from the previous DAGs folder will be marked as stale. Note that this change has no impact
# on standalone DAG processors.
dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory
# The largest valid difference between a DagFileStat's last_finished_time and a DAG's
# last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is
# no longer present in the file. We have a stale_dag_threshold configured to prevent a
# significant delay in deactivation of stale dags when a large timeout is configured
dag_removed_from_dag_folder_or_file = (
if (
dag.fileloc in last_parsed
and (dag.last_parsed_time + timedelta(seconds=stale_dag_threshold)) < last_parsed[dag.fileloc]
)

if dag_not_in_current_dag_folder or dag_removed_from_dag_folder_or_file:
):
cls.logger().info("DAG %s is missing and will be deactivated.", dag.dag_id)
to_deactivate.add(dag.dag_id)

Expand Down
72 changes: 4 additions & 68 deletions tests/dag_processing/test_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import random
import socket
import sys
import tempfile
import textwrap
import threading
import time
Expand Down Expand Up @@ -639,7 +638,7 @@ def test_scan_stale_dags(self):
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
dag_directory=str(TEST_DAG_FOLDER),
dag_directory="directory",
max_runs=1,
processor_timeout=timedelta(minutes=10),
signal_conn=MagicMock(),
Expand Down Expand Up @@ -713,11 +712,11 @@ def test_scan_stale_dags_standalone_mode(self):
"""
Ensure only dags from current dag_directory are updated
"""
dag_directory = str(TEST_DAG_FOLDER)
dag_directory = "directory"
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
dag_directory=TEST_DAG_FOLDER,
dag_directory=dag_directory,
max_runs=1,
processor_timeout=timedelta(minutes=10),
signal_conn=MagicMock(),
Expand All @@ -741,7 +740,7 @@ def test_scan_stale_dags_standalone_mode(self):
# Add stale DAG to the DB
other_dag = other_dagbag.get_dag("test_start_date_scheduling")
other_dag.last_parsed_time = timezone.utcnow()
other_dag.sync_to_db(processor_subdir="/other")
other_dag.sync_to_db(processor_subdir="other")

# Add DAG to the file_parsing_stats
stat = DagFileStat(
Expand All @@ -763,69 +762,6 @@ def test_scan_stale_dags_standalone_mode(self):
active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
assert active_dag_count == 1

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_scan_stale_dags_when_dag_folder_change(self):
"""
Ensure dags from old dag_folder is marked as stale when dag processor
is running as part of scheduler.
"""

def get_dag_string(filename) -> str:
return open(TEST_DAG_FOLDER / filename).read()

with tempfile.TemporaryDirectory() as tmpdir:
old_dag_home = tempfile.mkdtemp(dir=tmpdir)
old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, suffix=".py")
old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode())
old_dag_file.flush()
new_dag_home = tempfile.mkdtemp(dir=tmpdir)
new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, suffix=".py")
new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode())
new_dag_file.flush()

manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
dag_directory=new_dag_home,
max_runs=1,
processor_timeout=timedelta(minutes=10),
signal_conn=MagicMock(),
dag_ids=[],
pickle_dags=False,
async_mode=True,
),
)

dagbag = DagBag(old_dag_file.name, read_dags_from_db=False)
other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False)

with create_session() as session:
# Add DAG from old dah home to the DB
dag = dagbag.get_dag("test_example_bash_operator")
dag.fileloc = old_dag_file.name
dag.last_parsed_time = timezone.utcnow()
dag.sync_to_db(processor_subdir=old_dag_home)

# Add DAG from new DAG home to the DB
other_dag = other_dagbag.get_dag("test_start_date_scheduling")
other_dag.fileloc = new_dag_file.name
other_dag.last_parsed_time = timezone.utcnow()
other_dag.sync_to_db(processor_subdir=new_dag_home)

manager.processor._file_paths = [new_dag_file]

active_dag_count = (
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
)
assert active_dag_count == 2

manager.processor._scan_stale_dags()

active_dag_count = (
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
)
assert active_dag_count == 1

@mock.patch(
"airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", new_callable=PropertyMock
)
Expand Down
1 change: 0 additions & 1 deletion tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3581,7 +3581,6 @@ def test_retry_still_in_executor(self, dag_maker):
dag_id="test_retry_still_in_executor",
schedule="@once",
session=session,
fileloc=os.devnull + "/test_retry_still_in_executor.py",
):
dag_task1 = BashOperator(
task_id="test_retry_handling_op",
Expand Down

0 comments on commit 3bc2d3a

Please sign in to comment.