Skip to content

Commit

Permalink
Fix installing deps when using profile_mapping & `ExecutionMode.LOC…
Browse files Browse the repository at this point in the history
…AL` (#659)

Extends the local operator when running `dbt deps` with the provides
profile flags.

This makes the logic consistent between DAG parsing and task running as
referenced below

https://github.com/astronomer/astronomer-cosmos/blob/8e2d5908ce89aa98813af6dfd112239e124bd69a/cosmos/dbt/graph.py#L247-L266

Closes: #658
(cherry picked from commit 6e41471)
  • Loading branch information
joppevos authored and tatiana committed Nov 15, 2023
1 parent 6fa970f commit 1eb931e
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 11 deletions.
26 changes: 16 additions & 10 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,17 +204,11 @@ def run_command(
tmp_project_dir,
)

# if we need to install deps, do so
if self.install_deps:
self.run_subprocess(
command=[self.dbt_executable_path, "deps"],
env=env,
output_encoding=self.output_encoding,
cwd=tmp_project_dir,
)
with self.profile_config.ensure_profile() as (profile_path, env_vars):
with self.profile_config.ensure_profile() as profile_values:
(profile_path, env_vars) = profile_values
env.update(env_vars)
full_cmd = cmd + [

flags = [
"--profiles-dir",
str(profile_path.parent),
"--profile",
Expand All @@ -223,6 +217,18 @@ def run_command(
self.profile_config.target_name,
]

if self.install_deps:
deps_command = [self.dbt_executable_path, "deps"]
deps_command.extend(flags)
self.run_subprocess(
command=deps_command,
env=env,
output_encoding=self.output_encoding,
cwd=tmp_project_dir,
)

full_cmd = cmd + flags

logger.info("Trying to run the command:\n %s\nFrom %s", full_cmd, tmp_project_dir)
logger.info("Using environment variables keys: %s", env.keys())
result = self.run_subprocess(
Expand Down
43 changes: 43 additions & 0 deletions tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,3 +414,46 @@ def test_calculate_openlineage_events_completes_openlineage_errors(mock_processo
assert instance.parse.called
err_msg = "Unable to parse OpenLineage events"
assert err_msg in caplog.text


@pytest.mark.parametrize(
"operator_class,expected_template",
[
(DbtSeedLocalOperator, ("env", "vars", "full_refresh")),
(DbtRunLocalOperator, ("env", "vars", "full_refresh")),
],
)
def test_dbt_base_operator_template_fields(operator_class, expected_template):
# Check if value of template fields is what we expect for the operators we're validating
dbt_base_operator = operator_class(profile_config=profile_config, task_id="my-task", project_dir="my/dir")
assert dbt_base_operator.template_fields == expected_template


@patch("cosmos.operators.local.DbtLocalBaseOperator.store_compiled_sql")
@patch("cosmos.operators.local.DbtLocalBaseOperator.exception_handling")
@patch("cosmos.config.ProfileConfig.ensure_profile")
@patch("cosmos.operators.local.DbtLocalBaseOperator.run_subprocess")
def test_operator_execute_deps_parameters(
mock_build_and_run_cmd, mock_ensure_profile, mock_exception_handling, mock_store_compiled_sql
):
expected_call_kwargs = [
"/usr/local/bin/dbt",
"deps",
"--profiles-dir",
"/path/to",
"--profile",
"default",
"--target",
"dev",
]
task = DbtRunLocalOperator(
profile_config=real_profile_config,
task_id="my-task",
project_dir=DBT_PROJ_DIR,
install_deps=True,
emit_datasets=False,
dbt_executable_path="/usr/local/bin/dbt",
)
mock_ensure_profile.return_value.__enter__.return_value = (Path("/path/to/profile"), {"ENV_VAR": "value"})
task.execute(context={"task_instance": MagicMock()})
assert mock_build_and_run_cmd.call_args_list[0].kwargs["command"] == expected_call_kwargs
2 changes: 1 addition & 1 deletion tests/operators/test_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def test_run_command(
dbt_cmd = run_command_args[2]
assert python_cmd[0][0][0].endswith("/bin/python")
assert python_cmd[0][-1][-1] == "from importlib.metadata import version; print(version('dbt-core'))"
assert dbt_deps[0][0][-1] == "deps"
assert dbt_deps[0][0][1] == "deps"
assert dbt_deps[0][0][0].endswith("/bin/dbt")
assert dbt_deps[0][0][0] == dbt_cmd[0][0][0]
assert dbt_cmd[0][0][1] == "do-something"
Expand Down

0 comments on commit 1eb931e

Please sign in to comment.