[AIRFLOW-5931] Use os.fork when appropriate to speed up task execution. #6627

Merged: 3 commits, Dec 10, 2019
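In short: when `os.fork` is available and `run_as_user` is not set, the task runner now forks the already-initialized parent process instead of exec'ing a fresh Python interpreter that has to re-import Airflow for every task. A rough, hypothetical way to see why that saves time per task (this timing snippet is not part of the PR and assumes a POSIX system with Airflow importable):

```python
import os
import subprocess
import sys
import time

# Roughly what the exec-based runner pays per task: a brand-new interpreter
# that has to import airflow (settings, plugins, providers) from scratch.
start = time.monotonic()
subprocess.run([sys.executable, "-c", "import airflow"], check=True)
print("fresh interpreter + import airflow:", time.monotonic() - start)

# Roughly what the fork-based runner pays per task: the child inherits the
# parent's already-imported modules for free.
start = time.monotonic()
pid = os.fork()
if pid == 0:
    os._exit(0)          # child: nothing to import, exit immediately
os.waitpid(pid, 0)       # parent: reap the child
print("fork of a warm parent:", time.monotonic() - start)
```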
5 changes: 5 additions & 0 deletions airflow/cli/commands/task_command.py
@@ -225,6 +225,11 @@ def task_test(args, dag=None):
debugger.post_mortem()
else:
raise
finally:
if not already_has_stream_handler:
# Make sure to reset back to normal. When run from the CLI this doesn't
# matter, but it does for the test suite
logging.getLogger('airflow.task').propagate = False


@cli_utils.action_logging
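For context, the `propagate` flag that the new `finally:` block restores behaves roughly like this (a minimal sketch, not Airflow code; only the logger name and the `already_has_stream_handler` flag come from the diff, the rest is assumed):

```python
import logging

task_logger = logging.getLogger('airflow.task')

# Temporarily let task log records bubble up to the root logger so a console
# handler attached there can show them, then restore the default afterwards.
already_has_stream_handler = any(
    isinstance(handler, logging.StreamHandler) for handler in task_logger.handlers
)
if not already_has_stream_handler:
    task_logger.propagate = True
try:
    task_logger.info("running the task...")
finally:
    if not already_has_stream_handler:
        # Reset back to normal. This mostly matters for a test suite that
        # reuses the same logging configuration across many tests.
        task_logger.propagate = False
```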
80 changes: 75 additions & 5 deletions airflow/task/task_runner/standard_task_runner.py
@@ -17,25 +17,95 @@
# specific language governing permissions and limitations
# under the License.
"""Standard task runner"""

import os

import psutil
from setproctitle import setproctitle

from airflow.task.task_runner.base_task_runner import BaseTaskRunner
from airflow.utils.helpers import reap_process_group

CAN_FORK = hasattr(os, 'fork')


class StandardTaskRunner(BaseTaskRunner):
"""
Standard runner for all tasks.
"""
def __init__(self, local_task_job):
super().__init__(local_task_job)
self._rc = None

def start(self):
self.process = self.run_command()
if CAN_FORK and not self.run_as_user:
self.process = self._start_by_fork()
else:
self.process = self._start_by_exec()

def _start_by_exec(self):
subprocess = self.run_command()
return psutil.Process(subprocess.pid)

def _start_by_fork(self):
pid = os.fork()
if pid:
self.log.info("Started process %d to run task", pid)
return psutil.Process(pid)
else:
from airflow.bin.cli import get_parser
import signal
import airflow.settings as settings

signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
# Start a new process group
os.setpgid(0, 0)

# Force a new SQLAlchemy session. We can't share open DB handles
# between processes. The CLI code will re-create this as part of its
# normal startup
settings.engine.pool.dispose()
settings.engine.dispose()

parser = get_parser()
# [1:] - remove "airflow" from the start of the command
args = parser.parse_args(self._command[1:])

def return_code(self):
return self.process.poll()
proc_title = "airflow task runner: {0.dag_id} {0.task_id} {0.execution_date}"
if hasattr(args, "job_id"):
proc_title += " {0.job_id}"
setproctitle(proc_title.format(args))

try:
args.func(args)
os._exit(0)
except Exception:
os._exit(1)

def return_code(self, timeout=0):
# We call this multiple times, but we can only wait on the process once
if self._rc is not None or not self.process:
return self._rc

try:
self._rc = self.process.wait(timeout=timeout)
self.process = None
except psutil.TimeoutExpired:
pass

return self._rc

def terminate(self):
if self.process and psutil.pid_exists(self.process.pid):
reap_process_group(self.process.pid, self.log)
if self.process is None:
return

if self.process.is_running():
rcs = reap_process_group(self.process.pid, self.log)
self._rc = rcs.get(self.process.pid)

self.process = None

if self._rc is None:
# Something else reaped it before we had a chance, so let's just "guess" at an error code.
self._rc = -9
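A stripped-down sketch of the pattern `_start_by_fork` uses: the parent keeps a `psutil.Process` handle it can wait on or reap later, while the child resets signal handlers, detaches into its own process group, and leaves via `os._exit` so no parent-side cleanup runs twice. Illustrative only (POSIX-only, and `run_task` stands in for the real CLI entry point):

```python
import os
import signal

import psutil


def start_by_fork(run_task):
    """Fork, run `run_task()` in the child, and return a psutil handle in the parent."""
    pid = os.fork()
    if pid:
        # Parent: track the child through psutil so a later return_code()/
        # terminate() can wait on it or reap its whole process group.
        return psutil.Process(pid)

    # Child: restore default signal handling and become leader of a new
    # process group so the parent can clean up everything the task spawns.
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.setpgid(0, 0)

    try:
        run_task()
        os._exit(0)
    except Exception:
        os._exit(1)


if __name__ == "__main__":
    proc = start_by_fork(lambda: print("hello from the forked task"))
    print("child exited with", proc.wait(timeout=5))
```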
3 changes: 3 additions & 0 deletions airflow/utils/dag_processing.py
@@ -612,6 +612,9 @@ def start(self):
user code.
"""

# Start a new process group
os.setpgid(0, 0)

self.log.info("Processing files using up to %s processes at a time ", self._parallelism)
self.log.info("Process each file at most once every %s seconds", self._file_process_interval)
self.log.info(
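The added `os.setpgid(0, 0)` means the DAG-parsing manager (like each forked task runner above) leads its own process group, so one `killpg` call can reach everything it spawns. A tiny illustration of the mechanism, not Airflow code (the `sleep` grandchild is just an example):

```python
import os
import signal
import time

pid = os.fork()
if pid == 0:
    os.setpgid(0, 0)        # child: become leader of a brand-new process group
    os.system("sleep 30")   # spawn a grandchild (a shell plus sleep) inside that group
    os._exit(0)

os.setpgid(pid, pid)             # parent sets it too, to avoid a startup race
time.sleep(1)                    # give the grandchild a moment to appear
os.killpg(pid, signal.SIGTERM)   # one call signals child and grandchild alike
os.waitpid(pid, 0)               # reap the child
```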
77 changes: 44 additions & 33 deletions airflow/utils/helpers.py
@@ -262,71 +262,82 @@ def f(t):
return s


def reap_process_group(pid, log, sig=signal.SIGTERM,
def reap_process_group(pgid, log, sig=signal.SIGTERM,
timeout=DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM):
"""
Tries really hard to terminate all children (including grandchildren). Will send
Tries really hard to terminate all processes in the group (including grandchildren). Will send
sig (SIGTERM) to the process group identified by pgid. If any process is alive after timeout
a SIGKILL will be sent.

:param log: log handler
:param pid: pid to kill
:param pgid: process group id to kill
:param sig: signal type
:param timeout: how much time a process has to terminate
"""

returncodes = {}

def on_terminate(p):
log.info("Process %s (%s) terminated with exit code %s", p, p.pid, p.returncode)
returncodes[p.pid] = p.returncode

if pid == os.getpid():
raise RuntimeError("I refuse to kill myself")

try:
parent = psutil.Process(pid)
except psutil.NoSuchProcess:
# Race condition - the process already exited
return
def signal_procs(sig):
try:
os.killpg(pgid, sig)
except OSError as err:
# If an "operation not permitted" error is raised (e.g. due to run_as_user),
# use sudo -n (--non-interactive) to kill the processes instead
if err.errno == errno.EPERM:
subprocess.check_call(
["sudo", "-n", "kill", "-" + str(sig)] + map(children, lambda p: str(p.pid))
)
else:
raise

children = parent.children(recursive=True)
children.append(parent)
if pgid == os.getpgid(0):
raise RuntimeError("I refuse to kill myself")

try:
pg = os.getpgid(pid)
except OSError as err:
# Skip if not such process - we experience a race and it just terminated
if err.errno == errno.ESRCH:
return
raise
parent = psutil.Process(pgid)

log.info("Sending %s to GPID %s", sig, pg)
children = parent.children(recursive=True)
children.append(parent)
except psutil.NoSuchProcess:
# The process already exited, but maybe its children haven't.
children = []
for p in psutil.process_iter():
try:
if os.getpgid(p.pid) == pgid and p.pid != 0:
children.append(p)
except OSError:
pass

log.info("Sending %s to GPID %s", sig, pgid)
try:
os.killpg(os.getpgid(pid), sig)
signal_procs(sig)
except OSError as err:
# No such process, which means there is no such process group - our job
# is done
if err.errno == errno.ESRCH:
return
# If operation not permitted error is thrown due to run_as_user,
# use sudo -n(--non-interactive) to kill the process
if err.errno == errno.EPERM:
subprocess.check_call(["sudo", "-n", "kill", "-" + str(sig), str(os.getpgid(pid))])
raise
return returncodes

_, alive = psutil.wait_procs(children, timeout=timeout, callback=on_terminate)

if alive:
for p in alive:
log.warning("process %s (%s) did not respond to SIGTERM. Trying SIGKILL", p, pid)
log.warning("process %s did not respond to SIGTERM. Trying SIGKILL", p)

try:
os.killpg(os.getpgid(pid), signal.SIGKILL)
signal_procs(signal.SIGKILL)
except OSError as err:
if err.errno == errno.ESRCH:
return
raise
if err.errno != errno.ESRCH:
raise

gone, alive = psutil.wait_procs(alive, timeout=timeout, callback=on_terminate)
_, alive = psutil.wait_procs(alive, timeout=timeout, callback=on_terminate)
if alive:
for p in alive:
log.error("Process %s (%s) could not be killed. Giving up.", p, p.pid)
return returncodes


def parse_template_string(template_string):
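A hedged usage sketch for the reworked helper: it now takes a process group id rather than a single pid and returns the exit codes it collected. The `sleep 60` worker below is a stand-in, not something from the PR:

```python
import logging
import os
import subprocess

from airflow.utils.helpers import reap_process_group

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

# Start a worker in its own session (and therefore its own process group),
# so reaping that group cannot touch the current process.
proc = subprocess.Popen(["sleep", "60"], preexec_fn=os.setsid)

# After setsid the worker leads its own group, so its pgid equals its pid.
return_codes = reap_process_group(proc.pid, log)
log.info("collected return codes: %s", return_codes)
```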
38 changes: 16 additions & 22 deletions airflow/utils/log/es_task_handler.py
@@ -78,6 +78,7 @@ def __init__(self, base_log_folder, filename_template,
self.json_format = json_format
self.json_fields = [label.strip() for label in json_fields.split(",")]
self.handler = None
self.context_set = False

def _render_log_id(self, ti, try_number):
if self.log_id_jinja_template:
@@ -200,35 +201,28 @@ def set_context(self, ti):

:param ti: task instance object
"""
super().set_context(ti)
self.mark_end_on_close = not ti.raw

if self.json_format:
self.formatter = JSONFormatter(self.formatter._fmt, json_fields=self.json_fields, extras={
'dag_id': str(ti.dag_id),
'task_id': str(ti.task_id),
'execution_date': self._clean_execution_date(ti.execution_date),
'try_number': str(ti.try_number)
})

if self.write_stdout:
if self.context_set:
# We don't want to re-set up the handler if this logger has
# already been initialized
return

self.handler = logging.StreamHandler(stream=sys.__stdout__)
self.handler.setLevel(self.level)
if self.json_format and not ti.raw:
self.handler.setFormatter(
JSONFormatter(self.formatter._fmt, json_fields=self.json_fields, extras={
'dag_id': str(ti.dag_id),
'task_id': str(ti.task_id),
'execution_date': self._clean_execution_date(ti.execution_date),
'try_number': str(ti.try_number)}))
else:
self.handler.setFormatter(self.formatter)
self.handler.setFormatter(self.formatter)
else:
super().set_context(ti)

def emit(self, record):
if self.write_stdout:
self.formatter.format(record)
if self.handler is not None:
self.handler.emit(record)
else:
super().emit(record)

def flush(self):
if self.handler is not None:
self.handler.flush()
self.context_set = True

def close(self):
# When application exit, system shuts down all handlers by
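The new `context_set` flag makes `set_context` idempotent in stdout mode, which matters now that a forked task child inherits an already-configured logger from its parent. A minimal sketch of just that guard, with an illustrative class name rather than the real Elasticsearch handler:

```python
import logging
import sys


class StdoutGuardedHandler(logging.Handler):
    """Illustrative only: shows the idempotent set_context guard, not the real ES handler."""

    def __init__(self):
        super().__init__()
        self.handler = None
        self.context_set = False

    def set_context(self, ti):
        if self.context_set:
            # Already wired up for this process; calling set_context again
            # must not stack a second stream handler and duplicate lines.
            return
        self.handler = logging.StreamHandler(stream=sys.__stdout__)
        self.handler.setLevel(self.level)
        self.handler.setFormatter(self.formatter)
        self.context_set = True

    def emit(self, record):
        if self.handler is not None:
            self.handler.emit(record)
```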
13 changes: 6 additions & 7 deletions tests/cli/commands/test_task_command.py
@@ -212,13 +212,12 @@ def test_local_run(self):
dag = get_dag(args)
reset(dag.dag_id)

with mock.patch('argparse.Namespace', args) as mock_args:
task_command.task_run(mock_args)
task = dag.get_task(task_id=args.task_id)
ti = TaskInstance(task, args.execution_date)
ti.refresh_from_db()
state = ti.current_state()
self.assertEqual(state, State.SUCCESS)
task_command.task_run(args)
task = dag.get_task(task_id=args.task_id)
ti = TaskInstance(task, args.execution_date)
ti.refresh_from_db()
state = ti.current_state()
self.assertEqual(state, State.SUCCESS)


class TestCliTaskBackfill(unittest.TestCase):
5 changes: 5 additions & 0 deletions tests/dags/test_on_kill.py
@@ -25,6 +25,11 @@

class DummyWithOnKill(DummyOperator):
def execute(self, context):
import os
Reviewer comment (Member): @nuclearpinguin Did you fix these issues in another PR?

Author reply: This wasn't an issue; I was just making doubly sure that the tests created more processes.

# This runs extra processes, so that we can be sure that we correctly
# tidy up all processes launched by a task when killing
if not os.fork():
os.system('sleep 10')
time.sleep(10)

def on_kill(self):
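Since the DAG above deliberately forks an extra `sleep 10` so the kill path has a grandchild to clean up, a test could assert that nothing survives after termination. A hypothetical check along those lines (the helper name and assertion are assumptions, not part of the PR):

```python
import psutil


def assert_no_lingering_sleepers():
    """Illustrative only: verify no 'sleep 10' spawned by the DAG outlived the kill."""
    leftovers = [
        p for p in psutil.process_iter(attrs=["cmdline"])
        if p.info["cmdline"] == ["sleep", "10"]
    ]
    assert not leftovers, "processes escaped the process-group reaper: %s" % leftovers
```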