Skip to content

Commit

Permalink
Refactor dbt ls to run from a temporary directory
Browse files Browse the repository at this point in the history
As of Cosmos 1.0.0, `LoadMode.DBT_LS` ran `dbt ls` from within the original dbt project directory.

The `dbt ls` command writes its output files to the directory it runs from unless the environment variables `DBT_LOG_PATH` and `DBT_TARGET_PATH` are specified.

Depending on the deployment, the Airflow worker may not have write permission to the dbt project directory. This PR changes the behavior of `LoadMode.DBT_LS` so that it copies the original project directory into a temporary directory and runs the `dbt ls` command from there.

Closes: #411
  • Loading branch information
tatiana committed Jul 28, 2023
1 parent 6665d8d commit 5821a9c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 25 deletions.
54 changes: 29 additions & 25 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import os
import shutil
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from subprocess import Popen, PIPE
Expand Down Expand Up @@ -143,33 +144,36 @@ def load_via_dbt_ls(self) -> None:
if self.select:
command.extend(["--select", *self.select])

with self.profile_config.ensure_profile() as (profile_path, env_vars):
command.extend(
[
"--profiles-dir",
str(profile_path.parent),
"--profile",
self.profile_config.profile_name,
"--target",
self.profile_config.target_name,
]
)
with tempfile.TemporaryDirectory() as tmpdir:
shutil.copytree(self.project.dir, tmpdir, dirs_exist_ok=True)

with self.profile_config.ensure_profile() as (profile_path, env_vars):
command.extend(
[
"--profiles-dir",
str(profile_path.parent),
"--profile",
self.profile_config.profile_name,
"--target",
self.profile_config.target_name,
]
)

env = os.environ.copy()
env.update(env_vars)

logger.info("Running command: `%s`", " ".join(command))
logger.info("Environment variable keys: %s", env.keys())
process = Popen(
command,
stdout=PIPE,
stderr=PIPE,
cwd=self.project.dir,
universal_newlines=True,
env=env,
)
env = os.environ.copy()
env.update(env_vars)

logger.info("Running command: `%s`", " ".join(command))
logger.info("Environment variable keys: %s", env.keys())
process = Popen(
command,
stdout=PIPE,
stderr=PIPE,
cwd=tmpdir,
universal_newlines=True,
env=env,
)

stdout, stderr = process.communicate()
stdout, stderr = process.communicate()

logger.debug("Output: %s", stdout)

Expand Down
22 changes: 22 additions & 0 deletions tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,28 @@ def test_load(
assert load_function.called


@patch("cosmos.dbt.graph.Popen")
def test_load_via_dbt_ls_uses_temp_dir(mock_popen):
    """Check that ``load_via_dbt_ls`` launches ``dbt ls`` from a temporary directory.

    ``Popen`` is patched, so no real dbt process is started. The test inspects
    the ``cwd`` keyword argument handed to ``Popen`` and verifies that it is
    not the original project directory and that it no longer exists once
    ``load_via_dbt_ls`` has returned.
    """
    mock_popen().communicate.return_value = ("", "")
    dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR)
    dbt_graph = DbtGraph(
        project=dbt_project,
        profile_config=ProfileConfig(
            profile_name="default",
            target_name="default",
            profile_mapping=PostgresUserPasswordProfileMapping(
                conn_id="airflow_db",
                profile_args={"schema": "public"},
            ),
        ),
    )

    dbt_graph.load_via_dbt_ls()

    # Read the working directory from the ``cwd`` keyword argument of the last
    # ``Popen`` call. Indexing the command list at ``[-5]`` would return the
    # ``--profiles-dir`` value instead, so the test would not exercise ``cwd``.
    used_cwd = Path(mock_popen.call_args[1]["cwd"])
    assert used_cwd != dbt_project.dir
    assert not used_cwd.exists()


@pytest.mark.integration
def test_load_via_dbt_ls_with_exclude():
dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR)
Expand Down

0 comments on commit 5821a9c

Please sign in to comment.