Fix issue #945
tatiana committed May 9, 2024
1 parent 1d5cbf0 commit fd79630
Showing 6 changed files with 121 additions and 6 deletions.
5 changes: 5 additions & 0 deletions cosmos/constants.py
@@ -3,6 +3,7 @@
from pathlib import Path

import aenum
from packaging.version import Version

DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
DEFAULT_DBT_PROFILE_NAME = "cosmos_profile"
@@ -20,6 +21,10 @@
DEFAULT_OPENLINEAGE_NAMESPACE = "cosmos"
OPENLINEAGE_PRODUCER = "https://github.com/astronomer/astronomer-cosmos/"

# Cosmos will not emit datasets for the following Airflow versions, due to a breaking change that's fixed in later Airflow 2.x versions
# https://github.com/apache/airflow/issues/39486
PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS = [Version("2.9.0"), Version("2.9.1")]


class LoadMode(Enum):
"""
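As a sketch of how this constant can be used downstream, the guard below compares the running Airflow version against the partially supported list; the helper name dataset_emission_supported is illustrative, not part of this commit.

# Illustrative guard (not part of this commit): report whether the running
# Airflow release is affected by the Dataset URI regression.
import airflow
from packaging.version import Version

from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS


def dataset_emission_supported() -> bool:
    """Return False on Airflow 2.9.0 / 2.9.1, where Dataset URIs break."""
    return Version(airflow.__version__) not in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS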
4 changes: 4 additions & 0 deletions cosmos/exceptions.py
@@ -3,3 +3,7 @@

class CosmosValueError(ValueError):
"""Raised when a Cosmos config value is invalid."""


class AirflowCompatibilityError(Exception):
"""Raised when Cosmos features are limited for Airflow version being used."""
17 changes: 16 additions & 1 deletion cosmos/operators/local.py
@@ -21,6 +21,7 @@
from cosmos import cache
from cosmos.constants import InvocationMode
from cosmos.dbt.project import get_partial_parse_path
from cosmos.exceptions import AirflowCompatibilityError

try:
from airflow.datasets import Dataset
@@ -407,7 +408,21 @@ def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
dataset_uri = output.namespace + "/" + output.name
uris.append(dataset_uri)
logger.debug("URIs to be converted to Dataset: %s", uris)
return [Dataset(uri) for uri in uris]

datasets = []
try:
datasets = [Dataset(uri) for uri in uris]
except ValueError as e:
raise AirflowCompatibilityError(
"""
Apache Airflow 2.9.0 & 2.9.1 introduced a breaking change in Dataset URIs, to be fixed in newer versions:
https://github.com/apache/airflow/issues/39486
If you want to use Cosmos with one of these Airflow versions, you will have to disable emission of Datasets:
By setting ``emit_datasets=False`` in ``RenderConfig``. For more information, see https://astronomer.github.io/astronomer-cosmos/configuration/render-config.html.
"""
) from e
return datasets

def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset]) -> None:
"""
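The ValueError wrapped above comes straight from the Dataset constructor. A minimal reproduction, assuming Airflow 2.9.0 or 2.9.1 is installed (the URI mirrors the ones asserted in the tests below):

# On Airflow 2.9.0 / 2.9.1 this constructor raises ValueError while validating
# the URI; on other 2.x releases it succeeds. See apache/airflow#39486.
from airflow.datasets import Dataset

try:
    Dataset("postgres://0.0.0.0:5432/postgres.public.stg_customers")
except ValueError as err:
    print(f"Dataset URI rejected: {err}")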
1 change: 1 addition & 0 deletions dev/dags/example_virtualenv.py
@@ -36,6 +36,7 @@
"py_system_site_packages": False,
"py_requirements": ["dbt-postgres==1.6.0b1"],
"install_deps": True,
"emit_datasets": False, # Example of how to not set inlets and outlets
},
# normal dag parameters
schedule_interval="@daily",
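Besides operator_args, the error message added in local.py points to RenderConfig for turning emission off across a whole DAG. A minimal sketch, assuming a typical DbtDag setup; every path and name below is a placeholder:

# Sketch with placeholder paths/names: disable dataset emission DAG-wide.
from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig

example_dag = DbtDag(
    dag_id="example_no_datasets",
    project_config=ProjectConfig("/path/to/dbt/project"),
    profile_config=ProfileConfig(
        profile_name="default",
        target_name="dev",
        profiles_yml_filepath="/path/to/profiles.yml",
    ),
    # Avoids the Dataset URI ValueError on Airflow 2.9.0 / 2.9.1.
    render_config=RenderConfig(emit_datasets=False),
)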
92 changes: 88 additions & 4 deletions tests/operators/test_local.py
@@ -18,12 +18,13 @@

from cosmos import cache
from cosmos.config import ProfileConfig
from cosmos.constants import InvocationMode
from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, InvocationMode
from cosmos.dbt.parser.output import (
extract_dbt_runner_issues,
parse_number_of_warnings_dbt_runner,
parse_number_of_warnings_subprocess,
)
from cosmos.exceptions import AirflowCompatibilityError
from cosmos.operators.local import (
DbtBuildLocalOperator,
DbtDocsAzureStorageLocalOperator,
@@ -384,11 +385,12 @@ def test_dbt_test_local_operator_invocation_mode_methods(mock_extract_log_issues


@pytest.mark.skipif(
version.parse(airflow_version) < version.parse("2.4"),
reason="Airflow DAG did not have datasets until the 2.4 release",
version.parse(airflow_version) < version.parse("2.4")
or version.parse(airflow_version) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
reason="Airflow DAG did not have datasets until the 2.4 release, inlets and outlets do not work by default in Airflow 2.9.0 and 2.9.1",
)
@pytest.mark.integration
def test_run_operator_dataset_inlets_and_outlets():
def test_run_operator_dataset_inlets_and_outlets(caplog):
from airflow.datasets import Dataset

with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
@@ -417,13 +419,95 @@ def test_run_operator_dataset_inlets_and_outlets():
append_env=True,
)
seed_operator >> run_operator >> test_operator

run_test_dag(dag)

assert run_operator.inlets == []
assert run_operator.outlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)]
assert test_operator.inlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)]
assert test_operator.outlets == []


@pytest.mark.skipif(
version.parse(airflow_version) not in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs",
# https://github.com/apache/airflow/issues/39486
)
@pytest.mark.integration
def test_run_operator_dataset_emission_fails(caplog):
from airflow.datasets import Dataset

with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
seed_operator = DbtSeedLocalOperator(
profile_config=real_profile_config,
project_dir=DBT_PROJ_DIR,
task_id="seed",
dbt_cmd_flags=["--select", "raw_customers"],
install_deps=True,
append_env=True,
)
run_operator = DbtRunLocalOperator(
profile_config=real_profile_config,
project_dir=DBT_PROJ_DIR,
task_id="run",
dbt_cmd_flags=["--models", "stg_customers"],
install_deps=True,
append_env=True,
)

seed_operator >> run_operator

with pytest.raises(AirflowCompatibilityError) as exc:
run_test_dag(dag)

err_msg = str(exc.value)
assert (
"Apache Airflow 2.9.0 & 2.9.1 introduced a breaking change in Dataset URIs, to be fixed in newer versions"
in err_msg
)
assert (
"If you want to use Cosmos with one of these Airflow versions, you will have to disable emission of Datasets"
in err_msg
)


@pytest.mark.skipif(
version.parse(airflow_version) not in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs",
# https://github.com/apache/airflow/issues/39486
)
@pytest.mark.integration
def test_run_operator_dataset_emission_is_skipped(caplog):
from airflow.datasets import Dataset

with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
seed_operator = DbtSeedLocalOperator(
profile_config=real_profile_config,
project_dir=DBT_PROJ_DIR,
task_id="seed",
dbt_cmd_flags=["--select", "raw_customers"],
install_deps=True,
append_env=True,
emit_datasets=False,
)
run_operator = DbtRunLocalOperator(
profile_config=real_profile_config,
project_dir=DBT_PROJ_DIR,
task_id="run",
dbt_cmd_flags=["--models", "stg_customers"],
install_deps=True,
append_env=True,
emit_datasets=False,
)

seed_operator >> run_operator

run_test_dag(dag)

assert run_operator.inlets == []
assert run_operator.outlets == []


@pytest.mark.integration
def test_run_operator_caches_partial_parsing(caplog, tmp_path):
caplog.set_level(logging.DEBUG)
8 changes: 7 additions & 1 deletion tests/test_example_dags.py
@@ -16,6 +16,8 @@
from dbt.version import get_installed_version as get_dbt_version
from packaging.version import Version

from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS

from . import utils as test_utils

EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags"
@@ -28,6 +30,7 @@

IGNORED_DAG_FILES = ["performance_dag.py"]


# Sort descending by version, converting the string keys to actual Version objects
MIN_VER_DAG_FILE_VER: dict[Version, list[str]] = {
Version(version): MIN_VER_DAG_FILE[version] for version in sorted(MIN_VER_DAG_FILE, key=Version, reverse=True)
@@ -50,7 +53,10 @@ def get_dag_bag() -> DagBag:
"""Create a DagBag by adding the files that are not supported to .airflowignore"""
with open(AIRFLOW_IGNORE_FILE, "w+") as file:
for min_version, files in MIN_VER_DAG_FILE_VER.items():
if Version(airflow.__version__) < min_version:
if (
Version(airflow.__version__) < min_version
or Version(airflow.__version__) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS
):
print(f"Adding {files} to .airflowignore")
file.writelines([f"{file}\n" for file in files])

