Skip to content

Commit

Permalink
Fix behaviour when openlineage raises KeyError (#565)
Browse files Browse the repository at this point in the history
Make Cosmos more resilient to Openlineage errors.

The Openlineage library raises `KeyError` for some community dbt
projects (e.g.
https://apache-airflow.slack.com/archives/C059CC42E9W/p1695067128727239):
```
[2023-09-18, 19:54:12 UTC] {taskinstance.py:1935} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 360, in execute
    self.build_and_run_cmd(context=context)
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 356, in build_and_run_cmd
    result = self.run_command(cmd=dbt_cmd, env=env, context=context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 230, in run_command
    self.calculate_openlineage_events_completes(env, Path(tmp_project_dir))
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 278, in calculate_openlineage_events_completes
    events = openlineage_processor.parse()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/openlineage/common/provider/dbt/processor.py", line 211, in parse
    events += self.parse_test(context, nodes)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/openlineage/common/provider/dbt/processor.py", line 308, in parse_test
    assertions = self.parse_assertions(context, nodes)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/openlineage/common/provider/dbt/processor.py", line 367, in parse_assertions
    assertion=test_node["test_metadata"]["name"],
              ~~~~~~~~~^^^^^^^^^^^^^^^^^
KeyError: 'test_metadata'
```
Closes: #545

Co-authored-by: Javier Hernández Novoa <[email protected]>
  • Loading branch information
tatiana and javihernovoa committed Sep 28, 2023
1 parent 85f4f3b commit fb36be5
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 6 deletions.
4 changes: 2 additions & 2 deletions cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
DbtSnapshotKubernetesOperator,
DbtTestKubernetesOperator,
)
except ImportError as error:
logger.exception(error)
except ImportError:
logger.debug("To import Kubernetes modules, install astronomer-cosmos[kubernetes].", stack_info=True)
DbtLSKubernetesOperator = MissingPackage(
"cosmos.operators.kubernetes.DbtLSKubernetesOperator",
"kubernetes",
Expand Down
1 change: 1 addition & 0 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def load_via_dbt_ls(self) -> None:
env.update(env_vars)

with tempfile.TemporaryDirectory() as tmpdir:
logger.info("Content of the dbt project dir <%s>: `%s`", self.project.dir, os.listdir(self.project.dir))
logger.info("Creating symlinks from %s to `%s`", self.project.dir, tmpdir)
# We create symbolic links to the original directory files and directories.
# This allows us to run the dbt command from within the temporary directory, outputting any necessary
Expand Down
9 changes: 5 additions & 4 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from airflow.datasets import Dataset
except ModuleNotFoundError:
is_openlineage_available = False
DbtLocalArtifactProcessor = None
else:
is_openlineage_available = True

Expand Down Expand Up @@ -52,11 +53,11 @@
except (ImportError, ModuleNotFoundError):
try:
from openlineage.airflow.extractors.base import OperatorLineage
except (ImportError, ModuleNotFoundError) as error:
except (ImportError, ModuleNotFoundError):
logger.warning(
"To enable emitting Openlineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage]."
"To enable emitting Openlineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage].",
stack_info=True,
)
logger.exception(error)
is_openlineage_available = False

@define
Expand Down Expand Up @@ -276,7 +277,7 @@ def calculate_openlineage_events_completes(
try:
events = openlineage_processor.parse()
self.openlineage_events_completes = events.completes
except (FileNotFoundError, NotImplementedError, ValueError):
except (FileNotFoundError, NotImplementedError, ValueError, KeyError):
logger.debug("Unable to parse OpenLineage events", stack_info=True)

def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
Expand Down
19 changes: 19 additions & 0 deletions tests/operators/test_local.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import os
import shutil
import tempfile
Expand Down Expand Up @@ -370,3 +371,21 @@ def test_operator_execute_without_flags(mock_build_and_run_cmd, operator_class):
)
task.execute(context={})
mock_build_and_run_cmd.assert_called_once_with(context={})


@patch("cosmos.operators.local.DbtLocalArtifactProcessor")
def test_calculate_openlineage_events_completes_openlineage_errors(mock_processor, caplog):
instance = mock_processor.return_value
instance.parse = MagicMock(side_effect=KeyError)
caplog.set_level(logging.DEBUG)
dbt_base_operator = DbtLocalBaseOperator(
profile_config=profile_config,
task_id="my-task",
project_dir=DBT_PROJ_DIR,
should_store_compiled_sql=False,
)

dbt_base_operator.calculate_openlineage_events_completes(env={}, project_dir=DBT_PROJ_DIR)
assert instance.parse.called
err_msg = "Unable to parse OpenLineage events"
assert err_msg in caplog.text

0 comments on commit fb36be5

Please sign in to comment.