Skip to content

Commit

Permalink
chore: Add more OpenLineage logs to facilitate debugging
Browse files Browse the repository at this point in the history
Signed-off-by: Kacper Muda <[email protected]>
  • Loading branch information
kacpermuda committed Apr 29, 2024
1 parent 6112745 commit a7a063b
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 15 deletions.
20 changes: 13 additions & 7 deletions airflow/providers/openlineage/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
_CONFIG_SECTION = "openlineage"


def _is_true(arg) -> bool:
return str(arg).lower().strip() in ("true", "1", "t")


@cache
def config_path(check_legacy_env_var: bool = True) -> str:
"""[openlineage] config_path."""
Expand All @@ -41,7 +45,9 @@ def is_source_enabled() -> bool:
option = conf.get(_CONFIG_SECTION, "disable_source_code", fallback="")
if not option:
option = os.getenv("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "")
return option.lower() not in ("true", "1", "t")
if _is_true(option):
return True
return False


@cache
Expand All @@ -53,7 +59,11 @@ def disabled_operators() -> set[str]:

@cache
def selective_enable() -> bool:
return conf.getboolean(_CONFIG_SECTION, "selective_enable", fallback=False)
"""[openlineage] selective_enable."""
option = conf.get(_CONFIG_SECTION, "selective_enable", fallback="")
if _is_true(option):
return True
return False


@cache
Expand Down Expand Up @@ -85,11 +95,7 @@ def transport() -> dict[str, Any]:

@cache
def is_disabled() -> bool:
"""[openlineage] disabled + some extra checks."""

def _is_true(val):
return str(val).lower().strip() in ("true", "1", "t")

"""[openlineage] disabled + check if any configuration is present."""
option = conf.get(_CONFIG_SECTION, "disabled", fallback="")
if _is_true(option):
return True
Expand Down
7 changes: 7 additions & 0 deletions airflow/providers/openlineage/extractors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ def get_operator_classnames(cls) -> list[str]:
def _execute_extraction(self) -> OperatorLineage | None:
# OpenLineage methods are optional - if there's no method, return None
try:
self.log.debug(
"Trying to execute `get_openlineage_facets_on_start` for %s.", self.operator.task_type
)
return self._get_openlineage_facets(self.operator.get_openlineage_facets_on_start) # type: ignore
except ImportError:
self.log.error(
Expand All @@ -105,9 +108,13 @@ def extract_on_complete(self, task_instance) -> OperatorLineage | None:
if task_instance.state == TaskInstanceState.FAILED:
on_failed = getattr(self.operator, "get_openlineage_facets_on_failure", None)
if on_failed and callable(on_failed):
self.log.debug(
"Executing `get_openlineage_facets_on_failure` for %s.", self.operator.task_type
)
return self._get_openlineage_facets(on_failed, task_instance)
on_complete = getattr(self.operator, "get_openlineage_facets_on_complete", None)
if on_complete and callable(on_complete):
self.log.debug("Executing `get_openlineage_facets_on_complete` for %s.", self.operator.task_type)
return self._get_openlineage_facets(on_complete, task_instance)
return self.extract()

Expand Down
4 changes: 4 additions & 0 deletions airflow/providers/openlineage/extractors/bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ def _execute_extraction(self) -> OperatorLineage | None:
source=self.operator.bash_command,
)
}
else:
self.log.debug(
"OpenLineage disable_source_code option is on - no source code is extracted.",
)

return OperatorLineage(
job_facets=job_facets,
Expand Down
8 changes: 7 additions & 1 deletion airflow/providers/openlineage/extractors/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,18 @@ def __init__(self):
for operator_class in extractor.get_operator_classnames():
if operator_class in self.extractors:
self.log.debug(
"Duplicate extractor found for `%s`. `%s` will be used instead of `%s`",
"Duplicate OpenLineage custom extractor found for `%s`. "
"`%s` will be used instead of `%s`",
operator_class,
extractor_path,
self.extractors[operator_class],
)
self.extractors[operator_class] = extractor
self.log.debug(
"Registered custom OpenLineage extractor `%s` for class `%s`",
extractor_path,
operator_class,
)

def add_extractor(self, operator_class: str, extractor: type[BaseExtractor]):
self.extractors[operator_class] = extractor
Expand Down
5 changes: 5 additions & 0 deletions airflow/providers/openlineage/extractors/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ def _execute_extraction(self) -> OperatorLineage | None:
source=source_code,
)
}
else:
self.log.debug(
"OpenLineage disable_source_code option is on - no source code is extracted.",
)

return OperatorLineage(
job_facets=job_facet,
# The PythonOperator is recorded as an "unknownSource" even though we have an extractor,
Expand Down
16 changes: 15 additions & 1 deletion airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,16 @@ def get_or_create_openlineage_client(self) -> OpenLineageClient:
if not self._client:
config = self.get_openlineage_config()
if config:
self.log.debug(
"OpenLineage configuration found. Transport type: %s",
config.get("type", "no type provided"),
)
self._client = OpenLineageClient.from_dict(config=config)
else:
self.log.debug(
"OpenLineage configuration not found directly in Airflow. "
"Looking for legacy Python Client configuration. "
)
self._client = OpenLineageClient.from_environment()
return self._client

Expand All @@ -85,13 +93,19 @@ def get_openlineage_config(self) -> dict | None:
config = self._read_yaml_config(openlineage_config_path)
if config:
return config.get("transport", None)
self.log.debug("OpenLineage config file is empty: %s", openlineage_config_path)
else:
self.log.debug("OpenLineage config_path configuration not found.")

# Second, try to get transport config
transport_config = conf.transport()
if not transport_config:
self.log.debug("OpenLineage transport configuration not found.")
return None
return transport_config

def _read_yaml_config(self, path: str) -> dict | None:
@staticmethod
def _read_yaml_config(path: str) -> dict | None:
with open(path) as config_file:
return yaml.safe_load(config_file)

Expand Down
59 changes: 53 additions & 6 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,19 @@ def on_task_instance_running(
dag = task.dag
if is_operator_disabled(task):
self.log.debug(
"Skipping OpenLineage event emission for operator %s "
"Skipping OpenLineage event emission for operator `%s` "
"due to its presence in [openlineage] disabled_for_operators.",
task.task_type,
)
return None
return

if not is_selective_lineage_enabled(task):
self.log.debug(
"Skipping OpenLineage event emission for task `%s` "
"due to lack of explicit lineage enablement for task or DAG while "
"[openlineage] selective_enable is on.",
task.task_id,
)
return

@print_warning(self.log)
Expand Down Expand Up @@ -146,15 +152,22 @@ def on_task_instance_success(self, previous_state, task_instance: TaskInstance,
if TYPE_CHECKING:
assert task
dag = task.dag

if is_operator_disabled(task):
self.log.debug(
"Skipping OpenLineage event emission for operator %s "
"Skipping OpenLineage event emission for operator `%s` "
"due to its presence in [openlineage] disabled_for_operators.",
task.task_type,
)
return None
return

if not is_selective_lineage_enabled(task):
self.log.debug(
"Skipping OpenLineage event emission for task `%s` "
"due to lack of explicit lineage enablement for task or DAG while "
"[openlineage] selective_enable is on.",
task.task_id,
)
return

@print_warning(self.log)
Expand Down Expand Up @@ -201,15 +214,22 @@ def on_task_instance_failed(self, previous_state, task_instance: TaskInstance, s
if TYPE_CHECKING:
assert task
dag = task.dag

if is_operator_disabled(task):
self.log.debug(
"Skipping OpenLineage event emission for operator %s "
"Skipping OpenLineage event emission for operator `%s` "
"due to its presence in [openlineage] disabled_for_operators.",
task.task_type,
)
return None
return

if not is_selective_lineage_enabled(task):
self.log.debug(
"Skipping OpenLineage event emission for task `%s` "
"due to lack of explicit lineage enablement for task or DAG while "
"[openlineage] selective_enable is on.",
task.task_id,
)
return

@print_warning(self.log)
Expand Down Expand Up @@ -266,7 +286,18 @@ def before_stopping(self, component):
@hookimpl
def on_dag_run_running(self, dag_run: DagRun, msg: str):
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
)
return

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_running`")
return

data_interval_start = dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None
data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None
self.executor.submit(
Expand All @@ -280,19 +311,35 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str):
@hookimpl
def on_dag_run_success(self, dag_run: DagRun, msg: str):
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
)
return

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_success`")
return

self.executor.submit(self.adapter.dag_success, dag_run=dag_run, msg=msg)

@hookimpl
def on_dag_run_failed(self, dag_run: DagRun, msg: str):
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
)
return

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_failed`")
return

self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)


Expand Down

0 comments on commit a7a063b

Please sign in to comment.