diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 372d108b0..74ee7bdec 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,14 @@ Changelog ========= +1.6.0a3 (2024-07-23) +-------------------- + +Features + +* Cleanup functions + + 1.5.1 (2024-07-17) ------------------ diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 1a00036f1..0c4d27a40 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -5,8 +5,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.5.1" - +__version__ = "1.6.0a3" from cosmos.airflow.dag import DbtDag from cosmos.airflow.task_group import DbtTaskGroup diff --git a/cosmos/cleanup.py b/cosmos/cleanup.py new file mode 100644 index 000000000..7f09e34dd --- /dev/null +++ b/cosmos/cleanup.py @@ -0,0 +1,95 @@ +""" +Use this script locally to identify broken symbolic links or recursive loops: + $ python -m cosmos.cleanup -p DIR_PATH + +To delete the issues identified, run: + $ python -m cosmos.cleanup -p DIR_PATH -d +""" + +import argparse +import logging +import os +from pathlib import Path + +logger = logging.getLogger(__name__) + + +def identify_broken_symbolic_links(dir_path: str, should_delete: bool = False) -> None: + """ + Given a directory, recursively inspect it in search of broken symbolic links. + If should_delete is set to True, delete the broken symbolic links identified. + + :param dir_path: Path to the directory to be analysed + :param should_delete: Users should set to True if they want the method to not only identify but also delete these links. 
+ """ + logger.info(f"Inspecting the directory {dir_path} for broken symbolic links.") + filepaths = [] + broken_symlinks_count = 0 + deleted_symlinks_count = 0 + for root_dir, dirs, files in os.walk(dir_path): + paths = [os.path.join(root_dir, filepath) for filepath in files] + filepaths.extend(paths) + + for filepath in filepaths: + try: + os.stat(filepath) + except OSError: + broken_symlinks_count += 1 + logger.warning(f"The folder {dir_path} contains a symbolic link to a non-existent file: {filepath}") + if should_delete: + logger.info(f"Deleting the invalid symbolic link: {filepath}") + os.unlink(filepath) + deleted_symlinks_count += 1 + + logger.info( + f"After inspecting {dir_path}, identified {broken_symlinks_count} broken links and deleted {deleted_symlinks_count} of them." + ) + + +# Airflow DAG parsing fails if recursive loops are found, so this method cannot be used from within an Airflow task +def identify_recursive_loops(original_dir_path: str, should_delete: bool = False) -> None: + """ + Given a directory, recursively inspect it in search of recursive loops. + If should_delete is set to True, delete the symbolic links that create the recursive loops identified. + + :param original_dir_path: Path to the directory to be analysed + :param should_delete: Users should set to True if they want the method to not only identify but also delete these loops. 
+ """ + logger.info(f"Inspecting the directory {original_dir_path} for recursive loops.") + dirs_paths = [] + broken_symlinks_count = 0 + deleted_symlinks_count = 0 + + dir_path = Path(original_dir_path).absolute() + + for root_dir, dirs, files in os.walk(dir_path): + paths = [os.path.join(root_dir, dir_name) for dir_name in dirs] + dirs_paths.extend(paths) + + for subdir_path in dirs_paths: + if os.path.islink(subdir_path): + symlink_target_path = os.path.realpath(subdir_path) + if Path(symlink_target_path) in Path(subdir_path).parents: + logger.warning(f"Detected recursive loop from {subdir_path} to {symlink_target_path}") + broken_symlinks_count += 1 + if should_delete: + logger.info(f"Deleting symbolic link: {subdir_path}") + os.unlink(subdir_path) + deleted_symlinks_count += 1 + + logger.info( + f"After inspecting {dir_path}, identified {broken_symlinks_count} recursive loops and deleted {deleted_symlinks_count} of them." + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Clean up local directory from broken symbolic links and recursive loops." + ) + parser.add_argument("-p", "--dir-path", help="Path to directory to be inspected", required=True) + parser.add_argument( + "-d", "--delete", help="Delete problems found", action="store_true", required=False, default=False + ) + args = parser.parse_args() + identify_recursive_loops(args.dir_path, args.delete) + identify_broken_symbolic_links(args.dir_path, args.delete) diff --git a/dev/dags/example_cosmos_cleanup_dir_dag.py b/dev/dags/example_cosmos_cleanup_dir_dag.py new file mode 100644 index 000000000..8f0a0d9ac --- /dev/null +++ b/dev/dags/example_cosmos_cleanup_dir_dag.py @@ -0,0 +1,38 @@ +""" +We've observed users who had dbt project directories containing symbolic links to files that no longer existed. 
+ +Although this issue was not created by Cosmos itself, it has already been observed by two users, so we thought it +would be useful to give an example DAG illustrating how to clean the problematic directories. + +Assuming the cause of the issue no longer exists, this DAG only needs to be run once. +""" + +# [START dirty_dir_example] +from datetime import datetime +from pathlib import Path + +from airflow.decorators import dag, task + +from cosmos.cleanup import identify_broken_symbolic_links + +dbt_project_folder = Path(__file__).parent / "dbt" + + +@dag( + schedule_interval="0 0 * * 0", # Runs every Sunday + start_date=datetime(2023, 1, 1), + catchup=False, + tags=["example"], +) +def example_cosmos_cleanup_dir_dag(): + + @task() + def clear_broken_symlinks(session=None): + identify_broken_symbolic_links(dir_path=dbt_project_folder, should_delete=True) + + clear_broken_symlinks() + + +# [END dirty_dir_example] + +example_cosmos_cleanup_dir_dag()