Add parameter for the last number of queries to the DB in DAG file processing stats
MaksYermak committed Jun 20, 2024
1 parent 5221196 commit b306f36
Showing 5 changed files with 91 additions and 18 deletions.
63 changes: 58 additions & 5 deletions airflow/dag_processing/manager.py
@@ -88,6 +88,7 @@ class DagFileStat(NamedTuple):
last_finish_time: datetime | None
last_duration: timedelta | None
run_count: int
last_num_of_db_queries: int


class DagParsingSignal(enum.Enum):
@@ -351,7 +352,12 @@ class DagFileProcessorManager(LoggingMixin):
"""

DEFAULT_FILE_STAT = DagFileStat(
num_dags=0, import_errors=0, last_finish_time=None, last_duration=None, run_count=0
num_dags=0,
import_errors=0,
last_finish_time=None,
last_duration=None,
run_count=0,
last_num_of_db_queries=0,
)

def __init__(
@@ -850,7 +856,18 @@ def _log_file_processing_stats(self, known_file_paths):
# Last Runtime: If the process ran before, how long did it take to
# finish in seconds
# Last Run: When the file finished processing in the previous run.
headers = ["File Path", "PID", "Runtime", "# DAGs", "# Errors", "Last Runtime", "Last Run"]
# Last # of DB Queries: The number of queries performed to the
# Airflow database during last parsing of the file.
headers = [
"File Path",
"PID",
"Runtime",
"# DAGs",
"# Errors",
"Last Runtime",
"Last Run",
"Last # of DB Queries",
]

rows = []
now = timezone.utcnow()
@@ -866,14 +883,35 @@ def _log_file_processing_stats(self, known_file_paths):
if last_run:
seconds_ago = (now - last_run).total_seconds()
Stats.gauge(f"dag_processing.last_run.seconds_ago.{file_name}", seconds_ago)
last_num_of_db_queries = self.get_last_num_of_db_queries(file_path)

rows.append((file_path, processor_pid, runtime, num_dags, num_errors, last_runtime, last_run))
rows.append(
(
file_path,
processor_pid,
runtime,
num_dags,
num_errors,
last_runtime,
last_run,
last_num_of_db_queries,
)
)

# Sort by longest last runtime. (Can't sort None values in python3)
rows.sort(key=lambda x: x[5] or 0.0, reverse=True)

formatted_rows = []
for file_path, pid, runtime, num_dags, num_errors, last_runtime, last_run in rows:
for (
file_path,
pid,
runtime,
num_dags,
num_errors,
last_runtime,
last_run,
last_num_of_db_queries,
) in rows:
formatted_rows.append(
(
file_path,
@@ -883,6 +921,7 @@ def _log_file_processing_stats(self, known_file_paths):
num_errors,
f"{last_runtime:.2f}s" if last_runtime else None,
last_run.strftime("%Y-%m-%dT%H:%M:%S") if last_run else None,
last_num_of_db_queries,
)
)
log_str = (
@@ -946,6 +985,17 @@ def get_last_error_count(self, file_path) -> int | None:
stat = self._file_stats.get(file_path)
return stat.import_errors if stat else None

def get_last_num_of_db_queries(self, file_path) -> int | None:
"""
Retrieve the number of queries performed to the Airflow database during last parsing of the file.

:param file_path: the path to the file that was processed
:return: the number of queries performed to the Airflow database during last parsing of the file,
or None if the file was never processed.
"""
stat = self._file_stats.get(file_path)
return stat.last_num_of_db_queries if stat else None

def get_last_finish_time(self, file_path) -> datetime | None:
"""
Retrieve the last completion time for processing a specific path.
@@ -1031,13 +1081,14 @@ def _collect_results_from_processor(self, processor) -> None:
last_finish_time = timezone.utcnow()

if processor.result is not None:
num_dags, count_import_errors = processor.result
num_dags, count_import_errors, last_num_of_db_queries = processor.result
else:
self.log.error(
"Processor for %s exited with return code %s.", processor.file_path, processor.exit_code
)
count_import_errors = -1
num_dags = 0
last_num_of_db_queries = 0

last_duration = last_finish_time - processor.start_time
stat = DagFileStat(
@@ -1046,6 +1097,7 @@ def _collect_results_from_processor(self, processor) -> None:
last_finish_time=last_finish_time,
last_duration=last_duration,
run_count=self.get_run_count(processor.file_path) + 1,
last_num_of_db_queries=last_num_of_db_queries,
)
self._file_stats[processor.file_path] = stat
file_name = Path(processor.file_path).stem
@@ -1242,6 +1294,7 @@ def _kill_timed_out_processors(self):
last_finish_time=now,
last_duration=duration,
run_count=self.get_run_count(file_path) + 1,
last_num_of_db_queries=0,
)
self._file_stats[processor.file_path] = stat

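The manager change above threads the new last_num_of_db_queries field through DagFileStat, the per-file stats table, and a dedicated getter. A minimal, self-contained sketch of how the field is stored and read back (illustrative only, not part of the commit; the file paths and values are made up):

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import NamedTuple


class DagFileStat(NamedTuple):
    # Mirrors the fields of airflow.dag_processing.manager.DagFileStat after this change.
    num_dags: int
    import_errors: int
    last_finish_time: datetime | None
    last_duration: timedelta | None
    run_count: int
    last_num_of_db_queries: int


# Hypothetical stats store keyed by file path, standing in for manager._file_stats.
file_stats: dict[str, DagFileStat] = {
    "dags/example.py": DagFileStat(
        num_dags=2,
        import_errors=0,
        last_finish_time=datetime.now(timezone.utc),
        last_duration=timedelta(seconds=1.5),
        run_count=3,
        last_num_of_db_queries=94,
    ),
}


def get_last_num_of_db_queries(file_path: str) -> int | None:
    # Same contract as the new manager method: None if the file was never processed.
    stat = file_stats.get(file_path)
    return stat.last_num_of_db_queries if stat else None


print(get_last_num_of_db_queries("dags/example.py"))  # 94
print(get_last_num_of_db_queries("dags/missing.py"))  # None
```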
23 changes: 14 additions & 9 deletions airflow/dag_processing/processor.py
@@ -28,7 +28,7 @@
from typing import TYPE_CHECKING, Iterable, Iterator

from setproctitle import setproctitle
from sqlalchemy import delete, func, or_, select
from sqlalchemy import delete, event, func, or_, select

from airflow import settings
from airflow.api_internal.internal_api_call import internal_api_call
@@ -99,7 +99,7 @@ def __init__(
# The process that was launched to process the given .
self._process: multiprocessing.process.BaseProcess | None = None
# The result of DagFileProcessor.process_file(file_path).
self._result: tuple[int, int] | None = None
self._result: tuple[int, int, int] | None = None
# Whether the process is done running.
self._done = False
# When the process started.
@@ -162,7 +162,7 @@ def _handle_dag_file_processing():

log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path)
dag_file_processor = DagFileProcessor(dag_ids=dag_ids, dag_directory=dag_directory, log=log)
result: tuple[int, int] = dag_file_processor.process_file(
result: tuple[int, int, int] = dag_file_processor.process_file(
file_path=file_path,
pickle_dags=pickle_dags,
callback_requests=callback_requests,
@@ -350,7 +350,7 @@ def done(self) -> bool:
return False

@property
def result(self) -> tuple[int, int] | None:
def result(self) -> tuple[int, int, int] | None:
"""Result of running ``DagFileProcessor.process_file()``."""
if not self.done:
raise AirflowException("Tried to get the result before it's done!")
@@ -415,6 +415,7 @@ def __init__(self, dag_ids: list[str] | None, dag_directory: str, log: logging.L
self._log = log
self._dag_directory = dag_directory
self.dag_warnings: set[tuple[str, str]] = set()
self._last_num_of_db_queries = 0

@classmethod
@internal_api_call
@@ -815,7 +816,7 @@ def process_file(
callback_requests: list[CallbackRequest],
pickle_dags: bool = False,
session: Session = NEW_SESSION,
) -> tuple[int, int]:
) -> tuple[int, int, int]:
"""
Process a Python file containing Airflow DAGs.
@@ -833,16 +834,20 @@
:param pickle_dags: whether serialize the DAGs found in the file and
save them to the db
:param session: Sqlalchemy ORM Session
:return: number of dags found, count of import errors
:return: number of dags found, count of import errors, last number of db queries
"""
self.log.info("Processing file %s for tasks to queue", file_path)

@event.listens_for(session, "do_orm_execute")
def _count_db_queries(orm_execute_state):
self._last_num_of_db_queries += 1

try:
dagbag = DagFileProcessor._get_dagbag(file_path)
except Exception:
self.log.exception("Failed at reloading the DAG file %s", file_path)
Stats.incr("dag_file_refresh_error", 1, 1, tags={"file_path": file_path})
return 0, 0
return 0, 0, self._last_num_of_db_queries

if dagbag.dags:
self.log.info("DAG(s) %s retrieved from %s", ", ".join(map(repr, dagbag.dags)), file_path)
@@ -859,7 +864,7 @@ def process_file(
# parse error we still need to progress the state of TIs,
# otherwise they might be stuck in queued/running for ever!
self.execute_callbacks_without_dag(callback_requests, session)
return 0, len(dagbag.import_errors)
return 0, len(dagbag.import_errors), self._last_num_of_db_queries

self.execute_callbacks(dagbag, callback_requests, session)
session.commit()
@@ -889,7 +894,7 @@ def process_file(
except Exception:
self.log.exception("Error logging DAG warnings.")

return len(dagbag.dags), len(dagbag.import_errors)
return len(dagbag.dags), len(dagbag.import_errors), self._last_num_of_db_queries

@staticmethod
@internal_api_call
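The counter itself relies on SQLAlchemy's do_orm_execute session event: process_file registers a listener on the parsing session, so every ORM statement executed through that session increments _last_num_of_db_queries. A standalone sketch of the counting technique against an in-memory SQLite database (the model and data are made up; only the event-listener pattern is taken from the commit):

```python
from sqlalchemy import Column, Integer, String, create_engine, event, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Widget(Base):
    __tablename__ = "widget"
    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

counts = {"db_queries": 0}

with Session(engine) as session:
    # do_orm_execute fires once per ORM statement run through Session.execute()
    # (and scalars()/scalar()); this mirrors how the processor bumps
    # self._last_num_of_db_queries while parsing a DAG file.
    @event.listens_for(session, "do_orm_execute")
    def _count_db_queries(orm_execute_state):
        counts["db_queries"] += 1

    session.add(Widget(name="a"))
    session.commit()

    session.execute(select(Widget)).all()
    session.execute(select(Widget).where(Widget.name == "a")).all()

print(counts["db_queries"])  # 2 -- only the two execute() calls are intercepted
```

One caveat worth noting: flush-level INSERT/UPDATE statements emitted by commit() do not pass through do_orm_execute, so the counter tracks ORM execute() calls rather than every statement sent to the database.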
8 changes: 5 additions & 3 deletions tests/dag_processing/test_job_runner.py
@@ -81,7 +81,7 @@ def __init__(self, file_path, pickle_dags, dag_ids, dag_directory, callbacks):
writable.send("abc")
writable.close()
self._waitable_handle = readable
self._result = 0, 0
self._result = 0, 0, 0

def start(self):
pass
@@ -270,7 +270,7 @@ def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
mock_processor.terminate.side_effect = None

manager.processor._processors["missing_file.txt"] = mock_processor
manager.processor._file_stats["missing_file.txt"] = DagFileStat(0, 0, None, None, 0)
manager.processor._file_stats["missing_file.txt"] = DagFileStat(0, 0, None, None, 0, 0)

manager.processor.set_file_paths(["abc.txt"])
assert manager.processor._processors == {}
@@ -549,7 +549,7 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(
# let's say the DAG was just parsed 10 seconds before the Freezed time
last_finish_time = freezed_base_time - timedelta(seconds=10)
manager.processor._file_stats = {
"file_1.py": DagFileStat(1, 0, last_finish_time, timedelta(seconds=1.0), 1),
"file_1.py": DagFileStat(1, 0, last_finish_time, timedelta(seconds=1.0), 1, 1),
}
with time_machine.travel(freezed_base_time):
manager.processor.set_file_paths(dag_files)
@@ -651,6 +651,7 @@ def test_scan_stale_dags(self):
last_finish_time=timezone.utcnow() + timedelta(hours=1),
last_duration=1,
run_count=1,
last_num_of_db_queries=1,
)
manager.processor._file_paths = [test_dag_path]
manager.processor._file_stats[test_dag_path] = stat
@@ -733,6 +734,7 @@ def test_scan_stale_dags_standalone_mode(self):
last_finish_time=timezone.utcnow() + timedelta(hours=1),
last_duration=1,
run_count=1,
last_num_of_db_queries=1,
)
manager.processor._file_paths = [test_dag_path]
manager.processor._file_stats[test_dag_path] = stat
12 changes: 11 additions & 1 deletion tests/dag_processing/test_processor.py
@@ -19,6 +19,7 @@

import datetime
import os
import pathlib
import sys
from unittest import mock
from unittest.mock import MagicMock, patch
@@ -67,6 +68,7 @@
# Filename to be used for dags that are created in an ad-hoc manner and can be removed/
# created at runtime
TEMP_DAG_FILENAME = "temp_dag.py"
TEST_DAG_FOLDER = pathlib.Path(__file__).parents[1].resolve() / "dags"


@pytest.fixture(scope="class")
@@ -108,7 +110,7 @@ def _process_file(self, file_path, dag_directory, session):
dag_ids=[], dag_directory=str(dag_directory), log=mock.MagicMock()
)

dag_file_processor.process_file(file_path, [], False, session)
return dag_file_processor.process_file(file_path, [], False, session)

@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
def test_dag_file_processor_sla_miss_callback(self, mock_get_dagbag, create_dummy_dag, get_test_dag):
@@ -1008,6 +1010,14 @@ def test_nullbyte_exception_handling_when_preimporting_airflow(self, mock_contex
)
processor.start()

def test_counter_for_last_num_of_db_queries(self):
dag_filepath = TEST_DAG_FOLDER / "test_multiple_dags.py"

with create_session() as session:
_, _, last_num_of_db_queries = self._process_file(dag_filepath, TEST_DAG_FOLDER, session)

assert last_num_of_db_queries == 94


class TestProcessorAgent:
@pytest.fixture(autouse=True)
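For callers outside the test harness, the updated return shape can be consumed like this; a hedged sketch assuming an initialized Airflow installation with a metadata database, where the file path is illustrative:

```python
import logging

from airflow.dag_processing.processor import DagFileProcessor
from airflow.utils.session import create_session

processor = DagFileProcessor(
    dag_ids=[], dag_directory="dags", log=logging.getLogger(__name__)
)

with create_session() as session:
    # process_file now returns (num_dags, import_errors, last_num_of_db_queries).
    num_dags, import_errors, last_num_of_db_queries = processor.process_file(
        file_path="dags/test_multiple_dags.py",
        callback_requests=[],
        pickle_dags=False,
        session=session,
    )

print(num_dags, import_errors, last_num_of_db_queries)
```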
3 changes: 3 additions & 0 deletions tests/dags/test_multiple_dags.py
@@ -44,3 +44,6 @@ def create_dag(suffix):

globals()["dag_1"] = create_dag("dag_1")
globals()["dag_2"] = create_dag("dag_2")
globals()["dag_3"] = create_dag("dag_3")
globals()["dag_4"] = create_dag("dag_4")
globals()["dag_5"] = create_dag("dag_5")
