[AIRFLOW-5931] Use os.fork when appropriate to speed up task execution. #6627
Conversation
I have tested this locally and it seems to work fine. @ashb what are the situations where CAN_FORK is false, besides when doing run_as_user?
My benchmarking results of this change are below.

Without change, 40 dag runs:
With change, 40 dag runs:

And the effect on time to complete 10 DAG runs:
Time for 10 dag runs with 40 tasks each: 803.298s (±8.5607s), a x1.69 change
Time for 10 dag runs with 12 tasks each: 240.400s (±3.0390s), a x1.7 change
Windows mostly :) Just being defensive.
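The guard being discussed can be sketched as a simple feature check: os.fork does not exist on Windows, so a runner has to fall back to exec'ing a fresh interpreter there, and also when run_as_user requires impersonation. A minimal sketch (choose_start_strategy is an illustrative name, not Airflow's actual API; the CAN_FORK check itself matches the PR):

```python
import os

# os.fork is absent on Windows, so feature-detect rather than sniff the OS.
CAN_FORK = hasattr(os, "fork")

def choose_start_strategy(run_as_user=None):
    """Pick which start path a runner like this would take."""
    if CAN_FORK and not run_as_user:
        return "fork"   # reuse the already-loaded interpreter state
    return "exec"       # fresh interpreter: Windows, or impersonation
```

On any POSIX system the first branch is taken unless impersonation is requested.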
if self.process and psutil.pid_exists(self.process.pid):
    reap_process_group(self.process.pid, self.log)
if self.process:
    if self.process.is_running():
can you break this into a function?
I was thinking about this just this week. When I worked on the CLI, I saw this problem and it was on my list. I think this would be better done using multiprocessing.Process. Is there a reason why you did it this way?
@mik-laj it's because multiprocessing.Process has to re-parse all dependencies/DAGs, which causes a lot of slowdown.
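The slowdown @dimberman describes can be illustrated with start methods: a fork child inherits the parent's already-built in-memory state, while a spawn child re-imports everything from scratch. A minimal sketch, with a dict standing in for expensive parsed-DAG state (names are illustrative):

```python
import multiprocessing

# Stand-in for expensive state such as already-parsed DAGs.
STATE = {"dags_parsed": False}

def report(queue):
    # The child reports whether it inherited the parent's in-memory state.
    queue.put(STATE["dags_parsed"])

def child_sees_state(start_method):
    ctx = multiprocessing.get_context(start_method)
    queue = ctx.Queue()
    STATE["dags_parsed"] = True  # "parsing" happens once, in the parent
    proc = ctx.Process(target=report, args=(queue,))
    proc.start()
    inherited = queue.get()
    proc.join()
    return inherited
```

With the "fork" context the child sees True for free; with "spawn" the module is re-imported fresh and the work has to be redone, which is where the lost time goes.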
Can you explain the security of running the tasks and the different processes involved? Afaik it does Executor -> Task -> Rawtask. So with your change it would now do Executor -> Task -> Rawtask -> New Process? I.e. it hasn't become part of the executor, I assume (that would be a no-go). Just verifying.
    reap_process_group(self.process.pid, self.log)
if self.process:
    if self.process.is_running():
        reap_process_group(self.process.pid, self.log)
NIT: process groups are nondeterministic. We would be better off using cgroups.
This is just so that os.killpg behaves itself, not for anything more than that.
We've already got a Cgroup task runner (not that I know what state it is in!)
Errr, this is not on the line I thought it was. Yeah, pgroups might not be perfect, but this isn't a change to the existing behaviour.
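The process-group point can be sketched like this (POSIX only; a toy stand-in for the runner, not the PR's actual code): the forked child makes itself a process-group leader, so the parent can later signal the whole group at once with os.killpg.

```python
import os
import signal
import time

def run_in_new_pgroup():
    """Fork a child that leads its own process group."""
    pid = os.fork()
    if pid == 0:
        os.setpgid(0, 0)       # child: start a new process group
        time.sleep(30)         # stand-in for the real task work
        os._exit(0)
    os.setpgid(pid, pid)       # parent sets it too, avoiding a race
    return pid

pid = run_in_new_pgroup()
os.killpg(os.getpgid(pid), signal.SIGTERM)   # signal the whole group
_, status = os.waitpid(pid, 0)
```

Setting the pgid from both parent and child is the standard way to avoid signalling before the child has had a chance to leave the parent's group.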
But @dimberman -> multiprocessing also uses os.fork() underneath in fork mode (the default on Linux). I have my reservations about using multiprocessing, but mostly because people do not realise that it actually uses fork (and we plan to use it anyway, so no difference). Using multiprocessing might be a more portable way if we consider running it in different environments. Note that in Python 3.8 the default mode for a new process on MacOS is spawn, because forking on MacOS might cause crashes: threads are not safe for forking, and some system libraries on MacOS run threads. So using multiprocessing.Process will be slower on MacOS in 3.8, but won't crash.
Aside from this performance enhancement, I think a much bigger gain can be gotten from removing the raw task execution. I.e. the process is now as follows: Executor -> Task -> Rawtask, which doesn't make sense. It should just be Executor -> Task. Ideally the task would then signal its state to the executor rather than setting its own state, which is both nondeterministic (in case of a crash of the task) and insecure (it requires airflow DB access by the task, which is available to all operators).
BTW I agree with @potiuk that it's a bit strange that we get this speed-up as both are using fork(). What is the trade-off?
@bolkedebruin having the tasks access the DB is a central part of the k8s executor, unless we want to set up some sort of messaging system/message queue.
@bolkedebruin the understanding I have is that when you spawn a totally new process, you are reinitializing the interpreter, re-loading all dependencies, and restarting airflow. Using os.fork directly allows you to keep the same memory state (at least that's what I understood from @ashb).
@dimberman @bolkedebruin: just to clarify ->
Currently the change manually checks if we "can fork". When using multiprocessing.Process, the check is done by the library: it uses "fork" by default on Linux (and on MacOS for Python < 3.8), but "spawn" on Windows and on MacOS for Python >= 3.8. So maybe we can simplify the code a bit here.
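For reference, the stdlib exposes these choices directly; a quick check of what the current platform offers and defaults to (output differs per OS and Python version):

```python
import multiprocessing

# All start methods this platform supports, e.g. Linux offers
# 'fork', 'spawn' and 'forkserver'; Windows offers only 'spawn'.
available = multiprocessing.get_all_start_methods()

# The platform default: 'fork' on Linux for the Python versions
# discussed in this thread; 'spawn' on Windows and on macOS from 3.8.
default = multiprocessing.get_start_method()

# Passing None asks for the default context, which is roughly what a
# config fallback such as the one sketched later in the thread would do.
ctx = multiprocessing.get_context(None)
```

This is what "the check is done by the library" means: get_context picks a sensible method per platform without the caller feature-detecting os.fork.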
@dimberman It's an inherently insecure design to have tasks directly access the DB, and it has been a pain point in Airflow for a long time. The executor is fine, but not tasks. There are many ways to do this, but what makes the k8s executor so special that executed tasks require access to the DB, apart from the current paradigm?
@potiuk Gotcha.
Not quite. The existing flow is: Executor -> exec to spawn new python to run Task "watcher" -> spawn new python to run actual Task. My PR changes this to: Executor -> exec to spawn new python to run Task "watcher" -> fork to run actual Task. The number of processes in use remains the same -- the only difference is how we create the processes, and whether we have to reload all of python and the airflow modules or not. I am happy that the same semantics and isolation are maintained. We could also look at merging the "watcher" into the executor -- the main thing the watcher does is set the task to failed if it errors, or kill it if the TaskInstance state is changed externally (i.e. the watcher is what is responsible for sending a term/kill signal to the task when you clear it in the UI). And yes, longer term I also want to stop the workers accessing the DB directly. (Once this is merged/working I plan to fix the Local and Celery executors to tackle the exec vs fork there too.) I'll try using multiprocessing to do this.
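The "fork to run actual Task" step reduces to this pattern (a simplified sketch, without the psutil wrapper, signal resets, and SQLAlchemy session disposal the real change performs):

```python
import os

def start_by_fork(task):
    """Fork; the parent keeps the child's pid, the child runs the
    task and exits via os._exit so it never returns into parent code."""
    pid = os.fork()
    if pid:
        return pid          # parent ("watcher") supervises this pid
    try:
        task()
        os._exit(0)         # skip atexit handlers, as the PR does
    except Exception:
        os._exit(1)

def return_code(pid):
    """Wait for the child and translate its wait status to an exit code."""
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status)
```

The watcher can then map exit code 0/1 onto the TaskInstance's success/failed states, which is exactly the supervision role described above.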
Sad panda. Interestingly the bug report seems to say it's been a problem since OSX 10.13, but I haven't noticed a problem on 10.14 with this code.
So from a quick look: multiprocessing.Process does not like it when something else reaps the process (which we do via our reap_process_group) -- it thinks it is still running and errors if I attempt to call it. So multiprocessing.Process is not just a drop-in replacement.
Codecov Report
@@ Coverage Diff @@
## master #6627 +/- ##
=========================================
Coverage ? 84.28%
=========================================
Files ? 672
Lines ? 38316
Branches ? 0
=========================================
Hits ? 32296
Misses ? 6020
Partials ? 0
Continue to review full report at Codecov.
Right, this should now pass its tests (I had problems with the logging causing an infinite loop; I decided the fix for that was to just re-configure the logging in the subprocess.) ✅ on my fork: https://travis-ci.com/ashb/airflow/builds/138072561
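"Re-configure the logging in the subprocess" can be sketched as follows (a hypothetical stand-in for what the PR does, not its exact code): the forked child drops the handlers it inherited, because those may write into the very stream the parent is capturing, which is one way to get the infinite loop mentioned above.

```python
import logging
import sys

def reconfigure_child_logging(level=logging.INFO):
    """Replace handlers inherited over fork with a fresh stderr handler."""
    root = logging.getLogger()
    for handler in list(root.handlers):
        root.removeHandler(handler)   # drop handlers inherited from the parent
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(logging.Formatter("%(levelname)s %(name)s %(message)s"))
    root.addHandler(handler)
    root.setLevel(level)
    return root
```

The child would call this immediately after fork, before running the task, so task log lines go to its own stderr rather than looping back through the parent's handler.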
This is what the change for getting it working with multiprocessing looks like (give or take; this diff is against a different version). Do we think it's worth doing like this? I will test the timing, but this should be as fast as os.fork overall (give or take).

diff --git airflow/task/task_runner/standard_task_runner.py airflow/task/task_runner/standard_task_runner.py
index 87b046377..9b0ef162a 100644
--- airflow/task/task_runner/standard_task_runner.py
+++ airflow/task/task_runner/standard_task_runner.py
@@ -19,11 +19,13 @@
import os
+import multiprocessing
import psutil
from setproctitle import setproctitle
+from airflow.configuration import conf
from airflow.task.task_runner.base_task_runner import BaseTaskRunner
-from airflow.utils.helpers import reap_process_group
+from airflow.utils.helpers import reap_process_group, DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM
CAN_FORK = hasattr(os, 'fork')
@@ -34,67 +36,106 @@ class StandardTaskRunner(BaseTaskRunner):
"""
def __init__(self, local_task_job):
super().__init__(local_task_job)
+ self.mp_context = multiprocessing.get_context(
+ conf.get('core', 'exector_multiprocess_start_method', fallback=None)
+ )
self._rc = None
+ self._mp_process = None
def start(self):
- if CAN_FORK and not self.run_as_user:
- self.process = self._start_by_fork()
- else:
+ if self.run_as_user:
self.process = self._start_by_exec()
+ else:
+ self.process = self._start_by_subprocess()
def _start_by_exec(self):
subprocess = self.run_command()
return psutil.Process(subprocess.pid)
- def _start_by_fork(self):
- pid = os.fork()
- if pid:
- self.log.info("Started process %d to run task", pid)
- return psutil.Process(pid)
- else:
- from airflow.bin.cli import get_parser
- import signal
- import airflow.settings as settings
-
- signal.signal(signal.SIGINT, signal.SIG_DFL)
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- # Start a new process group
- os.setpgid(0, 0)
-
- # Force a new SQLAlchemy session. We can't share open DB handles
- # between process. The cli code will re-create this as part of its
- # normal startup
- settings.engine.pool.dispose()
- settings.engine.dispose()
+ def _start_by_subprocess(self):
+ self._mp_process = self.mp_context.Process(
+ target=type(self)._run_in_subprocess,
+ args=(self._command,),
+ daemon=False,
+ )
+ self._mp_process.start()
+
+ self.log.info("Started process %d to run task", self._mp_process.pid)
+ # Return a psutil.process so that we have the same type when doing run_as_user or this
+ return psutil.Process(self._mp_process.pid)
+
+ @classmethod
+ def _run_in_subprocess(cls, command):
+ from airflow.bin.cli import get_parser
+ import signal
+ import airflow.settings as settings
+
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ # Start a new process group
+ os.setpgid(0, 0)
+
+ # Force a new SQLAlchemy session. We can't share open DB handles
+ # between process. The cli code will re-create this as part of its
+ # normal startup
+ settings.engine.pool.dispose()
+ settings.engine.dispose()
+
+ parser = get_parser()
+ args = parser.parse_args(command[1:])
+
+ proc_title = "airflow task runner: {0.dag_id} {0.task_id} {0.execution_date}"
+ if hasattr(args, "job_id"):
+ proc_title += " {0.job_id}"
+ setproctitle(proc_title.format(args))
- parser = get_parser()
- args = parser.parse_args(self._command[1:])
-
- proc_title = "airflow task runner: {0.dag_id} {0.task_id} {0.execution_date}"
- if hasattr(args, "job_id"):
- proc_title += " {0.job_id}"
- setproctitle(proc_title.format(args))
-
- try:
- args.func(args)
- os._exit(0)
- except Exception:
- os._exit(1)
+ try:
+ args.func(args)
+ os._exit(0)
+ except Exception:
+ os._exit(1)
def return_code(self, timeout=0):
# We call this multiple times, but we can only wait on the process once
- if self._rc is not None or not self.process:
+ if self._rc is not None:
return self._rc
- try:
- self._rc = self.process.wait(timeout=timeout)
- self.process = None
- except psutil.TimeoutExpired:
- pass
+ if self._mp_process:
+ self._mp_process.join(timeout)
+ self._rc = self._mp_process.exitcode
+
+ if self._rc is not None:
+ self._mp_process.close()
+ self._mp_process = None
+ self._process = None
+ elif self.process:
+ try:
+ self._rc = self.process.wait(timeout=timeout)
+ self.process = None
+ except psutil.TimeoutExpired:
+ pass
return self._rc
def terminate(self):
+ if self._mp_process:
+ pgid = os.getpgid(self._mp_process.pid)
+
+ self._mp_process.terminate()
+ self._rc = self._mp_process.exitcode
+ if self._rc is None:
+ self._mp_process.join(DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM)
+ self._mp_process.kill()
+ self._mp_process.join(0)
+ self._rc = self._mp_process.exitcode
+
+ self._mp_process.close()
+ self.process = None
+ self._mp_process = None
+
+ # Make sure we kill and reap any sub-processes that are left!
+ reap_process_group(pgid, self.log)
+
if self.process:
if self.process.is_running():
                rcs = reap_process_group(self.process.pid, self.log)
Looks Good.
Rather than running a fresh python interpreter which then has to re-load all of Airflow and its dependencies, we should use os.fork when it is available/suitable, which should speed up task running, especially for short-lived tasks. I've profiled this and it took the task duration (as measured by the `duration` column in the TI table) from an average of 14.063s down to just 0.932s!
Allow `reap_process_group` to kill processes even when the "group leader" has already exited.

Don't re-initialize JSON/stdout logging ElasticSearch inside forked processes. Most of the time we will run the "raw" task in a forked subprocess (the only time we don't is when we use impersonation) that will have the logging already configured. So if the EsTaskHandler has already been configured we don't want to "re"configure it -- otherwise it will disable JSON output for the actual task!
[AIRFLOW-5931] Use os.fork when appropriate to speed up task execution. (#6627) * Rather than running a fresh python interpreter which then has to re-load all of Airflow and its dependencies, we should use os.fork when it is available/suitable, which should speed up task running, especially for short-lived tasks. I've profiled this and it took the task duration (as measured by the `duration` column in the TI table) from an average of 14.063s down to just 0.932s! * Allow `reap_process_group` to kill processes even when the "group leader" has already exited. * Don't re-initialize JSON/stdout logging ElasticSearch inside forked processes. Most of the time we will run the "raw" task in a forked subprocess (the only time we don't is when we use impersonation) that will have the logging already configured. So if the EsTaskHandler has already been configured we don't want to "re"configure it -- otherwise it will disable JSON output for the actual task!
Hi guys, I hit some logging issues when upgrading to 1.10.7. Subprocess logs can't be found in the airflow webserver nor in local task logs anymore. After digging around, I found this PR is related. The old way of spawning a process has a daemon thread that handles the log streams. After I changed the code to always create a subprocess without fork, the logs came right back. Can someone take a look? Thanks in advance. Jira ticket: https://issues.apache.org/jira/browse/AIRFLOW-7030
@pkqs90 -> maybe you can add a PR fixing it?
@pkqs90 We experienced a similar issue after upgrading from 1.10.3 to 1.10.9: no more task logging visible in the UI. In our case it was specifically for the KubernetesPodOperator, but not for the BashOperator, for instance. We fixed it by adding a logger for the package. My guess is that the combination of forking instead of starting a whole new process + having no specific logger for the package triggered this particular issue. Hope it solves it for you too.
@ewjmulder could you please elaborate a bit more in regards to what did you actually add to the log_config file? |
@davido912 See further discussion in #8484
If anyone has problems with this change, they can use this code to disable it.

MacOS / BSD:

    sed -i '' "s/CAN_FORK = /CAN_FORK = False # /" $(python -c "from airflow.task.task_runner import standard_task_runner; print(standard_task_runner.__file__)")

Debian:

    sed -i "s/CAN_FORK = /CAN_FORK = False # /" $(python -c "from airflow.task.task_runner import standard_task_runner; print(standard_task_runner.__file__)")
Make sure you have checked all steps below.

Jira

Description

Rather than running a fresh python interpreter which then has to re-load all of Airflow and its dependencies, we should use os.fork when it is available/suitable, which should speed up task running, especially for short-lived tasks.

I've profiled this and it took the task duration (as measured by the `duration` column in the TI table) from an average of 14.063s down to just 0.932s!

I could make this change deeper and bypass the `CLIFactory`/go directly to `_run_raw_task`, but this makes the change the minimum needed to work.

Tests

Other tests I need to perform: check that `os._exit` is right (this doesn't run atexit callbacks), so I need to check if logging in the subprocess is tidied up properly.

Commits

Documentation