Skip to content

Commit

Permalink
Don't dispose SQLAlchemy engine when using internal API (apache#38562)
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish authored and utkarsharma2 committed Apr 22, 2024
1 parent 2352bc8 commit 965b681
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions airflow/providers/celery/executors/celery_executor_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from sqlalchemy import select

import airflow.settings as settings
from airflow.api_internal.internal_api_call import InternalApiConfig
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowTaskTimeout
from airflow.executors.base_executor import BaseExecutor
Expand Down Expand Up @@ -155,8 +156,9 @@ def _execute_in_fork(command_to_exec: CommandType, celery_task_id: str | None =
try:
from airflow.cli.cli_parser import get_parser

settings.engine.pool.dispose()
settings.engine.dispose()
if not InternalApiConfig.get_use_internal_api():
settings.engine.pool.dispose()
settings.engine.dispose()

parser = get_parser()
# [1:] - remove "airflow" from the start of the command
Expand All @@ -166,6 +168,7 @@ def _execute_in_fork(command_to_exec: CommandType, celery_task_id: str | None =
args.external_executor_id = celery_task_id

setproctitle(f"airflow task supervisor: {command_to_exec}")
log.debug("calling func '%s' with args %s", args.func.__name__, args)
args.func(args)
ret = 0
except Exception:
Expand Down

0 comments on commit 965b681

Please sign in to comment.