Skip to content

Commit

Permalink
Fix ExecutionMode.KUBERNETES on Cosmos 1.x (#554)
Browse files Browse the repository at this point in the history
Fix behaviour when using `ExecutionMode.KUBERNETES`, broken between the
Cosmos releases 1.0.0 and 1.1.1.

Update the documentation to be representative of the 1.x Cosmos
interface:

https://astronomer.github.io/astronomer-cosmos/getting_started/kubernetes.html

Add unit tests to avoid regressions on these fixes.

Part of the documentation fixes are made in:
astronomer/cosmos-example#4

As a next step, we must ensure integration tests for running Cosmos on
K8s to avoid this breaking change moving forward (issue #535).

Closes: #493
Closes: #548
Closes: #534

Co-authored-by: Pádraic Slattery <[email protected]>
  • Loading branch information
tatiana and pgoslatara committed Sep 27, 2023
1 parent 9be9d3b commit 5ccc126
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 16 deletions.
1 change: 0 additions & 1 deletion cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ def build_airflow_graph(
and test_behavior == TestBehavior.AFTER_EACH
and node.has_test is True
)

task_meta = create_task_metadata(
node=node, execution_mode=execution_mode, args=task_args, use_task_group=use_task_group
)
Expand Down
1 change: 0 additions & 1 deletion cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ def __init__(
"profile_config": profile_config,
"emit_datasets": emit_datasets,
}

if dbt_executable_path:
task_args["dbt_executable_path"] = dbt_executable_path

Expand Down
3 changes: 3 additions & 0 deletions cosmos/operators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class DbtBaseOperator(BaseOperator):
:param cache_selected_only:
:param no_version_check: dbt optional argument - If set, skip ensuring dbt's version matches the one specified in
the dbt_project.yml file ('require-dbt-version')
:param emit_datasets: Enable emitting inlets and outlets during task execution
:param fail_fast: dbt optional argument to make dbt exit immediately if a single resource fails to build.
:param quiet: dbt optional argument to show only error logs in stdout
:param warn_error: dbt optional argument to convert dbt warnings into errors
Expand Down Expand Up @@ -87,6 +88,7 @@ def __init__(
selector: str | None = None,
vars: dict[str, str] | None = None,
models: str | None = None,
emit_datasets: bool = True,
cache_selected_only: bool = False,
no_version_check: bool = False,
fail_fast: bool = False,
Expand All @@ -112,6 +114,7 @@ def __init__(
self.selector = selector
self.vars = vars
self.models = models
self.emit_datasets = emit_datasets
self.cache_selected_only = cache_selected_only
self.no_version_check = no_version_check
self.fail_fast = fail_fast
Expand Down
3 changes: 3 additions & 0 deletions cosmos/operators/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ def build_kube_args(self, context: Context, cmd_flags: list[str] | None = None)
if self.profile_config.target_name:
dbt_cmd.extend(["--target", self.profile_config.target_name])

if self.project_dir:
dbt_cmd.extend(["--project-dir", str(self.project_dir)])

# set env vars
self.build_env_args(env_vars)
self.arguments = dbt_cmd
Expand Down
3 changes: 1 addition & 2 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class DbtLocalBaseOperator(DbtBaseOperator):
:param profile_name: A name to use for the dbt profile. If not provided, and no profile target is found
in your project's dbt_project.yml, "cosmos_profile" is used.
:param install_deps: If true, install dependencies before running the command
:param install_deps: If true, the operator will set inlets and outlets
:param callback: A callback function called on after a dbt run with a path to the dbt project directory.
:param target_name: A name to use for the dbt target. If not provided, and no target is found
in your project's dbt_project.yml, "cosmos_target" is used.
Expand All @@ -99,15 +100,13 @@ def __init__(
install_deps: bool = False,
callback: Callable[[str], None] | None = None,
should_store_compiled_sql: bool = True,
emit_datasets: bool = True,
**kwargs: Any,
) -> None:
self.profile_config = profile_config
self.install_deps = install_deps
self.callback = callback
self.compiled_sql = ""
self.should_store_compiled_sql = should_store_compiled_sql
self.emit_datasets = emit_datasets
self.openlineage_events_completes: list[RunEvent] = []
super().__init__(**kwargs)

Expand Down
Binary file modified docs/_static/jaffle_shop_k8s_dag_run.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
29 changes: 18 additions & 11 deletions docs/getting_started/kubernetes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,27 @@ For instance,

.. code-block:: text
DbtTaskGroup(
...
run_models = DbtTaskGroup(
profile_config=ProfileConfig(
profile_name="postgres_profile",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="postgres_default",
profile_args={
"schema": "public",
},
),
),
project_config=ProjectConfig(PROJECT_DIR),
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.KUBERNETES,
),
operator_args={
"queue": "kubernetes",
"image": "dbt-jaffle-shop:1.0.0",
"image_pull_policy": "Always",
"image": DBT_IMAGE,
"get_logs": True,
"is_delete_operator_pod": False,
"namespace": "default",
"env_vars": {
...
},
"secrets": [postgres_password_secret, postgres_host_secret],
},
execution_mode="kubernetes",
)
Step-by-step instructions
Expand All @@ -53,7 +60,7 @@ Using installed `Kind <https://kind.sigs.k8s.io/>`_, you can setup a local kuber

.. code-block:: bash
kind cluster create
kind create cluster
Deploy a Postgres pod to Kind using `Helm <https://helm.sh/docs/helm/helm_install/>`_

Expand Down
4 changes: 4 additions & 0 deletions tests/operators/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ def test_dbt_kubernetes_build_command():
"end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n"
"start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n",
"--no-version-check",
"--project-dir",
"my/dir",
]


Expand Down Expand Up @@ -150,6 +152,8 @@ def test_created_pod(test_hook):
"data_interval_start.strftime(''%Y%m%d%H%M%S'') "
"}}'\n",
"--no-version-check",
"--project-dir",
"my/dir",
],
"command": [],
"env": [{"name": "FOO", "value": "BAR", "value_from": None}],
Expand Down
12 changes: 12 additions & 0 deletions tests/sample/profiles.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
default:
target: dev
outputs:
dev:
type: postgres
host: "localhost"
user: "postgres"
password: "postgres"
port: 5432
dbname: "postgres"
schema: "public"
threads: 4
53 changes: 52 additions & 1 deletion tests/test_converter.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
from pathlib import Path

from unittest.mock import patch
import pytest

from cosmos.converter import DbtToAirflowConverter, validate_arguments
from cosmos.constants import DbtResourceType, ExecutionMode
from cosmos.config import ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.dbt.graph import DbtNode
from cosmos.exceptions import CosmosValueError


from cosmos.converter import validate_arguments
SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml"
SAMPLE_DBT_PROJECT = Path(__file__).parent / "sample/"


@pytest.mark.parametrize("argument_key", ["tags", "paths"])
Expand All @@ -16,3 +25,45 @@ def test_validate_arguments_tags(argument_key):
validate_arguments(select, exclude, profile_args, task_args)
expected = f"Can't specify the same {selector_name} in `select` and `exclude`: {{'b'}}"
assert err.value.args[0] == expected


parent_seed = DbtNode(
name="seed_parent",
unique_id="seed_parent",
resource_type=DbtResourceType.SEED,
depends_on=[],
file_path="",
)
nodes = {"seed_parent": parent_seed}


@pytest.mark.parametrize(
"execution_mode,operator_args",
[
(ExecutionMode.KUBERNETES, {}),
# (ExecutionMode.DOCKER, {"image": "sample-image"}),
],
)
@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes)
@patch("cosmos.converter.DbtGraph.load")
def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, operator_args):
"""
This test will raise exceptions if we are trying to pass incorrect arguments to operator constructors.
"""
project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT)
execution_config = ExecutionConfig(execution_mode=execution_mode)
render_config = RenderConfig(emit_datasets=True)
profile_config = ProfileConfig(
profile_name="my_profile_name",
target_name="my_target_name",
profiles_yml_filepath=SAMPLE_PROFILE_YML,
)
converter = DbtToAirflowConverter(
nodes=nodes,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
render_config=render_config,
operator_args=operator_args,
)
assert converter

0 comments on commit 5ccc126

Please sign in to comment.