Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-5274] dag loading duration metric name too long #5890

Merged
merged 1 commit into from
Aug 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ assists users migrating to a new version.
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
## Airflow Master

### Change dag loading duration metric name
Change DAG file loading duration metric from
`dag.loading-duration.<dag_id>` to `dag.loading-duration.<dag_file>`. This is to
better handle the case when a DAG file has multiple DAGs.

### Changes to ImapHook, ImapAttachmentSensor and ImapAttachmentToS3Operator

ImapHook:
Expand Down
15 changes: 5 additions & 10 deletions airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,6 @@ def collect_dags(

dag_folder = correct_maybe_zipped(dag_folder)

dags_by_name = {}

for filepath in list_py_file_paths(dag_folder, safe_mode=safe_mode,
include_examples=include_examples):
try:
Expand All @@ -389,7 +387,6 @@ def collect_dags(
td = timezone.utcnow() - ts
td = td.total_seconds() + (
float(td.microseconds) / 1000000)
dags_by_name[dag_id_names] = dag_ids
stats.append(FileLoadStat(
filepath.replace(dag_folder, ''),
td,
Expand All @@ -408,13 +405,11 @@ def collect_dags(
self.dagbag_stats = sorted(
stats, key=lambda x: x.duration, reverse=True)
for file_stat in self.dagbag_stats:
dag_ids = dags_by_name[file_stat.dags]
if file_stat.dag_num >= 1:
# if we found multiple dags per file, the stat is 'dag_id1 _ dag_id2'
dag_names = '_'.join(dag_ids)
Stats.timing('dag.loading-duration.{}'.
format(dag_names),
file_stat.duration)
# file_stat.file similar format: /subdir/dag_name.py
filename = file_stat.file.split('/')[-1].replace('.py', '')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's possible to have the same file name in different folders. please can we join the split, instead of just taking the filename? eg.

filename = '.'.join(file_stat.file.split('/')).replace('.py', '')

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be a bad practice to have the same file name for different DAGs. If you think this would be an issue on your side, feel free to raise a pr.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides if we join all the subdir, it will make those stats unreadable if the dir for the dag file is very deep. Personally I am not favor of this. Not sure how other committer thinks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i was imagining a file structure like the for example. it doesn't seem too unreasonable a structure?

dag_dir
   +->dag_module_1
   |       +->dag_definition.py
   |       |->callbacks.py
   |       |->functions.py
   |       |->config.json
   +->dag_module_2
           +->dag_definition.py
           |->callbacks.py
           |->functions.py
           |->config.json

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but yea, we dont use that dir structure, so im happy with the merge :)

Stats.timing('dag.loading-duration.{}'.
format(filename),
file_stat.duration)

def dagbag_report(self):
"""Prints a report around DagBag loading stats"""
Expand Down
2 changes: 1 addition & 1 deletion docs/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Name Description
================================= =================================================
dagrun.dependency-check.<dag_id> Seconds taken to check DAG dependencies
dag.<dag_id>.<task_id>.duration Seconds taken to finish a task
dag.loading-duration.<dag_id> Seconds taken to load the given DAG
dag.loading-duration.<dag_file> Seconds taken to load the given DAG file
dagrun.duration.success.<dag_id> Seconds taken for a DagRun to reach success state
dagrun.duration.failed.<dag_id> Seconds taken for a DagRun to reach failed state
dagrun.schedule_delay.<dag_id> Seconds of delay between the scheduled DagRun
Expand Down