Skip to content

Commit

Permalink
Add utility to delete unused remote cache files and include it in exa…
Browse files Browse the repository at this point in the history
…mple DAG
  • Loading branch information
pankajkoti committed Aug 15, 2024
1 parent a6cd938 commit 94eebc1
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 1 deletion.
58 changes: 58 additions & 0 deletions cosmos/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,64 @@ def delete_unused_dbt_ls_cache(
return deleted_cosmos_variables


# TODO: Add integration tests once remote cache is supported in the CI pipeline
@provide_session
def delete_unused_dbt_ls_remote_cache_files( # pragma: no cover
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
) -> int:
"""
Delete Cosmos cache stored in remote storage based on the last execution of their associated DAGs.
"""
if session is None:
return 0

logger.info(f"Delete the Cosmos cache stored remotely that hasn't been used for {max_age_last_usage}")
cosmos_dags_ids_remote_cache_files = defaultdict(list)

configured_remote_cache_dir = _configure_remote_cache_dir()
if not configured_remote_cache_dir:
logger.info(
"No remote cache directory configured. Skipping the deletion of the dbt ls cache files in remote storage."
)
return 0

dirs = [obj for obj in configured_remote_cache_dir.iterdir() if obj.is_dir()]
files = [f for label in dirs for f in label.iterdir() if f.is_file()]

total_cosmos_remote_cache_files = 0
for file in files:
prefix_path = (configured_remote_cache_dir / VAR_KEY_CACHE_PREFIX).as_uri()
if file.as_uri().startswith(prefix_path):
with file.open("r") as fp:
cache_dict = json.load(fp)
cosmos_dags_ids_remote_cache_files[cache_dict["dag_id"]].append(file)
total_cosmos_remote_cache_files += 1

deleted_cosmos_remote_cache_files = 0

for dag_id, files in cosmos_dags_ids_remote_cache_files.items():
last_dag_run = (
session.query(DagRun)
.filter(
DagRun.dag_id == dag_id,
)
.order_by(DagRun.execution_date.desc())
.first()
)
if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage):
for file in files:
logger.info(f"Removing the dbt ls cache remote file {file}")
file.unlink()
deleted_cosmos_remote_cache_files += 1
logger.info(
"Deleted %s/%s dbt ls cache files in remote storage.",
deleted_cosmos_remote_cache_files,
total_cosmos_remote_cache_files,
)

return deleted_cosmos_remote_cache_files


def is_profile_cache_enabled() -> bool:
"""Return True if global and profile cache is enable"""
return enable_cache and enable_cache_profile
Expand Down
11 changes: 10 additions & 1 deletion dev/dags/example_cosmos_cleanup_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from airflow.decorators import dag, task

from cosmos.cache import delete_unused_dbt_ls_cache
from cosmos.cache import delete_unused_dbt_ls_cache, delete_unused_dbt_ls_remote_cache_files


@dag(
Expand All @@ -28,6 +28,15 @@ def clear_db_ls_cache(session=None):

clear_db_ls_cache()

@task()
def clear_db_ls_remote_cache(session=None):
"""
Delete the dbt ls remote cache files that have not been used for the last five days.
"""
delete_unused_dbt_ls_remote_cache_files(max_age_last_usage=timedelta(days=5))

clear_db_ls_remote_cache()


# [END cache_example]

Expand Down

0 comments on commit 94eebc1

Please sign in to comment.