diff --git a/cosmos/__init__.py b/cosmos/__init__.py index b73414c8c..e613dcd02 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -59,8 +59,8 @@ DbtSnapshotKubernetesOperator, DbtTestKubernetesOperator, ) -except ImportError as error: - logger.exception(error) +except ImportError: + logger.debug("To import Kubernetes modules, install astronomer-cosmos[kubernetes].", stack_info=True) DbtLSKubernetesOperator = MissingPackage( "cosmos.operators.kubernetes.DbtLSKubernetesOperator", "kubernetes", diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 1ad6c1737..72b3eaa2b 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -155,6 +155,7 @@ def load_via_dbt_ls(self) -> None: env.update(env_vars) with tempfile.TemporaryDirectory() as tmpdir: + logger.info("Content of the dbt project dir <%s>: `%s`", self.project.dir, os.listdir(self.project.dir)) logger.info("Creating symlinks from %s to `%s`", self.project.dir, tmpdir) # We create symbolic links to the original directory files and directories. # This allows us to run the dbt command from within the temporary directory, outputting any necessary diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index b6d0d12c8..d74cfbe44 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -23,6 +23,7 @@ from airflow.datasets import Dataset except ModuleNotFoundError: is_openlineage_available = False + DbtLocalArtifactProcessor = None else: is_openlineage_available = True @@ -52,11 +53,11 @@ except (ImportError, ModuleNotFoundError): try: from openlineage.airflow.extractors.base import OperatorLineage - except (ImportError, ModuleNotFoundError) as error: + except (ImportError, ModuleNotFoundError): logger.warning( - "To enable emitting Openlineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage]." + "To enable emitting Openlineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage].", + stack_info=True, ) - logger.exception(error) is_openlineage_available = False @define @@ -276,7 +277,7 @@ def calculate_openlineage_events_completes( try: events = openlineage_processor.parse() self.openlineage_events_completes = events.completes - except (FileNotFoundError, NotImplementedError, ValueError): + except (FileNotFoundError, NotImplementedError, ValueError, KeyError): logger.debug("Unable to parse OpenLineage events", stack_info=True) def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]: diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 728eea079..0898a2894 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1,3 +1,4 @@ +import logging import os import shutil import tempfile @@ -370,3 +371,21 @@ def test_operator_execute_without_flags(mock_build_and_run_cmd, operator_class): ) task.execute(context={}) mock_build_and_run_cmd.assert_called_once_with(context={}) + + +@patch("cosmos.operators.local.DbtLocalArtifactProcessor") +def test_calculate_openlineage_events_completes_openlineage_errors(mock_processor, caplog): + instance = mock_processor.return_value + instance.parse = MagicMock(side_effect=KeyError) + caplog.set_level(logging.DEBUG) + dbt_base_operator = DbtLocalBaseOperator( + profile_config=profile_config, + task_id="my-task", + project_dir=DBT_PROJ_DIR, + should_store_compiled_sql=False, + ) + + dbt_base_operator.calculate_openlineage_events_completes(env={}, project_dir=DBT_PROJ_DIR) + assert instance.parse.called + err_msg = "Unable to parse OpenLineage events" + assert err_msg in caplog.text