Skip to content

Commit

Permalink
fix: scheduler crashing with OL provider on airflow standalone (apach…
Browse files Browse the repository at this point in the history
…e#40353)

Signed-off-by: Kacper Muda <[email protected]>
  • Loading branch information
kacpermuda authored Jun 21, 2024
1 parent 9dc521e commit fbcee8d
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ def _get_try_number_success(val):
return val.try_number - 1


def _executor_initializer():
"""
Initialize worker processes for the executor used for DagRun listener.
This function must be picklable, so it cannot be defined as an inner method or local function.
Reconfigures the ORM engine to prevent issues that arise when multiple processes interact with
the Airflow database.
"""
settings.configure_orm()


class OpenLineageListener:
"""OpenLineage listener sends events on task instance and dag run starts, completes and failures."""

Expand Down Expand Up @@ -366,16 +378,10 @@ def _fork_execute(self, callable, callable_name: str):

@property
def executor(self) -> ProcessPoolExecutor:
# Executor for dag_run listener
def initializer():
# Re-configure the ORM engine as there are issues with multiple processes
# if process calls Airflow DB.
settings.configure_orm()

if not self._executor:
self._executor = ProcessPoolExecutor(
max_workers=conf.dag_state_change_process_pool_size(),
initializer=initializer,
initializer=_executor_initializer(),
)
return self._executor

Expand Down

0 comments on commit fbcee8d

Please sign in to comment.