Skip to content

Commit

Permalink
Add docs and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajkoti committed Aug 12, 2024
1 parent 4f00148 commit 48e34bc
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 5 deletions.
15 changes: 12 additions & 3 deletions docs/configuration/caching.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ It is possible to turn off any cache in Cosmos by exporting the environment vari
Disabling individual types of cache in Cosmos is also possible, as explained below.

Caching the dbt ls output
~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~

(Introduced in Cosmos 1.5)

Expand All @@ -29,13 +29,22 @@ also the tasks queueing time.

Cosmos 1.5 introduced a feature to mitigate the performance issue associated with ``LoadMode.DBT_LS`` by caching the output
of this command as an `Airflow Variable <https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/variables.html>`_.
Based on an initial `analysis <https://github.com/astronomer/astronomer-cosmos/pull/1014>`_, enabling this setting reduced some DAGs ask queueing from 30s to 0s. Additionally, some users `reported improvements of 84% <https://github.com/astronomer/astronomer-cosmos/pull/1014#issuecomment-2168185343>`_ in the DAG run time.
Based on an initial `analysis <https://github.com/astronomer/astronomer-cosmos/pull/1014>`_, enabling this setting reduced some DAGs task queueing from 30s to 0s. Additionally, some users `reported improvements of 84% <https://github.com/astronomer/astronomer-cosmos/pull/1014#issuecomment-2168185343>`_ in the DAG run time.

This feature is on by default. To turn it off, export the following environment variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS=0``.

(Introduced in Cosmos 1.6)

Starting with Cosmos 1.6.0, users can also set a remote path to store this cache instead of using Airflow Variables.
To do so, you need to configure a remote cache path. See :ref:`remote_cache_path` and :ref:`remote_cache_conn_id` for
more information.

**How the cache is refreshed**

Users can purge or delete the cache via Airflow UI by identifying and deleting the cache key.
If using the default Variables cache approach, users can purge or delete the cache via the Airflow UI by identifying and
deleting the cache key. If you are instead using the ``remote_cache_path`` approach introduced in Cosmos 1.6.0, you can
delete the cache by removing the specific files in the remote store, identifying them by your configured path.

Cosmos will refresh the cache in a few circumstances:

Expand Down
21 changes: 21 additions & 0 deletions docs/configuration/cosmos-conf.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,27 @@ This page lists all available Airflow configurations that affect ``astronomer-co
- Default: ``profile``
- Environment Variable: ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME``

.. _remote_cache_path:

`remote_cache_path`_:
The remote path to store the DBT cache. Starting with Cosmos 1.6.0, you can store the dbt ls output as cache in a
remote location (an alternative to the Variable cache approach previously released in Cosmos 1.5.0)
using this configuration. The remote paths are all schemes supported by the
`Airflow Object Store <https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/objectstorage.html>`_
feature introduced in Airflow 2.8.0 (e.g. ``s3://your_s3_bucket/cache_dir/``, ``gs://your_gs_bucket/cache_dir/``,
``abfs://your_azure_container/cache_dir``, etc.)

- Default: ``None``
- Environment Variable: ``AIRFLOW__COSMOS__REMOTE_CACHE_PATH``

.. _remote_cache_conn_id:

`remote_cache_conn_id`_:
The connection ID for the remote cache path. If this is not set, the default Airflow connection ID identified for
the scheme will be used.

- Default: ``None``
- Environment Variable: ``AIRFLOW__COSMOS__REMOTE_CACHE_CONN_ID``

[openlineage]
~~~~~~~~~~~~~
Expand Down
61 changes: 61 additions & 0 deletions tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import base64
import importlib
import json
import logging
import os
import shutil
import sys
import tempfile
import zlib
from datetime import datetime
from pathlib import Path
from subprocess import PIPE, Popen
Expand All @@ -24,6 +27,7 @@
run_command,
)
from cosmos.profiles import PostgresUserPasswordProfileMapping
from cosmos.settings import AIRFLOW_IO_AVAILABLE

DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt"
DBT_PROJECT_NAME = "jaffle_shop"
Expand Down Expand Up @@ -1532,3 +1536,60 @@ def test_should_use_dbt_ls_cache(enable_cache, enable_cache_dbt_ls, cache_id, sh
graph = DbtGraph(cache_identifier=cache_id, project=ProjectConfig(dbt_project_path="/tmp"))
graph.should_use_dbt_ls_cache.cache_clear()
assert graph.should_use_dbt_ls_cache() == should_use


@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release")
@patch("airflow.io.path.ObjectStoragePath")
@patch("cosmos.config.ProjectConfig")
def test_save_dbt_ls_cache_remote_cache_path(mock_project_config, mock_object_storage_path):
    """save_dbt_ls_cache should write the cache file under the configured remote_cache_path.

    ObjectStoragePath and ProjectConfig are both mocked, so only the interaction is
    checked: the cache must be written via ``open("w")`` on the
    ``<remote_cache_path>/<cache_key>/dbt_ls_cache.json`` path object.
    """
    # The mocked ObjectStoragePath instance stands in for the configured remote cache dir.
    mock_remote_cache_path = mock_object_storage_path.return_value
    mock_remote_cache_path.exists.return_value = True

    mock_project_config.remote_cache_path = mock_remote_cache_path
    # dbt_vars/env_vars presumably feed the cache key/version computation — values are
    # arbitrary here; TODO confirm against DbtGraph.dbt_ls_cache_key.
    mock_project_config.dbt_vars = {"var1": "value1"}
    mock_project_config.env_vars = {"var1": "value1"}
    mock_project_config._calculate_dbt_ls_cache_current_version.return_value = "mock_version"

    dbt_ls_output = "sample dbt ls output"
    dbt_graph = DbtGraph(project=mock_project_config)

    dbt_graph.save_dbt_ls_cache(dbt_ls_output)

    # MagicMock's ``/`` operator returns the same child mock regardless of the operand,
    # so rebuilding the path here yields the very object save_dbt_ls_cache used internally.
    mock_remote_cache_key_path = mock_remote_cache_path / dbt_graph.dbt_ls_cache_key / "dbt_ls_cache.json"
    mock_remote_cache_key_path.open.assert_called_once_with("w")


@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release")
@patch("airflow.io.path.ObjectStoragePath")
@patch("cosmos.config.ProjectConfig")
def test_get_dbt_ls_cache_remote_cache_path(mock_project_config, mock_object_storage_path):
    """get_dbt_ls_cache should read and decode the cache file stored under remote_cache_path.

    A cache dict whose ``dbt_ls_compressed`` field holds the zlib-compressed,
    base64-encoded dbt ls output is served from the mocked remote file; the method must
    return the same dict with the payload decompressed into a plain ``dbt_ls`` entry.
    """
    mock_remote_cache_path = mock_object_storage_path.return_value
    mock_remote_cache_path.exists.return_value = True

    mock_project_config.remote_cache_path = mock_remote_cache_path

    # Build the stored representation: zlib-compress, then base64-encode the output.
    dbt_ls_output = "sample dbt ls output"
    compressed_data = zlib.compress(dbt_ls_output.encode("utf-8"))
    encoded_data = base64.b64encode(compressed_data).decode("utf-8")

    cache_dict = {
        "version": "cache-version",
        "dbt_ls_compressed": encoded_data,
        "last_modified": "2024-08-13T12:34:56Z",
    }

    # MagicMock's ``/`` returns the same child mock for any operand, so this path object
    # is identical to the one get_dbt_ls_cache builds internally (the key literal used
    # here is irrelevant to the lookup).
    mock_remote_cache_key_path = mock_remote_cache_path / "some_cache_key" / "dbt_ls_cache.json"
    mock_remote_cache_key_path.exists.return_value = True
    mock_remote_cache_key_path.open.return_value.__enter__.return_value.read.return_value = json.dumps(cache_dict)

    dbt_graph = DbtGraph(project=mock_project_config)

    result = dbt_graph.get_dbt_ls_cache()

    # The compressed field must be replaced by the decoded ``dbt_ls`` plain text.
    expected_result = {
        "version": "cache-version",
        "dbt_ls": dbt_ls_output,
        "last_modified": "2024-08-13T12:34:56Z",
    }

    assert result == expected_result
56 changes: 54 additions & 2 deletions tests/test_config.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from contextlib import nullcontext as does_not_raise
from pathlib import Path
from unittest.mock import Mock, PropertyMock, call, patch
from unittest.mock import MagicMock, Mock, PropertyMock, call, patch

import pytest

from cosmos.config import CosmosConfigException, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import ExecutionMode, InvocationMode
from cosmos.constants import ExecutionMode, InvocationMode, _default_s3_conn
from cosmos.exceptions import CosmosValueError
from cosmos.profiles.athena.access_key import AthenaAccessKeyProfileMapping
from cosmos.profiles.postgres.user_pass import PostgresUserPasswordProfileMapping
Expand Down Expand Up @@ -331,3 +331,55 @@ def test_remote_manifest_path(manifest_path, given_manifest_conn_id, used_manife
_ = ProjectConfig(
dbt_project_path="/tmp/some-path", manifest_path=manifest_path, manifest_conn_id=given_manifest_conn_id
)


@patch("cosmos.config.settings_remote_cache_path", new=None)
def test_remote_cache_path_initialization_no_remote_cache():
    """When no remote cache path is configured in settings, remote_cache_path stays None."""
    project_config = ProjectConfig(dbt_project_path="/some/path", project_name="test_project")
    assert project_config.remote_cache_path is None


@patch("cosmos.config.settings_remote_cache_path", new="s3://some-bucket/cache")
@patch("cosmos.config.AIRFLOW_IO_AVAILABLE", new=False)
def test_remote_cache_path_initialization_object_storage_unavailable_on_earlier_airflow_versions():
    """Configuring a remote cache path on Airflow without Object Storage raises CosmosValueError."""
    with pytest.raises(CosmosValueError, match="Object Storage feature is unavailable"):
        _ = ProjectConfig(dbt_project_path="/some/path", project_name="test_project")


@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release")
@patch("cosmos.config.settings_remote_cache_path", new="s3://some-bucket/cache")
@patch("airflow.io.path.ObjectStoragePath")
def test_remote_cache_path_initialization_path_available_default_connection(object_storage_path_mock):
    """Without an explicit conn_id setting, the default S3 connection builds the cache path."""
    # Simulate an existing remote path so ProjectConfig's existence check passes.
    existing_remote_path = MagicMock()
    existing_remote_path.exists.return_value = True
    object_storage_path_mock.return_value = existing_remote_path

    project_config = ProjectConfig(dbt_project_path="/some/path", project_name="test_project")

    object_storage_path_mock.assert_called_with("s3://some-bucket/cache", conn_id=_default_s3_conn)
    assert project_config.remote_cache_path == existing_remote_path


@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release")
@patch("cosmos.config.settings_remote_cache_path", new="s3://some-bucket/cache")
@patch("airflow.io.path.ObjectStoragePath")
def test_remote_cache_path_initialization_path_not_exist(object_storage_path_mock):
    """A configured remote cache path that does not exist raises CosmosValueError."""
    # Simulate a missing remote path so ProjectConfig's existence check fails.
    missing_remote_path = MagicMock()
    missing_remote_path.exists.return_value = False
    object_storage_path_mock.return_value = missing_remote_path

    with pytest.raises(CosmosValueError, match="remote_cache_path `s3://some-bucket/cache` does not exist"):
        _ = ProjectConfig(dbt_project_path="/some/path", project_name="test_project")


@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release")
@patch("cosmos.config.settings_remote_cache_path", new="s3://some-bucket/cache")
@patch("cosmos.config.remote_cache_conn_id", new="my_conn_id")
@patch("airflow.io.path.ObjectStoragePath")
def test_remote_cache_path_initialization_with_conn_id(object_storage_path_mock):
    """An explicitly configured remote_cache_conn_id is passed through to ObjectStoragePath."""
    # Simulate an existing remote path so ProjectConfig's existence check passes.
    existing_remote_path = MagicMock()
    existing_remote_path.exists.return_value = True
    object_storage_path_mock.return_value = existing_remote_path

    project_config = ProjectConfig(dbt_project_path="/some/path", project_name="test_project")

    object_storage_path_mock.assert_called_with("s3://some-bucket/cache", conn_id="my_conn_id")
    assert project_config.remote_cache_path == existing_remote_path

0 comments on commit 48e34bc

Please sign in to comment.