From 74ce04362bd3a6324cdd1b7cf943ee3c0244de26 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Tue, 13 Aug 2024 19:26:53 +0530 Subject: [PATCH 01/14] Fix: DAGs are not marked as stale if the AIRFLOW__CORE__DAGS_FOLDER changes --- airflow/dag_processing/manager.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index c03bc074d0abd7..ac73170fa59223 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -526,6 +526,8 @@ def deactivate_stale_dags( dags_parsed = session.execute(query) for dag in dags_parsed: + if not Path(dag.fileloc).is_relative_to(dag_directory): + to_deactivate.add(dag.dag_id) # The largest valid difference between a DagFileStat's last_finished_time and a DAG's # last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is # no longer present in the file. We have a stale_dag_threshold configured to prevent a From a41d411dc706658caead03143e96cf239c2103f3 Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Tue, 13 Aug 2024 19:31:41 +0530 Subject: [PATCH 02/14] Update airflow/dag_processing/manager.py --- airflow/dag_processing/manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index ac73170fa59223..da31a3b0eeed44 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -528,6 +528,7 @@ def deactivate_stale_dags( for dag in dags_parsed: if not Path(dag.fileloc).is_relative_to(dag_directory): to_deactivate.add(dag.dag_id) + continue # The largest valid difference between a DagFileStat's last_finished_time and a DAG's # last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is # no longer present in the file. We have a stale_dag_threshold configured to prevent a From 5e11d81504de379eeab423159224eb65bb09046e Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Mon, 26 Aug 2024 17:53:29 +0530 Subject: [PATCH 03/14] Add testcase --- tests/dag_processing/test_job_runner.py | 90 +++++++++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py index 8112b7222a6973..c9bf19d17c8fb1 100644 --- a/tests/dag_processing/test_job_runner.py +++ b/tests/dag_processing/test_job_runner.py @@ -26,6 +26,7 @@ import random import socket import sys +import tempfile import textwrap import threading import time @@ -762,6 +763,95 @@ def test_scan_stale_dags_standalone_mode(self): active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() assert active_dag_count == 1 + @conf_vars( + { + ("scheduler", "standalone_dag_processor"): "False", + } + ) + def test_scan_stale_dags_when_dag_folder_change(self): + """ + Ensure dags from old dag_folder is marked as stale when dag processor + is running as part of scheduler. + """ + + def get_dag_string(dag_id) -> str: + return ( + f"from __future__ import annotations\n" + f"import datetime\n" + f"from airflow.models.dag import DAG\n" + f"from airflow.operators.empty import EmptyOperator\n" + f"with DAG(\n" + f' dag_id="{dag_id}",\n' + f" schedule=datetime.timedelta(hours=4),\n" + f" start_date=datetime.datetime(2021, 1, 1),\n" + f" catchup=False,\n" + f") as dag:\n" + f' task1 = EmptyOperator(task_id="task1")\n' + ) + + with tempfile.TemporaryDirectory() as tmpdir: + old_dag_home = tempfile.mkdtemp(dir=tmpdir) + old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, suffix=".py") + old_dag_file.write(get_dag_string("old_temp_dag").encode()) + old_dag_file.flush() + new_dag_home = tempfile.mkdtemp(dir=tmpdir) + new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, suffix=".py") + new_dag_file.write(get_dag_string("new_temp_dag").encode()) + new_dag_file.flush() + + manager = DagProcessorJobRunner( + job=Job(), + processor=DagFileProcessorManager( + dag_directory=new_dag_home, + max_runs=1, + processor_timeout=timedelta(minutes=10), + signal_conn=MagicMock(), + dag_ids=[], + pickle_dags=False, + async_mode=True, + ), + ) + + dagbag = DagBag(old_dag_file.name, read_dags_from_db=False) + other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False) + + with create_session() as session: + # Add stale DAG to the DB + dag = dagbag.get_dag("old_temp_dag") + dag.fileloc = old_dag_file.name + dag.last_parsed_time = timezone.utcnow() + dag.sync_to_db(processor_subdir=old_dag_home) + + # Add stale DAG to the DB + other_dag = other_dagbag.get_dag("new_temp_dag") + other_dag.fileloc = new_dag_file.name + other_dag.last_parsed_time = timezone.utcnow() + other_dag.sync_to_db(processor_subdir=new_dag_home) + + # Add DAG to the file_parsing_stats + stat = DagFileStat( + num_dags=1, + import_errors=0, + last_finish_time=timezone.utcnow() + timedelta(hours=1), + last_duration=1, + run_count=1, + last_num_of_db_queries=1, + ) + manager.processor._file_paths = [new_dag_file] + manager.processor._file_stats[new_dag_file] = stat + + active_dag_count = ( + session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() + ) + assert active_dag_count == 2 + + manager.processor._scan_stale_dags() + + active_dag_count = ( + session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() + ) + assert active_dag_count == 1 + @mock.patch( "airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", new_callable=PropertyMock ) From 252e47358b80c48f2469c57cd7687e580052ee90 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Mon, 26 Aug 2024 18:00:56 +0530 Subject: [PATCH 04/14] Add code comment --- airflow/dag_processing/manager.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 83c4a536819ca9..b0e0f410deef79 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -526,6 +526,9 @@ def deactivate_stale_dags( dags_parsed = session.execute(query) for dag in dags_parsed: + # When the DAG processor runs as part of the scheduler, and the user changes the DAG home folder, + # DAGs from the previous DAG home will be marked as stale. Note that this change has no impact + # on standalone DAG processors. if not Path(dag.fileloc).is_relative_to(dag_directory): to_deactivate.add(dag.dag_id) continue From df185ba7f5fe2580b3f7c5d7734d206a0637a657 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Mon, 26 Aug 2024 18:02:59 +0530 Subject: [PATCH 05/14] Update code comment --- tests/dag_processing/test_job_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py index c9bf19d17c8fb1..4a0f12c7687f11 100644 --- a/tests/dag_processing/test_job_runner.py +++ b/tests/dag_processing/test_job_runner.py @@ -816,13 +816,13 @@ def get_dag_string(dag_id) -> str: other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False) with create_session() as session: - # Add stale DAG to the DB + # Add DAG from old dah home to the DB dag = dagbag.get_dag("old_temp_dag") dag.fileloc = old_dag_file.name dag.last_parsed_time = timezone.utcnow() dag.sync_to_db(processor_subdir=old_dag_home) - # Add stale DAG to the DB + # Add DAG from new DAG home to the DB other_dag = other_dagbag.get_dag("new_temp_dag") other_dag.fileloc = new_dag_file.name other_dag.last_parsed_time = timezone.utcnow() From 75c9d93920f68614ba3cc3b59629fe5d5c35c928 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Mon, 26 Aug 2024 18:47:57 +0530 Subject: [PATCH 06/14] Update the logic for checking the current dag_directory --- airflow/dag_processing/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index b0e0f410deef79..370bb655fcc602 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -529,7 +529,7 @@ def deactivate_stale_dags( # When the DAG processor runs as part of the scheduler, and the user changes the DAG home folder, # DAGs from the previous DAG home will be marked as stale. Note that this change has no impact # on standalone DAG processors. - if not Path(dag.fileloc).is_relative_to(dag_directory): + if os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory: to_deactivate.add(dag.dag_id) continue # The largest valid difference between a DagFileStat's last_finished_time and a DAG's From 4b745b8ceb1571d3d78bf5ec753497aefad0c880 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Mon, 26 Aug 2024 20:24:57 +0530 Subject: [PATCH 07/14] Update testcases --- tests/dag_processing/test_job_runner.py | 32 ++++++++----------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py index 4a0f12c7687f11..0a5e048f104aa0 100644 --- a/tests/dag_processing/test_job_runner.py +++ b/tests/dag_processing/test_job_runner.py @@ -639,7 +639,7 @@ def test_scan_stale_dags(self): manager = DagProcessorJobRunner( job=Job(), processor=DagFileProcessorManager( - dag_directory="directory", + dag_directory=str(TEST_DAG_FOLDER), max_runs=1, processor_timeout=timedelta(minutes=10), signal_conn=MagicMock(), @@ -713,11 +713,11 @@ def test_scan_stale_dags_standalone_mode(self): """ Ensure only dags from current dag_directory are updated """ - dag_directory = "directory" + dag_directory = str(TEST_DAG_FOLDER) manager = DagProcessorJobRunner( job=Job(), processor=DagFileProcessorManager( - dag_directory=dag_directory, + dag_directory=TEST_DAG_FOLDER, max_runs=1, processor_timeout=timedelta(minutes=10), signal_conn=MagicMock(), @@ -741,7 +741,7 @@ def test_scan_stale_dags_standalone_mode(self): # Add stale DAG to the DB other_dag = other_dagbag.get_dag("test_start_date_scheduling") other_dag.last_parsed_time = timezone.utcnow() - other_dag.sync_to_db(processor_subdir="other") + other_dag.sync_to_db(processor_subdir="/other") # Add DAG to the file_parsing_stats stat = DagFileStat( @@ -774,29 +774,17 @@ def test_scan_stale_dags_when_dag_folder_change(self): is running as part of scheduler. """ - def get_dag_string(dag_id) -> str: - return ( - f"from __future__ import annotations\n" - f"import datetime\n" - f"from airflow.models.dag import DAG\n" - f"from airflow.operators.empty import EmptyOperator\n" - f"with DAG(\n" - f' dag_id="{dag_id}",\n' - f" schedule=datetime.timedelta(hours=4),\n" - f" start_date=datetime.datetime(2021, 1, 1),\n" - f" catchup=False,\n" - f") as dag:\n" - f' task1 = EmptyOperator(task_id="task1")\n' - ) + def get_dag_string(filename) -> str: + return open(TEST_DAG_FOLDER / filename).read() with tempfile.TemporaryDirectory() as tmpdir: old_dag_home = tempfile.mkdtemp(dir=tmpdir) old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, suffix=".py") - old_dag_file.write(get_dag_string("old_temp_dag").encode()) + old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode()) old_dag_file.flush() new_dag_home = tempfile.mkdtemp(dir=tmpdir) new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, suffix=".py") - new_dag_file.write(get_dag_string("new_temp_dag").encode()) + new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode()) new_dag_file.flush() manager = DagProcessorJobRunner( @@ -817,13 +805,13 @@ def get_dag_string(dag_id) -> str: with create_session() as session: # Add DAG from old dah home to the DB - dag = dagbag.get_dag("old_temp_dag") + dag = dagbag.get_dag("test_example_bash_operator") dag.fileloc = old_dag_file.name dag.last_parsed_time = timezone.utcnow() dag.sync_to_db(processor_subdir=old_dag_home) # Add DAG from new DAG home to the DB - other_dag = other_dagbag.get_dag("new_temp_dag") + other_dag = other_dagbag.get_dag("test_start_date_scheduling") other_dag.fileloc = new_dag_file.name other_dag.last_parsed_time = timezone.utcnow() other_dag.sync_to_db(processor_subdir=new_dag_home) From 79cd22e0eb1beb0e61efdd690f006fb116c86d34 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Mon, 26 Aug 2024 20:29:01 +0530 Subject: [PATCH 08/14] Remove unwanted code --- tests/dag_processing/test_job_runner.py | 28 ++++++++----------------- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py index 0a5e048f104aa0..e964ff54f61441 100644 --- a/tests/dag_processing/test_job_runner.py +++ b/tests/dag_processing/test_job_runner.py @@ -660,16 +660,16 @@ def test_scan_stale_dags(self): SerializedDagModel.write_dag(dag) # Add DAG to the file_parsing_stats - stat = DagFileStat( - num_dags=1, - import_errors=0, - last_finish_time=timezone.utcnow() + timedelta(hours=1), - last_duration=1, - run_count=1, - last_num_of_db_queries=1, - ) + # stat = DagFileStat( + # num_dags=1, + # import_errors=0, + # last_finish_time=timezone.utcnow() + timedelta(hours=1), + # last_duration=1, + # run_count=1, + # last_num_of_db_queries=1, + # ) manager.processor._file_paths = [test_dag_path] - manager.processor._file_stats[test_dag_path] = stat + # manager.processor._file_stats[test_dag_path] = stat active_dag_count = ( session.query(func.count(DagModel.dag_id)) @@ -816,17 +816,7 @@ def get_dag_string(filename) -> str: other_dag.last_parsed_time = timezone.utcnow() other_dag.sync_to_db(processor_subdir=new_dag_home) - # Add DAG to the file_parsing_stats - stat = DagFileStat( - num_dags=1, - import_errors=0, - last_finish_time=timezone.utcnow() + timedelta(hours=1), - last_duration=1, - run_count=1, - last_num_of_db_queries=1, - ) manager.processor._file_paths = [new_dag_file] - manager.processor._file_stats[new_dag_file] = stat active_dag_count = ( session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() From 055192afcd472fb088337e49be6fbbf62491741e Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Mon, 26 Aug 2024 20:38:30 +0530 Subject: [PATCH 09/14] Uncomment code --- tests/dag_processing/test_job_runner.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py index e964ff54f61441..acdee6b437007a 100644 --- a/tests/dag_processing/test_job_runner.py +++ b/tests/dag_processing/test_job_runner.py @@ -660,16 +660,16 @@ def test_scan_stale_dags(self): SerializedDagModel.write_dag(dag) # Add DAG to the file_parsing_stats - # stat = DagFileStat( - # num_dags=1, - # import_errors=0, - # last_finish_time=timezone.utcnow() + timedelta(hours=1), - # last_duration=1, - # run_count=1, - # last_num_of_db_queries=1, - # ) + stat = DagFileStat( + num_dags=1, + import_errors=0, + last_finish_time=timezone.utcnow() + timedelta(hours=1), + last_duration=1, + run_count=1, + last_num_of_db_queries=1, + ) manager.processor._file_paths = [test_dag_path] - # manager.processor._file_stats[test_dag_path] = stat + manager.processor._file_stats[test_dag_path] = stat active_dag_count = ( session.query(func.count(DagModel.dag_id)) From c91be459f3260d56627ab83e8d6719b1a5e31ec6 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Mon, 26 Aug 2024 23:16:52 +0530 Subject: [PATCH 10/14] Add processor_subdir when creating processor_subdir --- tests/jobs/test_scheduler_job.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 961ac8122d8ca7..71ebdd31b769b8 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3568,6 +3568,7 @@ def test_retry_still_in_executor(self, dag_maker): dag_id="test_retry_still_in_executor", schedule="@once", session=session, + processor_subdir=os.devnull, ): dag_task1 = BashOperator( task_id="test_retry_handling_op", From b96a8325cac7e0544c57ade207aa4857adee07f0 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Tue, 27 Aug 2024 18:08:48 +0530 Subject: [PATCH 11/14] Fix test_retry_still_in_executor test --- tests/jobs/test_scheduler_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 71ebdd31b769b8..14f5fef92d085e 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3568,7 +3568,7 @@ def test_retry_still_in_executor(self, dag_maker): dag_id="test_retry_still_in_executor", schedule="@once", session=session, - processor_subdir=os.devnull, + fileloc=os.devnull + "/test_retry_still_in_executor.py", ): dag_task1 = BashOperator( task_id="test_retry_handling_op", From 3545a7a921709c0dc8b867416e1703eb1835424e Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Wed, 28 Aug 2024 11:54:20 +0530 Subject: [PATCH 12/14] Remove config from test --- tests/dag_processing/test_job_runner.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py index acdee6b437007a..b5d0b35580b4aa 100644 --- a/tests/dag_processing/test_job_runner.py +++ b/tests/dag_processing/test_job_runner.py @@ -763,11 +763,6 @@ def test_scan_stale_dags_standalone_mode(self): active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar() assert active_dag_count == 1 - @conf_vars( - { - ("scheduler", "standalone_dag_processor"): "False", - } - ) def test_scan_stale_dags_when_dag_folder_change(self): """ Ensure dags from old dag_folder is marked as stale when dag processor From 39178198178faf9f24d209f90db82aa84c0e06d5 Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Wed, 28 Aug 2024 11:56:39 +0530 Subject: [PATCH 13/14] Update airflow/dag_processing/manager.py Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> --- airflow/dag_processing/manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 370bb655fcc602..f1bb731ed3841b 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -526,8 +526,8 @@ def deactivate_stale_dags( dags_parsed = session.execute(query) for dag in dags_parsed: - # When the DAG processor runs as part of the scheduler, and the user changes the DAG home folder, - # DAGs from the previous DAG home will be marked as stale. Note that this change has no impact + # When the DAG processor runs as part of the scheduler, and the user changes the DAGs folder, + # DAGs from the previous DAGs folder will be marked as stale. Note that this change has no impact # on standalone DAG processors. if os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory: to_deactivate.add(dag.dag_id) From fa17bc6e97365e43726ac7e3f507e495ad0ce716 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Wed, 28 Aug 2024 12:46:08 +0530 Subject: [PATCH 14/14] Update if condition for readability --- airflow/dag_processing/manager.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index f1bb731ed3841b..37b4323ac4d9e1 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -529,17 +529,17 @@ def deactivate_stale_dags( # When the DAG processor runs as part of the scheduler, and the user changes the DAGs folder, # DAGs from the previous DAGs folder will be marked as stale. Note that this change has no impact # on standalone DAG processors. - if os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory: - to_deactivate.add(dag.dag_id) - continue + dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory # The largest valid difference between a DagFileStat's last_finished_time and a DAG's # last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is # no longer present in the file. We have a stale_dag_threshold configured to prevent a # significant delay in deactivation of stale dags when a large timeout is configured - if ( + dag_removed_from_dag_folder_or_file = ( dag.fileloc in last_parsed and (dag.last_parsed_time + timedelta(seconds=stale_dag_threshold)) < last_parsed[dag.fileloc] - ): + ) + + if dag_not_in_current_dag_folder or dag_removed_from_dag_folder_or_file: cls.logger().info("DAG %s is missing and will be deactivated.", dag.dag_id) to_deactivate.add(dag.dag_id)