Skip to content

Commit

Permalink
chore: Add more OpenLineage logs to facilitate debugging
Browse files Browse the repository at this point in the history
Signed-off-by: Kacper Muda <[email protected]>
  • Loading branch information
kacpermuda committed May 9, 2024
1 parent 4ada175 commit ec0786d
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 16 deletions.
17 changes: 10 additions & 7 deletions airflow/providers/openlineage/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
_CONFIG_SECTION = "openlineage"


def _is_true(arg) -> bool:
return str(arg).lower().strip() in ("true", "1", "t")


@cache
def config_path(check_legacy_env_var: bool = True) -> str:
"""[openlineage] config_path."""
Expand All @@ -41,7 +45,8 @@ def is_source_enabled() -> bool:
option = conf.get(_CONFIG_SECTION, "disable_source_code", fallback="")
if not option:
option = os.getenv("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "")
return option.lower() not in ("true", "1", "t")
# when disable_source_code is True, is_source_enabled() should be False
return not _is_true(option)


@cache
Expand All @@ -53,7 +58,9 @@ def disabled_operators() -> set[str]:

@cache
def selective_enable() -> bool:
return conf.getboolean(_CONFIG_SECTION, "selective_enable", fallback=False)
"""[openlineage] selective_enable."""
option = conf.get(_CONFIG_SECTION, "selective_enable", fallback="")
return _is_true(option)


@cache
Expand Down Expand Up @@ -85,11 +92,7 @@ def transport() -> dict[str, Any]:

@cache
def is_disabled() -> bool:
"""[openlineage] disabled + some extra checks."""

def _is_true(val):
return str(val).lower().strip() in ("true", "1", "t")

"""[openlineage] disabled + check if any configuration is present."""
option = conf.get(_CONFIG_SECTION, "disabled", fallback="")
if _is_true(option):
return True
Expand Down
7 changes: 7 additions & 0 deletions airflow/providers/openlineage/extractors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ def get_operator_classnames(cls) -> list[str]:
def _execute_extraction(self) -> OperatorLineage | None:
# OpenLineage methods are optional - if there's no method, return None
try:
self.log.debug(
"Trying to execute `get_openlineage_facets_on_start` for %s.", self.operator.task_type
)
return self._get_openlineage_facets(self.operator.get_openlineage_facets_on_start) # type: ignore
except ImportError:
self.log.error(
Expand All @@ -105,9 +108,13 @@ def extract_on_complete(self, task_instance) -> OperatorLineage | None:
if task_instance.state == TaskInstanceState.FAILED:
on_failed = getattr(self.operator, "get_openlineage_facets_on_failure", None)
if on_failed and callable(on_failed):
self.log.debug(
"Executing `get_openlineage_facets_on_failure` for %s.", self.operator.task_type
)
return self._get_openlineage_facets(on_failed, task_instance)
on_complete = getattr(self.operator, "get_openlineage_facets_on_complete", None)
if on_complete and callable(on_complete):
self.log.debug("Executing `get_openlineage_facets_on_complete` for %s.", self.operator.task_type)
return self._get_openlineage_facets(on_complete, task_instance)
return self.extract()

Expand Down
4 changes: 4 additions & 0 deletions airflow/providers/openlineage/extractors/bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ def _execute_extraction(self) -> OperatorLineage | None:
source=self.operator.bash_command,
)
}
else:
self.log.debug(
"OpenLineage disable_source_code option is on - no source code is extracted.",
)

return OperatorLineage(
job_facets=job_facets,
Expand Down
8 changes: 7 additions & 1 deletion airflow/providers/openlineage/extractors/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,18 @@ def __init__(self):
for operator_class in extractor.get_operator_classnames():
if operator_class in self.extractors:
self.log.debug(
"Duplicate extractor found for `%s`. `%s` will be used instead of `%s`",
"Duplicate OpenLineage custom extractor found for `%s`. "
"`%s` will be used instead of `%s`",
operator_class,
extractor_path,
self.extractors[operator_class],
)
self.extractors[operator_class] = extractor
self.log.debug(
"Registered custom OpenLineage extractor `%s` for class `%s`",
extractor_path,
operator_class,
)

def add_extractor(self, operator_class: str, extractor: type[BaseExtractor]):
self.extractors[operator_class] = extractor
Expand Down
5 changes: 5 additions & 0 deletions airflow/providers/openlineage/extractors/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ def _execute_extraction(self) -> OperatorLineage | None:
source=source_code,
)
}
else:
self.log.debug(
"OpenLineage disable_source_code option is on - no source code is extracted.",
)

return OperatorLineage(
job_facets=job_facet,
# The PythonOperator is recorded as an "unknownSource" even though we have an extractor,
Expand Down
17 changes: 16 additions & 1 deletion airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,16 @@ def get_or_create_openlineage_client(self) -> OpenLineageClient:
if not self._client:
config = self.get_openlineage_config()
if config:
self.log.debug(
"OpenLineage configuration found. Transport type: `%s`",
config.get("type", "no type provided"),
)
self._client = OpenLineageClient.from_dict(config=config)
else:
self.log.debug(
"OpenLineage configuration not found directly in Airflow. "
"Looking for legacy environment configuration. "
)
self._client = OpenLineageClient.from_environment()
return self._client

Expand All @@ -85,13 +93,19 @@ def get_openlineage_config(self) -> dict | None:
config = self._read_yaml_config(openlineage_config_path)
if config:
return config.get("transport", None)
self.log.debug("OpenLineage config file is empty: `%s`", openlineage_config_path)
else:
self.log.debug("OpenLineage config_path configuration not found.")

# Second, try to get transport config
transport_config = conf.transport()
if not transport_config:
self.log.debug("OpenLineage transport configuration not found.")
return None
return transport_config

def _read_yaml_config(self, path: str) -> dict | None:
@staticmethod
def _read_yaml_config(path: str) -> dict | None:
with open(path) as config_file:
return yaml.safe_load(config_file)

Expand Down Expand Up @@ -125,6 +139,7 @@ def emit(self, event: RunEvent):
stack.enter_context(Stats.timer(f"ol.emit.attempts.{event_type}.{transport_type}"))
stack.enter_context(Stats.timer("ol.emit.attempts"))
self._client.emit(redacted_event)
self.log.debug("Successfully emitted OpenLineage event of id %s", event.run.runId)
except Exception as e:
Stats.incr("ol.emit.failed")
self.log.warning("Failed to emit OpenLineage event of id %s", event.run.runId)
Expand Down
59 changes: 53 additions & 6 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,19 @@ def on_task_instance_running(
dag = task.dag
if is_operator_disabled(task):
self.log.debug(
"Skipping OpenLineage event emission for operator %s "
"Skipping OpenLineage event emission for operator `%s` "
"due to its presence in [openlineage] disabled_for_operators.",
task.task_type,
)
return None
return

if not is_selective_lineage_enabled(task):
self.log.debug(
"Skipping OpenLineage event emission for task `%s` "
"due to lack of explicit lineage enablement for task or DAG while "
"[openlineage] selective_enable is on.",
task.task_id,
)
return

@print_warning(self.log)
Expand Down Expand Up @@ -157,15 +163,22 @@ def on_task_instance_success(self, previous_state, task_instance: TaskInstance,
if TYPE_CHECKING:
assert task
dag = task.dag

if is_operator_disabled(task):
self.log.debug(
"Skipping OpenLineage event emission for operator %s "
"Skipping OpenLineage event emission for operator `%s` "
"due to its presence in [openlineage] disabled_for_operators.",
task.task_type,
)
return None
return

if not is_selective_lineage_enabled(task):
self.log.debug(
"Skipping OpenLineage event emission for task `%s` "
"due to lack of explicit lineage enablement for task or DAG while "
"[openlineage] selective_enable is on.",
task.task_id,
)
return

@print_warning(self.log)
Expand Down Expand Up @@ -212,15 +225,22 @@ def on_task_instance_failed(self, previous_state, task_instance: TaskInstance, s
if TYPE_CHECKING:
assert task
dag = task.dag

if is_operator_disabled(task):
self.log.debug(
"Skipping OpenLineage event emission for operator %s "
"Skipping OpenLineage event emission for operator `%s` "
"due to its presence in [openlineage] disabled_for_operators.",
task.task_type,
)
return None
return

if not is_selective_lineage_enabled(task):
self.log.debug(
"Skipping OpenLineage event emission for task `%s` "
"due to lack of explicit lineage enablement for task or DAG while "
"[openlineage] selective_enable is on.",
task.task_id,
)
return

@print_warning(self.log)
Expand Down Expand Up @@ -277,7 +297,18 @@ def before_stopping(self, component):
@hookimpl
def on_dag_run_running(self, dag_run: DagRun, msg: str):
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
)
return

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_running`")
return

data_interval_start = dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None
data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None
self.executor.submit(
Expand All @@ -291,19 +322,35 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str):
@hookimpl
def on_dag_run_success(self, dag_run: DagRun, msg: str):
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
)
return

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_success`")
return

self.executor.submit(self.adapter.dag_success, dag_run=dag_run, msg=msg)

@hookimpl
def on_dag_run_failed(self, dag_run: DagRun, msg: str):
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
)
return

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_failed`")
return

self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)


Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/openlineage/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ config:
Disable the inclusion of source code in OpenLineage events by setting this to `true`.
By default, several Operators (e.g. Python, Bash) will include their source code in the events
unless disabled.
default: ~
default: "False"
example: ~
type: boolean
version_added: ~

0 comments on commit ec0786d

Please sign in to comment.