Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance by 22-35% or more by caching partial parse artefact #904

Merged
merged 20 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
901651d
Cache Cosmos-generated dbt partial parse in Airflow
tatiana Mar 28, 2024
cc5f797
Extend caching of partial parse to running tasks
tatiana Mar 28, 2024
9f67153
Allow users to disable mock profile
tatiana Mar 28, 2024
b51dcfe
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Mar 28, 2024
9584aa4
Fix broken unittests
tatiana Mar 28, 2024
d91cb0d
Fix type-check
tatiana Mar 28, 2024
8f93b30
Refactor so we leverage cache even if user instantiates operator dire…
tatiana Mar 29, 2024
88c6a75
Fix broken unit tests
tatiana Apr 17, 2024
cf6a639
Increase test coverage and fix implementation
tatiana Apr 18, 2024
466c271
Fix broken unittest and add integration partial parsing test
tatiana Apr 19, 2024
a377b60
Try to overcome failing test in CI - that works locally
tatiana Apr 19, 2024
42aacaf
Restore test_plugin.py
tatiana Apr 19, 2024
bb4d353
Simplify how cache is set at the operator level
tatiana Apr 24, 2024
c66f808
Add test to confirm behaviour of converter on cache dir
tatiana Apr 24, 2024
6401096
Remove unused copy_msgpack_for_partial_parse
tatiana May 1, 2024
5e98dec
Use InvocationMode.DBT_RUNNER in another example DAG
tatiana May 1, 2024
12ca7f5
Add missing docstring information
tatiana May 1, 2024
7424a7f
Rely on Airflow conf fallback, removing redundancy
tatiana May 1, 2024
02ee94c
Make cache functions non-public, as advised in the code-review
tatiana May 1, 2024
2a3f007
Create cache identifier using double underscore to differentiate betw…
tatiana May 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions cosmos/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from __future__ import annotations

import shutil
from pathlib import Path

from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

from cosmos import settings
from cosmos.constants import DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME
from cosmos.dbt.project import get_partial_parse_path


# It was considered to create a cache identifier based on the dbt project path, as opposed
# to where it is used in Airflow. However, we could have concurrency issues if the same
# dbt cached directory was being used by different dbt task groups or DAGs within the same
# node. For this reason, as a starting point, the cache is identified by where it is used.
# This can be reviewed in the future.
def create_cache_identifier(dag: DAG, task_group: TaskGroup | None) -> str:
    """
    Given a DAG and an (optional) task group, create the identifier for caching.

    The identifier is built from the Airflow DAG id and, when the Cosmos entity is a
    task group, the task group id (with dots replaced so it is filesystem-safe).

    :param dag: The Cosmos DbtDag being cached
    :param task_group: (optional) The Cosmos DbtTaskGroup being cached
    :return: Unique identifier representing the cache
    """
    if task_group:
        # Build the list unconditionally so we never reference it before assignment
        # (previously, a task group with dag_id=None raised NameError).
        cache_identifiers_list = []
        if task_group.dag_id is not None:
            cache_identifiers_list.append(task_group.dag_id)
        if task_group.group_id is not None:
            # Nested task groups have dotted ids; dots are not filesystem-friendly.
            cache_identifiers_list.append(task_group.group_id.replace(".", "_"))
        cache_identifier = "_".join(cache_identifiers_list)
    else:
        cache_identifier = dag.dag_id

    return cache_identifier


def obtain_cache_dir_path(cache_identifier: str, base_dir: Path = settings.cache_dir) -> Path:
    """
    Return the directory used to cache a specific Cosmos DbtDag or DbtTaskGroup,
    creating it (including the dbt target subdirectory) if it does not yet exist.

    :param cache_identifier: Unique key used as a cache identifier
    :param base_dir: Root directory where cache will be stored
    :return: Path to directory used to cache this specific Cosmos DbtDag or DbtTaskGroup
    """
    cache_dir = base_dir / cache_identifier
    # mkdir with parents=True also creates cache_dir itself if missing.
    (cache_dir / DBT_TARGET_DIR_NAME).mkdir(parents=True, exist_ok=True)
    return cache_dir


def get_timestamp(path: Path) -> float:
    """
    Return the last-modification timestamp of ``path``, or 0 if it does not exist.

    :param path: Path to the file or directory of interest
    :return: File or directory timestamp (seconds since the epoch), 0 when missing
    """
    try:
        return path.stat().st_mtime
    except FileNotFoundError:
        # Missing paths are treated as "oldest possible" so comparisons still work.
        return 0


def get_latest_partial_parse(dbt_project_path: Path, cache_dir: Path) -> Path | None:
    """
    Return the path to the latest partial parse file, if defined.

    Compares the partial parse artefact inside the original dbt project with the
    one stored in the Cosmos cache directory and returns whichever was modified
    most recently, or ``None`` when neither exists.

    :param dbt_project_path: Original dbt project path
    :param cache_dir: Path to the Cosmos project cache directory
    :return: Either return the Path to the latest partial parse file, or None.
    """
    project_file = get_partial_parse_path(dbt_project_path)
    cached_file = get_partial_parse_path(cache_dir)

    # A timestamp of 0 means the corresponding file does not exist.
    project_mtime = get_timestamp(project_file)
    cached_mtime = get_timestamp(cached_file)

    if project_mtime and cached_mtime:
        return project_file if project_mtime > cached_mtime else cached_file
    if project_mtime:
        return project_file
    if cached_mtime:
        return cached_file
    return None


def update_partial_parse_cache(latest_partial_parse_filepath: Path, cache_dir: Path) -> None:
    """
    Update the cache to have the latest partial parse file contents.

    Copies both the partial parse file and its sibling ``manifest.json`` into the
    cache directory.

    :param latest_partial_parse_filepath: Path to the most up-to-date partial parse file
    :param cache_dir: Path to the Cosmos project cache directory
    """
    cache_path = get_partial_parse_path(cache_dir)
    # Reuse the already-computed cache path instead of resolving it a second time.
    manifest_path = cache_path.parent / DBT_MANIFEST_FILE_NAME
    latest_manifest_filepath = latest_partial_parse_filepath.parent / DBT_MANIFEST_FILE_NAME

    shutil.copy(str(latest_partial_parse_filepath), str(cache_path))
    shutil.copy(str(latest_manifest_filepath), str(manifest_path))


def copy_partial_parse_to_project(partial_parse_filepath: Path, project_path: Path) -> None:
    """
    Update target dbt project directory to have the latest partial parse file contents.

    Copies both the partial parse file and its sibling ``manifest.json`` into the
    project's dbt target directory, creating the target directory when absent.

    :param partial_parse_filepath: Path to the most up-to-date partial parse file
    :param project_path: Path to the target dbt project directory
    """
    # Ensure the dbt target directory exists before copying into it.
    (project_path / DBT_TARGET_DIR_NAME).mkdir(exist_ok=True)

    dest_partial_parse = get_partial_parse_path(project_path)
    src_manifest = partial_parse_filepath.parent / DBT_MANIFEST_FILE_NAME
    dest_manifest = dest_partial_parse.parent / DBT_MANIFEST_FILE_NAME

    shutil.copy(str(partial_parse_filepath), str(dest_partial_parse))
    shutil.copy(str(src_manifest), str(dest_manifest))
5 changes: 3 additions & 2 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class RenderConfig:
env_vars: dict[str, str] | None = None
dbt_project_path: InitVar[str | Path | None] = None
dbt_ls_path: Path | None = None

project_path: Path | None = field(init=False)
enable_mock_profile: bool = True
tatiana marked this conversation as resolved.
Show resolved Hide resolved

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
if self.env_vars:
Expand Down Expand Up @@ -288,7 +288,8 @@ def ensure_profile(
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = Path(temp_dir) / DEFAULT_PROFILES_FILE_NAME
logger.info(
"Creating temporary profiles.yml at %s with the following contents:\n%s",
"Creating temporary profiles.yml with use_mock_values=%s at %s with the following contents:\n%s",
use_mock_values,
temp_file,
profile_contents,
)
Expand Down
2 changes: 2 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
DEFAULT_DBT_PROFILE_NAME = "cosmos_profile"
DEFAULT_DBT_TARGET_NAME = "cosmos_target"
DEFAULT_COSMOS_CACHE_DIR_NAME = "cosmos"
DBT_LOG_PATH_ENVVAR = "DBT_LOG_PATH"
DBT_LOG_DIR_NAME = "logs"
DBT_TARGET_PATH_ENVVAR = "DBT_TARGET_PATH"
DBT_TARGET_DIR_NAME = "target"
DBT_PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
DBT_MANIFEST_FILE_NAME = "manifest.json"
DBT_LOG_FILENAME = "dbt.log"
DBT_BINARY_NAME = "dbt"

Expand Down
19 changes: 7 additions & 12 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

from cosmos import cache, settings
from cosmos.airflow.graph import build_airflow_graph
from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import ExecutionMode
Expand Down Expand Up @@ -214,8 +215,6 @@ def __init__(

validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args)

# If we are using the old interface, we should migrate it to the new interface
# This is safe to do now since we have validated which config interface we're using
if project_config.dbt_project_path:
execution_config, render_config = migrate_to_new_interface(execution_config, project_config, render_config)

Expand All @@ -224,21 +223,16 @@ def __init__(
env_vars = project_config.env_vars or operator_args.get("env")
dbt_vars = project_config.dbt_vars or operator_args.get("vars")

# Previously, we were creating a cosmos.dbt.project.DbtProject
# DbtProject has now been replaced with ProjectConfig directly
# since the interface of the two classes were effectively the same
# Under this previous implementation, we were passing:
# - name, root dir, models dir, snapshots dir and manifest path
# Internally in the dbtProject class, we were defaulting the profile_path
# To be root dir/profiles.yml
# To keep this logic working, if converter is given no ProfileConfig,
# we can create a default retaining this value to preserve this functionality.
# We may want to consider defaulting this value in our actual ProjceConfig class?
cache_dir = None
if settings.enable_cache:
cache_dir = cache.obtain_cache_dir_path(cache_identifier=cache.create_cache_identifier(dag, task_group))

self.dbt_graph = DbtGraph(
project=project_config,
render_config=render_config,
execution_config=execution_config,
profile_config=profile_config,
cache_dir=cache_dir,
dbt_vars=dbt_vars,
)
self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode)
Expand All @@ -251,6 +245,7 @@ def __init__(
"emit_datasets": render_config.emit_datasets,
"env": env_vars,
"vars": dbt_vars,
"cache_dir": cache_dir,
}
if execution_config.dbt_executable_path:
task_args["dbt_executable_path"] = execution_config.dbt_executable_path
Expand Down
29 changes: 21 additions & 8 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import yaml

from cosmos import cache
from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import (
DBT_LOG_DIR_NAME,
Expand All @@ -23,7 +24,7 @@
LoadMode,
)
from cosmos.dbt.parser.project import LegacyDbtProject
from cosmos.dbt.project import copy_msgpack_for_partial_parse, create_symlinks, environ
from cosmos.dbt.project import copy_msgpack_for_partial_parse, create_symlinks, environ, get_partial_parse_path
from cosmos.dbt.selector import select_nodes
from cosmos.log import get_logger

Expand Down Expand Up @@ -73,7 +74,7 @@ def name(self) -> str:
def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str:
"""Run a command in a subprocess, returning the stdout."""
logger.info("Running command: `%s`", " ".join(command))
logger.info("Environment variable keys: %s", env_vars.keys())
logger.debug("Environment variable keys: %s", env_vars.keys())
process = Popen(
command,
stdout=PIPE,
Expand Down Expand Up @@ -136,13 +137,15 @@ def __init__(
render_config: RenderConfig = RenderConfig(),
execution_config: ExecutionConfig = ExecutionConfig(),
profile_config: ProfileConfig | None = None,
cache_dir: Path | None = None,
# dbt_vars only supported for LegacyDbtProject
dbt_vars: dict[str, str] | None = None,
):
self.project = project
self.render_config = render_config
self.profile_config = profile_config
self.execution_config = execution_config
self.cache_dir = cache_dir
self.dbt_vars = dbt_vars or {}

def load(
Expand Down Expand Up @@ -250,14 +253,19 @@ def load_via_dbt_ls(self) -> None:
f"Content of the dbt project dir {self.render_config.project_path}: `{os.listdir(self.render_config.project_path)}`"
)
tmpdir_path = Path(tmpdir)
create_symlinks(self.render_config.project_path, tmpdir_path, self.render_config.dbt_deps)

if self.project.partial_parse:
copy_msgpack_for_partial_parse(self.render_config.project_path, tmpdir_path)
abs_project_path = self.render_config.project_path.absolute()
create_symlinks(abs_project_path, tmpdir_path, self.render_config.dbt_deps)

with self.profile_config.ensure_profile(use_mock_values=True) as profile_values, environ(
self.project.env_vars or self.render_config.env_vars or {}
):
if self.project.partial_parse and self.cache_dir:
latest_partial_parse = cache.get_latest_partial_parse(abs_project_path, self.cache_dir)
logger.info("Partial parse is enabled and the latest partial parse file is %s", latest_partial_parse)
if latest_partial_parse is not None:
cache.copy_partial_parse_to_project(latest_partial_parse, tmpdir_path)

with self.profile_config.ensure_profile(
use_mock_values=self.render_config.enable_mock_profile
) as profile_values, environ(self.project.env_vars or self.render_config.env_vars or {}):
(profile_path, env_vars) = profile_values
env = os.environ.copy()
env.update(env_vars)
Expand Down Expand Up @@ -288,6 +296,11 @@ def load_via_dbt_ls(self) -> None:
self.nodes = nodes
self.filtered_nodes = nodes

if self.project.partial_parse and self.cache_dir:
partial_parse_file = get_partial_parse_path(tmpdir_path)
if partial_parse_file.exists():
cache.update_partial_parse_cache(partial_parse_file, self.cache_dir)

def load_via_dbt_ls_file(self) -> None:
"""
This is between dbt ls and full manifest. It allows to use the output (needs to be json output) of the dbt ls as a
Expand Down
9 changes: 8 additions & 1 deletion cosmos/dbt/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@ def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool
os.symlink(project_path / child_name, tmp_dir / child_name)


def get_partial_parse_path(project_dir_path: Path) -> Path:
    """
    Return the partial parse (partial_parse.msgpack) path for a given dbt project directory.
    """
    return project_dir_path.joinpath(DBT_TARGET_DIR_NAME, DBT_PARTIAL_PARSE_FILE_NAME)


def copy_msgpack_for_partial_parse(project_path: Path, tmp_dir: Path) -> None:
tatiana marked this conversation as resolved.
Show resolved Hide resolved
partial_parse_file = Path(project_path) / DBT_TARGET_DIR_NAME / DBT_PARTIAL_PARSE_FILE_NAME
partial_parse_file = get_partial_parse_path(project_path)

if partial_parse_file.exists():
tmp_target_dir = tmp_dir / DBT_TARGET_DIR_NAME
Expand Down
6 changes: 6 additions & 0 deletions cosmos/operators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import os
from abc import ABCMeta, abstractmethod
from functools import cached_property
from pathlib import Path
from typing import Any, Sequence, Tuple

import yaml
Expand All @@ -10,6 +12,7 @@
from airflow.utils.operator_helpers import context_to_airflow_vars
from airflow.utils.strings import to_boolean

from cosmos import cache
from cosmos.dbt.executable import get_system_dbt
from cosmos.log import get_logger

Expand Down Expand Up @@ -61,6 +64,7 @@ class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta):
(i.e. /home/astro/.pyenv/versions/dbt_venv/bin/dbt)
:param dbt_cmd_flags: List of flags to pass to dbt command
:param dbt_cmd_global_flags: List of dbt global flags to be passed to the dbt command
:param cache_dir: Directory used to cache Cosmos/dbt artifacts in Airflow worker nodes
"""

template_fields: Sequence[str] = ("env", "select", "exclude", "selector", "vars", "models")
Expand Down Expand Up @@ -108,6 +112,7 @@ def __init__(
dbt_executable_path: str = get_system_dbt(),
dbt_cmd_flags: list[str] | None = None,
dbt_cmd_global_flags: list[str] | None = None,
cache_dir: Path | None = None,
**kwargs: Any,
) -> None:
self.project_dir = project_dir
Expand Down Expand Up @@ -135,6 +140,7 @@ def __init__(
self.dbt_executable_path = dbt_executable_path
self.dbt_cmd_flags = dbt_cmd_flags
self.dbt_cmd_global_flags = dbt_cmd_global_flags or []
self.cache_dir = cache_dir
super().__init__(**kwargs)

def get_env(self, context: Context) -> dict[str, str | bytes | os.PathLike[Any]]:
Expand Down
Loading
Loading