Skip to content

Commit

Permalink
Add script to identify recursive loops
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana committed Jul 23, 2024
1 parent bff1a96 commit f282a6b
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 39 deletions.
95 changes: 95 additions & 0 deletions cosmos/cleanup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""
Use this script locally to identify broken symbolic links or recursive loops locally:
$ python -m cosmos.cleanup -p <dir-path>
To delete the issues identified, run:
$ python -m cosmos.cleanup -p <dir-path> -d
"""

import argparse
import logging
import os
from pathlib import Path

logger = logging.getLogger(__name__)


def identify_broken_symbolic_links(dir_path: str, should_delete: bool = False) -> None:
"""
Given a directory, recursively inspect it in search for symbolic links.
If should_delete is set to True, delete the symbolic links identified.
:param dir_path: Path to the directory to be analysed
:param should_delete: Users should set to True if they want the method to not only identify but also delete these links.
"""
logger.info(f"Inspecting the directory {dir_path} for broken symbolic links.")
filepaths = []
broken_symlinks_count = 0
deleted_symlinks_count = 0
for root_dir, dirs, files in os.walk(dir_path):
paths = [os.path.join(root_dir, filepath) for filepath in files]
filepaths.extend(paths)

for filepath in filepaths:
try:
os.stat(filepath)
except OSError:
broken_symlinks_count += 1
logger.warning(f"The folder {dir_path} contains a symbolic link to a non-existent file: {filepath}")
if should_delete:
logger.info(f"Deleting the invalid symbolic link: {filepath}")
os.unlink(filepath)
deleted_symlinks_count += 1

Check warning on line 42 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L36-L42

Added lines #L36 - L42 were not covered by tests

logger.info(
f"After inspecting {dir_path}, identified {broken_symlinks_count} broken links and deleted {deleted_symlinks_count} of them."
)


# Airflow DAG parsing fails if recursive loops are found, so this method cannot be used from within an Airflow task
def identify_recursive_loops(original_dir_path: str, should_delete: bool = False) -> None:
"""
Given a directory, recursively inspect it in search for recursive loops.
If should_delete is set to True, delete the (symbolic links) recursive loops identified.
:param dir_path: Path to the directory to be analysed
:param should_delete: Users should set to True if they want the method to not only identify but also delete these loops.
"""
logger.info(f"Inspecting the directory {original_dir_path} for recursive loops.")
dirs_paths = []
broken_symlinks_count = 0
deleted_symlinks_count = 0

Check warning on line 61 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L58-L61

Added lines #L58 - L61 were not covered by tests

dir_path = Path(original_dir_path).absolute()

Check warning on line 63 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L63

Added line #L63 was not covered by tests

for root_dir, dirs, files in os.walk(dir_path):
paths = [os.path.join(root_dir, dir_name) for dir_name in dirs]
dirs_paths.extend(paths)

Check warning on line 67 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L65-L67

Added lines #L65 - L67 were not covered by tests

for subdir_path in dirs_paths:
if os.path.islink(subdir_path):
symlink_target_path = os.path.realpath(subdir_path)
if Path(symlink_target_path) in Path(subdir_path).parents:
logger.warning(f"Detected recursive loop from {subdir_path} to {symlink_target_path}")
broken_symlinks_count += 1
if should_delete:
logger.info(f"Deleting symbolic link: {subdir_path}")
os.unlink(subdir_path)
deleted_symlinks_count += 1

Check warning on line 78 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L69-L78

Added lines #L69 - L78 were not covered by tests

logger.info(

Check warning on line 80 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L80

Added line #L80 was not covered by tests
f"After inspecting {dir_path}, identified {broken_symlinks_count} recursive loops and deleted {deleted_symlinks_count} of them."
)


if __name__ == "__main__":
parser = argparse.ArgumentParser(

Check warning on line 86 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L86

Added line #L86 was not covered by tests
description="Clean up local directory from broken symbolic links and recursive loops."
)
parser.add_argument("-p", "--dir-path", help="Path to directory to be inspected", required=True)
parser.add_argument(

Check warning on line 90 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L89-L90

Added lines #L89 - L90 were not covered by tests
"-d", "--delete", help="Delete problems found", action="store_true", required=False, default=False
)
args = parser.parse_args()
identify_recursive_loops(args.dir_path, args.delete)
identify_broken_symbolic_links(args.dir_path, args.delete)

Check warning on line 95 in cosmos/cleanup.py

View check run for this annotation

Codecov / codecov/patch

cosmos/cleanup.py#L93-L95

Added lines #L93 - L95 were not covered by tests
44 changes: 5 additions & 39 deletions dev/dags/example_cosmos_cleanup_dir_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,14 @@
"""

# [START dirty_dir_example]
import logging
import os
from datetime import datetime
from pathlib import Path

from airflow.decorators import dag, task

logger = logging.getLogger(__name__)
dbt_project_folder = Path(__file__).parent / "dbt"


def identify_broken_symbolic_links(dir_path: str, should_delete: bool = False) -> None:
"""
Given a directory, recursively inspect it in search for symbolic links.
If should_delete is set to True, delete the symbolic links identified.
from cosmos.cleanup import identify_broken_symbolic_links

:param dir_path: Path to the directory to be analysed
:param should_delete: Users should set to True if they want the method to not only identify but also delete these links.
"""
logger.info(f"Inspecting the directory {dir_path} for broken symbolic links.")
filepaths = []
broken_links_count = 0
deleted_links_count = 0
for root_dir, dirs, files in os.walk(dir_path):
paths = [os.path.join(root_dir, filepath) for filepath in files]
filepaths.extend(paths)

for filepath in filepaths:
try:
os.stat(filepath)
except OSError:
broken_links_count += 1
logger.warning(f"The folder {dir_path} contains a symbolic link to a non-existent file: {filepath}")
if should_delete:
logger.info(f"Deleting the invalid symbolic link: {filepath}")
os.unlink(filepath)
deleted_links_count += 1

logger.info(
f"After inspecting {dir_path}, identified {broken_links_count} broken links and deleted {deleted_links_count} of them."
)
dbt_project_folder = Path(__file__).parent / "dbt"


@dag(
Expand All @@ -60,11 +27,10 @@ def identify_broken_symbolic_links(dir_path: str, should_delete: bool = False) -
def example_cosmos_cleanup_dir_dag():

@task()
def clear_dbt_project_dir(dir_path: str, should_delete: bool, session=None):
""" """
identify_broken_symbolic_links(dir_path, should_delete)
def clear_broken_symlinks(session=None):
identify_broken_symbolic_links(dir_path=dbt_project_folder, should_delete=True)

clear_dbt_project_dir(dir_path=dbt_project_folder, should_delete=True)
clear_broken_symlinks()


# [END dirty_dir_example]
Expand Down

0 comments on commit f282a6b

Please sign in to comment.