Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: DAGs are not marked as stale if the dags folder change #41433

Merged
merged 23 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
74ce043
Fix: DAGs are not marked as stale if the AIRFLOW__CORE__DAGS_FOLDER c…
utkarsharma2 Aug 13, 2024
b881678
Merge branch 'main' into FixDagParsing
utkarsharma2 Aug 13, 2024
a41d411
Update airflow/dag_processing/manager.py
utkarsharma2 Aug 13, 2024
00f2b0a
Merge branch 'main' into FixDagParsing
utkarsharma2 Aug 20, 2024
40c6cb0
Merge branch 'main' into FixDagParsing
utkarsharma2 Aug 20, 2024
5e11d81
Add testcase
utkarsharma2 Aug 26, 2024
252e473
Add code comment
utkarsharma2 Aug 26, 2024
df185ba
Update code comment
utkarsharma2 Aug 26, 2024
9898b39
Merge branch 'main' into FixDagParsing
utkarsharma2 Aug 26, 2024
75c9d93
Update the logic for checking the current dag_directory
utkarsharma2 Aug 26, 2024
4b745b8
Update testcases
utkarsharma2 Aug 26, 2024
79cd22e
Remove unwanted code
utkarsharma2 Aug 26, 2024
da5ae39
Merge branch 'main' into FixDagParsing
utkarsharma2 Aug 26, 2024
055192a
Uncomment code
utkarsharma2 Aug 26, 2024
245bef8
Merge branch 'main' into FixDagParsing
utkarsharma2 Aug 26, 2024
c91be45
Add processor_subdir when creating processor_subdir
utkarsharma2 Aug 26, 2024
db2e6d5
Merge branch 'main' into FixDagParsing
utkarsharma2 Aug 26, 2024
b96a832
Fix test_retry_still_in_executor test
utkarsharma2 Aug 27, 2024
7d999cc
Merge branch 'main' into FixDagParsing
utkarsharma2 Aug 27, 2024
3545a7a
Remove config from test
utkarsharma2 Aug 28, 2024
3917819
Update airflow/dag_processing/manager.py
utkarsharma2 Aug 28, 2024
da42bab
Merge branch 'main' into FixDagParsing
utkarsharma2 Aug 28, 2024
fa17bc6
Update if condition for readability
utkarsharma2 Aug 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,14 +526,20 @@ def deactivate_stale_dags(
dags_parsed = session.execute(query)

for dag in dags_parsed:
# When the DAG processor runs as part of the scheduler, and the user changes the DAGs folder,
# DAGs from the previous DAGs folder will be marked as stale. Note that this change has no impact
# on standalone DAG processors.
dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory
# The largest valid difference between a DagFileStat's last_finished_time and a DAG's
# last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is
# no longer present in the file. We have a stale_dag_threshold configured to prevent a
# significant delay in deactivation of stale dags when a large timeout is configured
if (
dag_removed_from_dag_folder_or_file = (
dag.fileloc in last_parsed
and (dag.last_parsed_time + timedelta(seconds=stale_dag_threshold)) < last_parsed[dag.fileloc]
):
)

if dag_not_in_current_dag_folder or dag_removed_from_dag_folder_or_file:
cls.logger().info("DAG %s is missing and will be deactivated.", dag.dag_id)
to_deactivate.add(dag.dag_id)

Expand Down
71 changes: 67 additions & 4 deletions tests/dag_processing/test_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import random
import socket
import sys
import tempfile
import textwrap
import threading
import time
Expand Down Expand Up @@ -638,7 +639,7 @@ def test_scan_stale_dags(self):
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
dag_directory="directory",
dag_directory=str(TEST_DAG_FOLDER),
utkarsharma2 marked this conversation as resolved.
Show resolved Hide resolved
max_runs=1,
processor_timeout=timedelta(minutes=10),
signal_conn=MagicMock(),
Expand Down Expand Up @@ -712,11 +713,11 @@ def test_scan_stale_dags_standalone_mode(self):
"""
Ensure only dags from current dag_directory are updated
"""
dag_directory = "directory"
dag_directory = str(TEST_DAG_FOLDER)
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
dag_directory=dag_directory,
dag_directory=TEST_DAG_FOLDER,
max_runs=1,
processor_timeout=timedelta(minutes=10),
signal_conn=MagicMock(),
Expand All @@ -740,7 +741,7 @@ def test_scan_stale_dags_standalone_mode(self):
# Add stale DAG to the DB
other_dag = other_dagbag.get_dag("test_start_date_scheduling")
other_dag.last_parsed_time = timezone.utcnow()
other_dag.sync_to_db(processor_subdir="other")
other_dag.sync_to_db(processor_subdir="/other")

# Add DAG to the file_parsing_stats
stat = DagFileStat(
Expand All @@ -762,6 +763,68 @@ def test_scan_stale_dags_standalone_mode(self):
active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
assert active_dag_count == 1

def test_scan_stale_dags_when_dag_folder_change(self):
"""
Ensure dags from old dag_folder is marked as stale when dag processor
is running as part of scheduler.
"""

def get_dag_string(filename) -> str:
return open(TEST_DAG_FOLDER / filename).read()

with tempfile.TemporaryDirectory() as tmpdir:
old_dag_home = tempfile.mkdtemp(dir=tmpdir)
old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, suffix=".py")
old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode())
old_dag_file.flush()
new_dag_home = tempfile.mkdtemp(dir=tmpdir)
new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, suffix=".py")
new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode())
new_dag_file.flush()

manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
dag_directory=new_dag_home,
max_runs=1,
processor_timeout=timedelta(minutes=10),
signal_conn=MagicMock(),
dag_ids=[],
pickle_dags=False,
async_mode=True,
),
)

dagbag = DagBag(old_dag_file.name, read_dags_from_db=False)
other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False)

with create_session() as session:
# Add DAG from old dah home to the DB
dag = dagbag.get_dag("test_example_bash_operator")
dag.fileloc = old_dag_file.name
dag.last_parsed_time = timezone.utcnow()
dag.sync_to_db(processor_subdir=old_dag_home)

# Add DAG from new DAG home to the DB
other_dag = other_dagbag.get_dag("test_start_date_scheduling")
other_dag.fileloc = new_dag_file.name
other_dag.last_parsed_time = timezone.utcnow()
other_dag.sync_to_db(processor_subdir=new_dag_home)

manager.processor._file_paths = [new_dag_file]

active_dag_count = (
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
)
assert active_dag_count == 2

manager.processor._scan_stale_dags()

active_dag_count = (
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
)
assert active_dag_count == 1

@mock.patch(
"airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", new_callable=PropertyMock
)
Expand Down
1 change: 1 addition & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3568,6 +3568,7 @@ def test_retry_still_in_executor(self, dag_maker):
dag_id="test_retry_still_in_executor",
schedule="@once",
session=session,
fileloc=os.devnull + "/test_retry_still_in_executor.py",
):
dag_task1 = BashOperator(
task_id="test_retry_handling_op",
Expand Down