Commit 5f6c253
Remove default create_dataset method.
Add section in experimental lineage docs.

Signed-off-by: Jakub Dardzinski <[email protected]>
JDarDagran committed Jul 15, 2024
1 parent 21590f9 commit 5f6c253
Showing 8 changed files with 162 additions and 82 deletions.
5 changes: 0 additions & 5 deletions airflow/datasets/__init__.py
@@ -44,11 +44,6 @@ def normalize_noop(parts: SplitResult) -> SplitResult:
     return parts
 
 
-def create_dataset(uri: str) -> Dataset:
-    """Create a dataset object from a dataset URI."""
-    return Dataset(uri=uri)
-
-
 def _get_uri_normalizer(scheme: str) -> Callable[[SplitResult], SplitResult] | None:
     if scheme == "file":
         return normalize_noop
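With this default helper removed, callers build plain datasets directly, as `airflow/lineage/hook.py` does below. A one-line sketch of the equivalent (the URI is illustrative):

    from airflow.datasets import Dataset

    dataset = Dataset(uri="file:///tmp/in", extra=None)  # replaces create_dataset("file:///tmp/in")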
70 changes: 50 additions & 20 deletions airflow/lineage/hook.py
@@ -21,7 +21,7 @@
 
 import attr
 
-from airflow.datasets import Dataset, create_dataset
+from airflow.datasets import Dataset
 from airflow.hooks.base import BaseHook
 from airflow.io.store import ObjectStore
 from airflow.providers_manager import ProvidersManager
@@ -53,36 +53,61 @@ def __init__(self, **kwargs):
         self.inputs: list[tuple[Dataset, LineageContext]] = []
         self.outputs: list[tuple[Dataset, LineageContext]] = []
 
-    @staticmethod
-    def create_dataset(dataset_kwargs: dict) -> Dataset:
+    def create_dataset(
+        self, scheme: str | None, uri: str | None, dataset_kwargs: dict | None, dataset_extra: dict | None
+    ) -> Dataset | None:
         """Create a Dataset instance from the given dataset kwargs."""
-        if "uri" in dataset_kwargs:
+        if uri:
             # Fallback to default factory using the provided URI
-            return create_dataset(dataset_kwargs["uri"])
+            return Dataset(uri=uri, extra=dataset_extra)
 
-        scheme: str = dataset_kwargs.pop("scheme", None)
+        # scheme: str = dataset_kwargs.pop("scheme", None)
         if not scheme:
-            raise ValueError(
+            self.log.debug(
                 "Missing required parameter: either 'uri' or 'scheme' must be provided to create a Dataset."
             )
+            return None
 
         dataset_factory = ProvidersManager().dataset_factories.get(scheme)
         if not dataset_factory:
-            raise ValueError(
-                f"Unsupported scheme: '{scheme}'. Please provide a valid URI to create a Dataset."
-            )
+            self.log.debug("Unsupported scheme: %s. Please provide a valid URI to create a Dataset.", scheme)
+            return None
 
-        return dataset_factory(**dataset_kwargs)
+        try:
+            return dataset_factory(**(dataset_kwargs or {}), extra=dataset_extra)
+        except Exception as e:
+            self.log.warning("Failed to create dataset. Skipping. Error: %s", e)
+            return None
 
-    def add_input_dataset(self, dataset_kwargs: dict, hook: LineageContext):
+    def add_input_dataset(
+        self,
+        hook: LineageContext,
+        scheme: str,
+        uri: str | None = None,
+        dataset_kwargs: dict | None = None,
+        dataset_extra: dict | None = None,
+    ):
         """Add the input dataset and its corresponding hook execution context to the collector."""
-        dataset = self.create_dataset(dataset_kwargs)
-        self.inputs.append((dataset, hook))
+        dataset = self.create_dataset(
+            scheme=scheme, uri=uri, dataset_kwargs=dataset_kwargs, dataset_extra=dataset_extra
+        )
+        if dataset:
+            self.inputs.append((dataset, hook))
 
-    def add_output_dataset(self, dataset_kwargs: dict, hook: LineageContext):
+    def add_output_dataset(
+        self,
+        hook: LineageContext,
+        scheme: str,
+        uri: str | None = None,
+        dataset_kwargs: dict | None = None,
+        dataset_extra: dict | None = None,
+    ):
         """Add the output dataset and its corresponding hook execution context to the collector."""
-        dataset = self.create_dataset(dataset_kwargs)
-        self.outputs.append((dataset, hook))
+        dataset = self.create_dataset(
+            scheme=scheme, uri=uri, dataset_kwargs=dataset_kwargs, dataset_extra=dataset_extra
+        )
+        if dataset:
+            self.outputs.append((dataset, hook))
 
     @property
     def collected_datasets(self) -> HookLineage:
@@ -112,7 +137,9 @@ def add_output_dataset(self, *_):
     def collected_datasets(
         self,
    ) -> HookLineage:
-        self.log.warning("You should not call this as there's no reader.")
+        self.log.warning(
+            "Data lineage tracking is disabled. Register a hook lineage reader to start tracking hook lineage."
+        )
         return HookLineage([], [])
 
 
@@ -132,8 +159,11 @@ def get_hook_lineage_collector() -> HookLineageCollector:
     """Get singleton lineage collector."""
     global _hook_lineage_collector
     if not _hook_lineage_collector:
-        # is there a better why how to use noop?
-        if ProvidersManager().hook_lineage_readers:
+        from airflow import plugins_manager
+
+        plugins_manager.initialize_hook_lineage_readers_plugins()
+        print("DUPA", plugins_manager.hook_lineage_reader_classes)
+        if plugins_manager.hook_lineage_reader_classes:
             _hook_lineage_collector = HookLineageCollector()
         else:
             _hook_lineage_collector = NoOpCollector()
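Taken together, these hook.py changes replace exceptions with debug logs and move scheme/URI out of ``dataset_kwargs`` into explicit keyword arguments. A minimal sketch of a hook reporting lineage under the new signatures (the hook class and the ``path`` kwarg for a ``file`` factory are illustrative assumptions, not part of the commit):

    from airflow.hooks.base import BaseHook
    from airflow.lineage.hook import get_hook_lineage_collector


    class CopyFileHook(BaseHook):  # hypothetical hook, for illustration only
        def copy(self, source_path: str, dest_path: str) -> None:
            # ... perform the actual copy ...
            collector = get_hook_lineage_collector()
            # "file" must match a scheme registered by a provider dataset factory;
            # unknown schemes are now logged and skipped instead of raising ValueError.
            collector.add_input_dataset(self, scheme="file", dataset_kwargs={"path": source_path})
            collector.add_output_dataset(self, scheme="file", dataset_kwargs={"path": dest_path})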
27 changes: 27 additions & 0 deletions airflow/plugins_manager.py
@@ -27,6 +27,7 @@
 import os
 import sys
 import types
+from cgitb import Hook
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Iterable
@@ -41,6 +42,8 @@
 from airflow.utils.module_loading import import_string, qualname
 
 if TYPE_CHECKING:
+    from airflow.lineage.hook import HookLineageReader
+
     try:
         import importlib_metadata as metadata
     except ImportError:
@@ -75,6 +78,7 @@
 registered_operator_link_classes: dict[str, type] | None = None
 registered_ti_dep_classes: dict[str, type] | None = None
 timetable_classes: dict[str, type[Timetable]] | None = None
+hook_lineage_reader_classes: list[type[Hook]] | None = None
 priority_weight_strategy_classes: dict[str, type[PriorityWeightStrategy]] | None = None
 """
 Mapping of class names to class of OperatorLinks registered by plugins.
@@ -176,8 +180,12 @@ class AirflowPlugin:
     # A list of timetable classes that can be used for DAG scheduling.
     timetables: list[type[Timetable]] = []
 
     # A list of listeners that can be used for tracking task and DAG states.
     listeners: list[ModuleType | object] = []
 
+    # A list of hook lineage reader classes that can be used for reading lineage information from a hook.
+    hook_lineage_readers: list[type[HookLineageReader]] = []
+
     # A list of priority weight strategy classes that can be used for calculating tasks weight priority.
     priority_weight_strategies: list[type[PriorityWeightStrategy]] = []
@@ -483,6 +491,25 @@ def initialize_timetables_plugins():
     }
 
 
+def initialize_hook_lineage_readers_plugins():
+    """Collect hook lineage reader classes registered by plugins."""
+    global hook_lineage_reader_classes
+
+    if hook_lineage_reader_classes is not None:
+        return
+
+    ensure_plugins_loaded()
+
+    if plugins is None:
+        raise AirflowPluginException("Can't load plugins.")
+
+    log.debug("Initialize hook lineage readers plugins")
+
+    hook_lineage_reader_classes = []
+    for plugin in plugins:
+        hook_lineage_reader_classes.extend(plugin.hook_lineage_readers)
+
+
 def integrate_executor_plugins() -> None:
     """Integrate executor plugins to the context."""
     global plugins
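The new ``initialize_hook_lineage_readers_plugins`` mirrors the existing initializers such as ``initialize_timetables_plugins``: it returns early once the global is populated, and it flattens the ``hook_lineage_readers`` attribute of every loaded plugin. A short sketch of the intended flow (plugin and reader names are hypothetical):

    from airflow import plugins_manager
    from airflow.lineage.hook import HookLineageReader
    from airflow.plugins_manager import AirflowPlugin


    class MyReader(HookLineageReader):  # hypothetical reader
        pass


    class MyPlugin(AirflowPlugin):  # hypothetical plugin exposing the new attribute
        name = "my_lineage_plugin"
        hook_lineage_readers = [MyReader]


    # After Airflow loads the plugin, readers are collected lazily:
    plugins_manager.initialize_hook_lineage_readers_plugins()
    assert MyReader in (plugins_manager.hook_lineage_reader_classes or [])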
7 changes: 0 additions & 7 deletions airflow/provider.yaml.schema.json
@@ -220,13 +220,6 @@
         }
       }
     },
-    "hook-lineage-readers": {
-      "type": "array",
-      "description": "Hook lineage readers",
-      "items": {
-        "type": "string"
-      }
-    },
     "transfers": {
       "type": "array",
       "items": {
42 changes: 12 additions & 30 deletions airflow/providers_manager.py
@@ -428,7 +428,6 @@ def __init__(self):
         self._fs_set: set[str] = set()
         self._dataset_uri_handlers: dict[str, Callable[[SplitResult], SplitResult]] = {}
         self._dataset_factories: dict[str, Callable[..., Dataset]] = {}
-        self._hook_lineage_readers: set[str] = set()
         self._taskflow_decorators: dict[str, Callable] = LazyDictWithCache()  # type: ignore[assignment]
         # keeps mapping between connection_types and hook class, package they come from
         self._hook_provider_dict: dict[str, HookClassProvider] = {}
@@ -526,17 +525,11 @@ def initialize_providers_filesystems(self):
         self._discover_filesystems()
 
     @provider_info_cache("dataset_uris")
-    def initialize_providers_dataset_uri_handlers(self):
+    def initialize_providers_dataset_uri_handlers_and_factories(self):
         """Lazy initialization of provider dataset URI handlers."""
         self.initialize_providers_list()
         self._discover_dataset_uri_handlers_and_factories()
 
-    @provider_info_cache("hook_lineage_readers")
-    def initialize_providers_hook_lineage_readers(self):
-        """Lazy initialization of providers hook lineage readers."""
-        self.initialize_providers_list()
-        self._discover_hook_lineage_readers()
-
-    @provider_info_cache("hook_lineage_writers")
     @provider_info_cache("taskflow_decorators")
     def initialize_providers_taskflow_decorator(self):
@@ -574,7 +567,7 @@ def initialize_providers_notifications(self):
         self.initialize_providers_list()
         self._discover_notifications()
 
-    @provider_info_cache(cache_name="auth_managers")
+    @provider_info_cache("auth_managers")
     def initialize_providers_auth_managers(self):
         """Lazy initialization of providers notifications information."""
         self.initialize_providers_list()
@@ -889,34 +882,28 @@ def _discover_filesystems(self) -> None:
         self._fs_set = set(sorted(self._fs_set))
 
     def _discover_dataset_uri_handlers_and_factories(self) -> None:
-        from airflow.datasets import create_dataset, normalize_noop
+        from airflow.datasets import normalize_noop
 
         for provider_package, provider in self._provider_dict.items():
             for handler_info in provider.data.get("dataset-uris", []):
                 try:
                     schemes = handler_info["schemes"]
                     handler_path = handler_info["handler"]
-                    factory_path = handler_info["factory"]
                 except KeyError:
                     continue
                 if handler_path is None:
                     handler = normalize_noop
-                    if factory_path is None:
-                        factory = create_dataset
-                elif not (handler := _correctness_check(provider_package, handler_path, provider)) or not (
-                    factory := _correctness_check(provider_package, factory_path, provider)
-                ):
+                elif not (handler := _correctness_check(provider_package, handler_path, provider)):
                     continue
                 self._dataset_uri_handlers.update((scheme, handler) for scheme in schemes)
+                factory_path = handler_info.get("factory")
+                if not (
+                    factory_path is not None
+                    and (factory := _correctness_check(provider_package, factory_path, provider))
+                ):
+                    continue
                 self._dataset_factories.update((scheme, factory) for scheme in schemes)
 
-    def _discover_hook_lineage_readers(self) -> None:
-        for provider_package, provider in self._provider_dict.items():
-            for hook_lineage_reader in provider.data.get("hook-lineage-readers", []):
-                if _correctness_check(provider_package, hook_lineage_reader, provider):
-                    self._hook_lineage_readers.add(hook_lineage_reader)
-        self._fs_set = set(sorted(self._fs_set))
-
     def _discover_taskflow_decorators(self) -> None:
         for name, info in self._provider_dict.items():
             for taskflow_decorator in info.data.get("task-decorators", []):
Expand Down Expand Up @@ -1314,19 +1301,14 @@ def filesystem_module_names(self) -> list[str]:

@property
def dataset_factories(self) -> dict[str, Callable[..., Dataset]]:
self.initialize_providers_dataset_uri_handlers()
self.initialize_providers_dataset_uri_handlers_and_factories()
return self._dataset_factories

@property
def dataset_uri_handlers(self) -> dict[str, Callable[[SplitResult], SplitResult]]:
self.initialize_providers_dataset_uri_handlers()
self.initialize_providers_dataset_uri_handlers_and_factories()
return self._dataset_uri_handlers

@property
def hook_lineage_readers(self) -> list[str]:
self.initialize_providers_hook_lineage_readers()
return sorted(self._hook_lineage_readers)

@property
def provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
self.initialize_providers_configuration()
Expand Down
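For reference, ``_discover_dataset_uri_handlers_and_factories`` now treats ``factory`` as an optional key of each ``dataset-uris`` entry rather than a required one. A sketch of the shape it consumes and of a factory it could resolve (the provider module path and function are hypothetical; the keyword-only ``extra`` matches the ``dataset_factory(..., extra=dataset_extra)`` call in hook.py):

    from airflow.datasets import Dataset

    # One element of provider.data.get("dataset-uris", []):
    handler_info = {
        "schemes": ["file"],
        "handler": None,  # None -> normalize_noop is used as the URI handler
        "factory": "my_provider.datasets.create_file_dataset",  # optional now
    }


    def create_file_dataset(*, path: str, extra: dict | None = None) -> Dataset:
        """Hypothetical factory resolved from the dotted path above."""
        return Dataset(uri=f"file://{path}", extra=extra)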
42 changes: 42 additions & 0 deletions docs/apache-airflow/administration-and-deployment/lineage.rst
@@ -89,6 +89,48 @@ has outlets defined (e.g. by using ``add_outlets(..)`` or has out of the box sup
 .. _precedence: https://docs.python.org/3/reference/expressions.html
 
+Hook Lineage
+------------
+
+Airflow provides a powerful feature for tracking data lineage not only between tasks but also from hooks used within those tasks.
+This functionality helps you understand how data flows throughout your Airflow pipelines.
+
+A global instance of ``HookLineageCollector`` serves as the central hub for collecting lineage information.
+Hooks can send details about datasets they interact with to this collector.
+The collector then uses this data to construct AIP-60 compliant Datasets, a standard format for describing datasets.
+
+.. code-block:: python
+
+    from airflow.lineage.hook_lineage import get_hook_lineage_collector
+
+
+    class CustomHook(BaseHook):
+        def run(self):
+            # run actual code
+            collector = get_hook_lineage_collector()
+            collector.add_input_dataset(self, dataset_kwargs={"scheme": "file", "path": "/tmp/in"})
+            collector.add_output_dataset(self, dataset_kwargs={"scheme": "file", "path": "/tmp/out"})
+
+Lineage data collected by the ``HookLineageCollector`` can be accessed using an instance of ``HookLineageReader``.
+
+.. code-block:: python
+
+    from airflow.lineage.hook_lineage import HookLineageReader
+    from airflow.plugins_manager import AirflowPlugin
+
+
+    class CustomHookLineageReader(HookLineageReader):
+        def get_inputs(self):
+            return self.lineage_collector.collected_datasets.inputs
+
+
+    class HookLineageCollectionPlugin(AirflowPlugin):
+        name = "HookLineageCollectionPlugin"
+        hook_lineage_readers = [CustomHookLineageReader]
+
+If no ``HookLineageReader`` is registered within Airflow, a default ``NoOpCollector`` is used instead.
+This collector does not create AIP-60 compliant datasets or collect lineage information.
+
 
 Lineage Backend
 ---------------
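To close the loop, lineage gathered by the collector can be read back through ``collected_datasets``, which returns a ``HookLineage`` of ``(Dataset, LineageContext)`` pairs, per ``airflow/lineage/hook.py`` above. A minimal sketch:

    from airflow.lineage.hook import get_hook_lineage_collector

    collector = get_hook_lineage_collector()
    lineage = collector.collected_datasets  # HookLineage(inputs, outputs)

    # Each entry pairs an AIP-60 Dataset with the hook that reported it.
    for dataset, hook in lineage.inputs:
        print(dataset.uri, type(hook).__name__)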
