Skip to content

Commit

Permalink
chore: Add OpenLineage logs to facilitate debugging
Browse files Browse the repository at this point in the history
Signed-off-by: Kacper Muda <[email protected]>
  • Loading branch information
kacpermuda committed Apr 19, 2024
1 parent 90acbfb commit 9c6cf16
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 17 deletions.
35 changes: 28 additions & 7 deletions airflow/providers/openlineage/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,22 @@

from __future__ import annotations

import logging
import os
from typing import Any

from airflow.compat.functools import cache
from airflow.configuration import conf

log = logging.getLogger(__name__)

_CONFIG_SECTION = "openlineage"


def _is_true(arg) -> bool:
return str(arg).lower().strip() in ("true", "1", "t")


@cache
def config_path(check_legacy_env_var: bool = True) -> str:
"""[openlineage] config_path."""
Expand All @@ -41,7 +48,12 @@ def is_source_enabled() -> bool:
option = conf.get(_CONFIG_SECTION, "disable_source_code", fallback="")
if not option:
option = os.getenv("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "")
return option.lower() not in ("true", "1", "t")
if _is_true(option):
log.debug(
"OpenLineage disable_source_code option is on.",
)
return True
return False


@cache
Expand All @@ -53,7 +65,14 @@ def disabled_operators() -> set[str]:

@cache
def selective_enable() -> bool:
return conf.getboolean(_CONFIG_SECTION, "selective_enable", fallback=False)
"""[openlineage] selective_enable."""
option = conf.get(_CONFIG_SECTION, "selective_enable", fallback="")
if _is_true(option):
log.debug(
"OpenLineage selective_enable option is on.",
)
return True
return False


@cache
Expand All @@ -71,6 +90,7 @@ def namespace() -> str:
option = conf.get(_CONFIG_SECTION, "namespace", fallback="")
if not option:
option = os.getenv("OPENLINEAGE_NAMESPACE", "default")
log.debug("OpenLineage namespace is set to: `%s`", option)
return option


Expand All @@ -86,18 +106,19 @@ def transport() -> dict[str, Any]:
@cache
def is_disabled() -> bool:
"""[openlineage] disabled + some extra checks."""

def _is_true(val):
return str(val).lower().strip() in ("true", "1", "t")

option = conf.get(_CONFIG_SECTION, "disabled", fallback="")
if _is_true(option):
log.debug("OpenLineage is disabled with Airflow configuration.")
return True

option = os.getenv("OPENLINEAGE_DISABLED", "")
if _is_true(option):
log.debug("OpenLineage is disabled with legacy env variable.")
return True

# Check if both 'transport' and 'config_path' are not present and also
# if legacy 'OPENLINEAGE_URL' environment variables is not set
return transport() == {} and config_path(True) == "" and os.getenv("OPENLINEAGE_URL", "") == ""
if transport() == {} and config_path(True) == "" and os.getenv("OPENLINEAGE_URL", "") == "":
log.debug("OpenLineage is disabled as no configuration has been provided.")
return True
return False
5 changes: 5 additions & 0 deletions airflow/providers/openlineage/extractors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def get_operator_classnames(cls) -> list[str]:
def _execute_extraction(self) -> OperatorLineage | None:
# OpenLineage methods are optional - if there's no method, return None
try:
self.log.debug("Executing `get_openlineage_facets_on_start` for %s.", self.operator.task_type)
return self._get_openlineage_facets(self.operator.get_openlineage_facets_on_start) # type: ignore
except ImportError:
self.log.error(
Expand All @@ -105,9 +106,13 @@ def extract_on_complete(self, task_instance) -> OperatorLineage | None:
if task_instance.state == TaskInstanceState.FAILED:
on_failed = getattr(self.operator, "get_openlineage_facets_on_failure", None)
if on_failed and callable(on_failed):
self.log.debug(
"Executing `get_openlineage_facets_on_failure` for %s.", self.operator.task_type
)
return self._get_openlineage_facets(on_failed, task_instance)
on_complete = getattr(self.operator, "get_openlineage_facets_on_complete", None)
if on_complete and callable(on_complete):
self.log.debug("Executing `get_openlineage_facets_on_complete` for %s.", self.operator.task_type)
return self._get_openlineage_facets(on_complete, task_instance)
return self.extract()

Expand Down
8 changes: 7 additions & 1 deletion airflow/providers/openlineage/extractors/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,18 @@ def __init__(self):
for operator_class in extractor.get_operator_classnames():
if operator_class in self.extractors:
self.log.debug(
"Duplicate extractor found for `%s`. `%s` will be used instead of `%s`",
"Duplicate OpenLineage custom extractor found for `%s`. "
"`%s` will be used instead of `%s`",
operator_class,
extractor_path,
self.extractors[operator_class],
)
self.extractors[operator_class] = extractor
self.log.debug(
"Registered custom OpenLineage extractor `%s` for class `%s`",
extractor_path,
operator_class,
)

def add_extractor(self, operator_class: str, extractor: type[BaseExtractor]):
self.extractors[operator_class] = extractor
Expand Down
16 changes: 15 additions & 1 deletion airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,16 @@ def get_or_create_openlineage_client(self) -> OpenLineageClient:
if not self._client:
config = self.get_openlineage_config()
if config:
self.log.debug(
"OpenLineage configuration found. Transport type: %s",
config.get("type", "no type provided"),
)
self._client = OpenLineageClient.from_dict(config=config)
else:
self.log.debug(
"OpenLineage configuration not found directly in Airflow. "
"Looking for legacy Python Client configuration. "
)
self._client = OpenLineageClient.from_environment()
return self._client

Expand All @@ -85,13 +93,19 @@ def get_openlineage_config(self) -> dict | None:
config = self._read_yaml_config(openlineage_config_path)
if config:
return config.get("transport", None)
self.log.debug("Empty OpenLineage config file: %s", openlineage_config_path)
else:
self.log.debug("OpenLineage config file path configuration not found.")

# Second, try to get transport config
transport_config = conf.transport()
if not transport_config:
self.log.debug("OpenLineage transport configuration not found.")
return None
return transport_config

def _read_yaml_config(self, path: str) -> dict | None:
@staticmethod
def _read_yaml_config(path: str) -> dict | None:
with open(path) as config_file:
return yaml.safe_load(config_file)

Expand Down
62 changes: 55 additions & 7 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,19 @@ def on_task_instance_running(
dag = task.dag
if is_operator_disabled(task):
self.log.debug(
"Skipping OpenLineage event emission for operator %s "
"Skipping OpenLineage event emission for operator `%s` "
"due to its presence in [openlineage] disabled_for_operators.",
task.task_type,
)
return None
return

if not is_selective_lineage_enabled(task):
self.log.debug(
"Skipping OpenLineage event emission for task `%s` "
"due to lack of explicit lineage enablement for task or DAG while "
"[openlineage] selective_enable is on.",
task.task_id,
)
return

@print_warning(self.log)
Expand Down Expand Up @@ -146,15 +152,22 @@ def on_task_instance_success(self, previous_state, task_instance: TaskInstance,
if TYPE_CHECKING:
assert task
dag = task.dag

if is_operator_disabled(task):
self.log.debug(
"Skipping OpenLineage event emission for operator %s "
"Skipping OpenLineage event emission for operator `%s` "
"due to its presence in [openlineage] disabled_for_operators.",
task.task_type,
)
return None
return

if not is_selective_lineage_enabled(task):
self.log.debug(
"Skipping OpenLineage event emission for task `%s` "
"due to lack of explicit lineage enablement for task or DAG while "
"[openlineage] selective_enable is on.",
task.task_id,
)
return

@print_warning(self.log)
Expand Down Expand Up @@ -201,15 +214,22 @@ def on_task_instance_failed(self, previous_state, task_instance: TaskInstance, s
if TYPE_CHECKING:
assert task
dag = task.dag

if is_operator_disabled(task):
self.log.debug(
"Skipping OpenLineage event emission for operator %s "
"Skipping OpenLineage event emission for operator `%s` "
"due to its presence in [openlineage] disabled_for_operators.",
task.task_type,
)
return None
return

if not is_selective_lineage_enabled(task):
self.log.debug(
"Skipping OpenLineage event emission for task `%s` "
"due to lack of explicit lineage enablement for task or DAG while "
"[openlineage] selective_enable is on.",
task.task_id,
)
return

@print_warning(self.log)
Expand Down Expand Up @@ -266,7 +286,18 @@ def before_stopping(self, component):
@hookimpl
def on_dag_run_running(self, dag_run: DagRun, msg: str):
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
)
return

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_running`")
return

data_interval_start = dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None
data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None
self.executor.submit(
Expand All @@ -280,19 +311,36 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str):
@hookimpl
def on_dag_run_success(self, dag_run: DagRun, msg: str):
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
return
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
)
return

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_success`")
return

self.executor.submit(self.adapter.dag_success, dag_run=dag_run, msg=msg)

@hookimpl
def on_dag_run_failed(self, dag_run: DagRun, msg: str):
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
)
return

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_failed`")
return

self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)


Expand Down
10 changes: 9 additions & 1 deletion airflow/providers/openlineage/plugins/openlineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
# under the License.
from __future__ import annotations

import logging

from airflow.plugins_manager import AirflowPlugin
from airflow.providers.openlineage import conf
from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
from airflow.providers.openlineage.plugins.macros import (
lineage_job_name,
Expand All @@ -26,6 +28,8 @@
lineage_run_id,
)

log = logging.getLogger(__name__)


class OpenLineageProviderPlugin(AirflowPlugin):
"""
Expand All @@ -36,6 +40,10 @@ class OpenLineageProviderPlugin(AirflowPlugin):
"""

name = "OpenLineageProviderPlugin"
log.debug("Using apache-airflow-providers-openlineage==%s", OPENLINEAGE_PROVIDER_VERSION)
if not conf.is_disabled():
macros = [lineage_job_namespace, lineage_job_name, lineage_run_id, lineage_parent_id]
listeners = [get_openlineage_listener()]
log.debug("OpenLineageProviderPlugin has been loaded successfully.")
else:
log.debug("OpenLineage is disabled. OpenLineageProviderPlugin has not been loaded.")

0 comments on commit 9c6cf16

Please sign in to comment.