Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: operator_args modified in place in Airflow converter #835

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ def __init__(

validate_adapted_user_config(execution_config, project_config, render_config)

env_vars = project_config.env_vars or operator_args.pop("env", None)
dbt_vars = project_config.dbt_vars or operator_args.pop("vars", None)
env_vars = project_config.env_vars or operator_args.get("env")
dbt_vars = project_config.dbt_vars or operator_args.get("vars")

# Previously, we were creating a cosmos.dbt.project.DbtProject
# DbtProject has now been replaced with ProjectConfig directly
Expand Down
33 changes: 33 additions & 0 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,3 +405,36 @@ def test_converter_project_config_dbt_vars_with_custom_load_mode(
)
_, kwargs = mock_legacy_dbt_project.call_args
assert kwargs["dbt_vars"] == {"key": "value"}


@patch("cosmos.config.ProjectConfig.validate_project")
@patch("cosmos.converter.build_airflow_graph")
@patch("cosmos.converter.DbtGraph.load")
def test_converter_multiple_calls_same_operator_args(
mock_dbt_graph_load, mock_validate_project, mock_build_airflow_graph
):
"""Tests if the DbttoAirflowConverter is called more than once with the same operator_args, the
operator_args are not modified.
"""
project_config = ProjectConfig(project_name="fake-project", dbt_project_path="/some/project/path")
execution_config = ExecutionConfig()
render_config = RenderConfig()
profile_config = MagicMock()
operator_args = {
"install_deps": True,
"vars": {"key": "value"},
"env": {"key": "value"},
}
original_operator_args = operator_args.copy()
for _ in range(2):
with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag:
DbtToAirflowConverter(
dag=dag,
nodes=nodes,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
render_config=render_config,
operator_args=operator_args,
)
assert operator_args == original_operator_args
Loading