Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cleanup functions to fix recursive loop and no such file raised by broken symlinks #1115

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions cosmos/cleanup.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add some tests for this module or exclude from codecov for the time being?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds like an operator or decorator. I'm ok with keeping it in the project's root path, but what are your thoughts on placing it in the operator directory instead?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The challenge is that while identify_broken_symbolic_links could be exposed as an operator, ideally, users would check this before it is deployed to a remote instance of Airflow. Additionally, identify_recursive_loops cannot be exposed as an operator since it would be useless (the exception is raised at Airflow DAG parsing). Therefore, we'd need a non-operator place to expose the second use case, so it felt we could benefit from having both methods together - and it'd be up to the user on how to use them.

Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""
Use this script locally to identify broken symbolic links or recursive loops locally:
$ python -m cosmos.cleanup -p <dir-path>
To delete the issues identified, run:
$ python -m cosmos.cleanup -p <dir-path> -d
"""

import argparse
import logging
import os
from pathlib import Path

logger = logging.getLogger(__name__)


def identify_broken_symbolic_links(dir_path: str, should_delete: bool = False) -> None:
"""
Given a directory, recursively inspect it in search for symbolic links.
If should_delete is set to True, delete the symbolic links identified.
:param dir_path: Path to the directory to be analysed
:param should_delete: Users should set to True if they want the method to not only identify but also delete these links.
"""
logger.info(f"Inspecting the directory {dir_path} for broken symbolic links.")
filepaths = []
broken_symlinks_count = 0
deleted_symlinks_count = 0
for root_dir, dirs, files in os.walk(dir_path):
paths = [os.path.join(root_dir, filepath) for filepath in files]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we check here if the filepath is a symlink?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was being checked after, but it would be a good improvement!

filepaths.extend(paths)

for filepath in filepaths:
try:
os.stat(filepath)
except OSError:
broken_symlinks_count += 1
logger.warning(f"The folder {dir_path} contains a symbolic link to a non-existent file: {filepath}")
if should_delete:
logger.info(f"Deleting the invalid symbolic link: {filepath}")
os.unlink(filepath)
deleted_symlinks_count += 1

Check warning on line 42 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L36-L42

Added lines #L36 - L42 were not covered by tests

logger.info(
f"After inspecting {dir_path}, identified {broken_symlinks_count} broken links and deleted {deleted_symlinks_count} of them."
)


# Airflow DAG parsing fails if recursive loops are found, so this method cannot be used from within an Airflow task
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add an example here on how to call this and where to call this like mentioned in the PR description.

Or should we also create a public docs page listing the steps that we can share with users?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a short docs will be great and also we could render example in the docs

def identify_recursive_loops(original_dir_path: str, should_delete: bool = False) -> None:
"""
Given a directory, recursively inspect it in search for recursive loops.
If should_delete is set to True, delete the (symbolic links) recursive loops identified.
:param dir_path: Path to the directory to be analysed
:param should_delete: Users should set to True if they want the method to not only identify but also delete these loops.
"""
logger.info(f"Inspecting the directory {original_dir_path} for recursive loops.")
dirs_paths = []
broken_symlinks_count = 0
deleted_symlinks_count = 0

Check warning on line 61 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L58-L61

Added lines #L58 - L61 were not covered by tests

dir_path = Path(original_dir_path).absolute()

Check warning on line 63 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L63

Added line #L63 was not covered by tests

for root_dir, dirs, files in os.walk(dir_path):
paths = [os.path.join(root_dir, dir_name) for dir_name in dirs]
dirs_paths.extend(paths)

Check warning on line 67 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L65-L67

Added lines #L65 - L67 were not covered by tests

for subdir_path in dirs_paths:
if os.path.islink(subdir_path):
symlink_target_path = os.path.realpath(subdir_path)
if Path(symlink_target_path) in Path(subdir_path).parents:
logger.warning(f"Detected recursive loop from {subdir_path} to {symlink_target_path}")
broken_symlinks_count += 1
if should_delete:
logger.info(f"Deleting symbolic link: {subdir_path}")
os.unlink(subdir_path)
deleted_symlinks_count += 1

Check warning on line 78 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L69-L78

Added lines #L69 - L78 were not covered by tests

logger.info(

Check warning on line 80 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L80

Added line #L80 was not covered by tests
f"After inspecting {dir_path}, identified {broken_symlinks_count} recursive loops and deleted {deleted_symlinks_count} of them."
)


if __name__ == "__main__":
parser = argparse.ArgumentParser(

Check warning on line 86 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L86

Added line #L86 was not covered by tests
description="Clean up local directory from broken symbolic links and recursive loops."
)
parser.add_argument("-p", "--dir-path", help="Path to directory to be inspected", required=True)
parser.add_argument(

Check warning on line 90 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L89-L90

Added lines #L89 - L90 were not covered by tests
"-d", "--delete", help="Delete problems found", action="store_true", required=False, default=False
)
args = parser.parse_args()
identify_recursive_loops(args.dir_path, args.delete)
identify_broken_symbolic_links(args.dir_path, args.delete)

Check warning on line 95 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L93-L95

Added lines #L93 - L95 were not covered by tests
38 changes: 38 additions & 0 deletions dev/dags/example_cosmos_cleanup_dir_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""
We've observed users who had dbt project directories containing symbolic links to files that no longer existed.

Although this issue was not created by Cosmos itself, since this issue was already observed by two users, we thought it
was useful to give an example DAG illustrating how to clean the problematic directories.

Assuming the cause of the issue no longer exists, this DAG can be run only once.
"""

# [START dirty_dir_example]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great if we could use tag dirty_dir_example to render this example in docs

from datetime import datetime
from pathlib import Path

from airflow.decorators import dag, task

from cosmos.cleanup import identify_broken_symbolic_links

dbt_project_folder = Path(__file__).parent / "dbt"


@dag(
schedule_interval="0 0 * * 0", # Runs every Sunday
start_date=datetime(2023, 1, 1),
catchup=False,
tags=["example"],
)
def example_cosmos_cleanup_dir_dag():

@task()
def clear_broken_symlinks(session=None):
identify_broken_symbolic_links(dir_path=dbt_project_folder, should_delete=True)

clear_broken_symlinks()


# [END dirty_dir_example]

example_cosmos_cleanup_dir_dag()
Loading