Skip to content

Commit

Permalink
Ensure __exit__ is called in decorator context managers (apache#38383)
Browse files Browse the repository at this point in the history
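In apache#36800 the author fixed a zombie-scheduler issue that arose because the context managers' cleanup code was not run on exit, so the subprocess was never terminated.  That fix explicitly called `close` on the `ExitStack` managing those context managers.  A simpler, cleaner, and more standard solution is to fix the underlying context managers themselves by wrapping the `yield` in a `try` / `finally`: with `@contextmanager`, an exception raised in the `with` body is re-raised at the `yield`, so cleanup code placed after a bare `yield` is skipped unless it sits in a `finally` block.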
  • Loading branch information
dstandish authored and utkarsharma2 committed Apr 22, 2024
1 parent 66b9c38 commit 9ed5d86
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 28 deletions.
8 changes: 5 additions & 3 deletions airflow/cli/commands/celery_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,11 @@ def _serve_logs(skip_serve_logs: bool = False):
if skip_serve_logs is False:
sub_proc = Process(target=serve_logs)
sub_proc.start()
yield
if sub_proc:
sub_proc.terminate()
try:
yield
finally:
if sub_proc:
sub_proc.terminate()


@after_setup_logger.connect()
Expand Down
32 changes: 13 additions & 19 deletions airflow/cli/commands/scheduler_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import logging
from argparse import Namespace
from contextlib import ExitStack, contextmanager
from contextlib import contextmanager
from multiprocessing import Process

from airflow import settings
Expand All @@ -45,18 +45,8 @@ def _run_scheduler_job(args) -> None:
ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor)
InternalApiConfig.force_database_direct_access()
enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
with ExitStack() as stack:
stack.enter_context(_serve_logs(args.skip_serve_logs))
stack.enter_context(_serve_health_check(enable_health_check))

try:
run_job(job=job_runner.job, execute_callable=job_runner._execute)
except Exception:
log.exception("Exception when running scheduler job")
raise
finally:
# Ensure that the contexts are closed
stack.close()
with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check):
run_job(job=job_runner.job, execute_callable=job_runner._execute)


@cli_utils.action_cli
Expand Down Expand Up @@ -84,9 +74,11 @@ def _serve_logs(skip_serve_logs: bool = False):
if skip_serve_logs is False:
sub_proc = Process(target=serve_logs)
sub_proc.start()
yield
if sub_proc:
sub_proc.terminate()
try:
yield
finally:
if sub_proc:
sub_proc.terminate()


@contextmanager
Expand All @@ -96,6 +88,8 @@ def _serve_health_check(enable_health_check: bool = False):
if enable_health_check:
sub_proc = Process(target=serve_health_check)
sub_proc.start()
yield
if sub_proc:
sub_proc.terminate()
try:
yield
finally:
if sub_proc:
sub_proc.terminate()
8 changes: 5 additions & 3 deletions airflow/providers/celery/cli/celery_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ def _serve_logs(skip_serve_logs: bool = False):
if skip_serve_logs is False:
sub_proc = Process(target=serve_logs)
sub_proc.start()
yield
if sub_proc:
sub_proc.terminate()
try:
yield
finally:
if sub_proc:
sub_proc.terminate()


@after_setup_logger.connect()
Expand Down
3 changes: 0 additions & 3 deletions tests/cli/commands/test_scheduler_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,8 @@ def test_scheduler_health_host(
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
@mock.patch("airflow.cli.commands.scheduler_command.run_job", side_effect=Exception("run_job failed"))
@mock.patch("airflow.cli.commands.scheduler_command.log")
def test_run_job_exception_handling(
self,
mock_log,
mock_run_job,
mock_process,
mock_scheduler_job,
Expand All @@ -183,7 +181,6 @@ def test_run_job_exception_handling(
job=mock_scheduler_job().job,
execute_callable=mock_scheduler_job()._execute,
)
mock_log.exception.assert_called_once_with("Exception when running scheduler job")
mock_process.assert_called_once_with(target=serve_logs)
mock_process().terminate.assert_called_once_with()

Expand Down

0 comments on commit 9ed5d86

Please sign in to comment.