bpo-33613, test_semaphore_tracker_sigint: fix race condition #7850

Merged
merged 21 commits into from
Sep 4, 2018
Changes from 3 commits
5f1df08
bpo-33613, test_semaphore_tracker_sigint: fix race condition
pablogsal Jun 20, 2018
4f9babb
Implement PONG command in the semaphore tracker
pablogsal Jun 24, 2018
26d81a3
Ensure the tracker is killed to allow multiple testing runs in the sa…
pablogsal Jun 24, 2018
a7d5c59
fixup! Ensure the tracker is killed to allow multiple testing runs in…
pablogsal Jul 1, 2018
a735d49
fixup! fixup! Ensure the tracker is killed to allow multiple testing …
pablogsal Jul 7, 2018
9460da4
fixup! fixup! fixup! Ensure the tracker is killed to allow multiple t…
pablogsal Jul 24, 2018
fb136d0
Use a buffered writer to answer to the PING command
pablogsal Jul 24, 2018
f3d0f0b
Use the tracker own stderr for answering the PING command
pablogsal Jul 24, 2018
f5e5cf9
Use pthread_sigmask to avoid the race condition
pablogsal Jul 27, 2018
4393021
Add comment explaining the race condition
pablogsal Jul 27, 2018
d501f2f
Restore timeout and use os.waitpid
pablogsal Jul 31, 2018
9e0243e
fixup! Restore timeout and use os.waitpid
pablogsal Jul 31, 2018
26e077b
Add News entry
pablogsal Jul 31, 2018
a50912b
fixup! fixup! Restore timeout and use os.waitpid
pablogsal Jul 31, 2018
bcbb942
fixup! Add News entry
pablogsal Jul 31, 2018
0b46f07
fixup! fixup! fixup! Restore timeout and use os.waitpid
pablogsal Aug 1, 2018
66ef7dc
Unregister sigmask if process fails to start
pablogsal Aug 3, 2018
f7d2d4d
Include the signal registering in the try block
pablogsal Sep 3, 2018
4a09fde
Simplify comment when handling ChildProcessError
pablogsal Sep 3, 2018
c9f96c1
Eliminate hardcoded reference to the bug tracker url
pablogsal Sep 3, 2018
c0a088c
Remove extraneous NEWS file
pitrou Sep 4, 2018
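
Commit f5e5cf9 above is titled "Use pthread_sigmask to avoid the race condition", but its diff is not shown on this page. The following is only a sketch of the general technique, not the code from that commit: the parent blocks SIGINT while spawning, the child inherits the blocked mask, and the child unblocks the signal only after it has installed SIG_IGN. The names CHILD_SOURCE and spawn_with_sigint_blocked are hypothetical, and the sketch assumes a POSIX platform where signal.pthread_sigmask is available.

```python
import signal
import subprocess
import sys

# Hypothetical child program: ignore SIGINT first, then unblock it.
CHILD_SOURCE = """
import signal, sys, time
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGINT})
sys.stdout.write("ready\\n")
sys.stdout.flush()
time.sleep(30)
"""


def spawn_with_sigint_blocked():
    """Start the child while SIGINT is blocked so that it inherits the mask."""
    old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
    try:
        return subprocess.Popen(
            [sys.executable, "-c", CHILD_SOURCE],
            stdout=subprocess.PIPE,
        )
    finally:
        # Restore the parent's own mask whether or not the spawn succeeded.
        signal.pthread_sigmask(signal.SIG_SETMASK, old_mask)
```

With this arrangement, a SIGINT sent before the child has installed its handler stays pending instead of killing the child, and is discarded once the child sets SIG_IGN.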
8 changes: 5 additions & 3 deletions Lib/multiprocessing/semaphore_tracker.py
@@ -60,14 +60,14 @@ def ensure_running(self):
fds_to_pass.append(sys.stderr.fileno())
except Exception:
pass
cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)'
cmd = 'from multiprocessing.semaphore_tracker import main;main({}, {})'
r, w = os.pipe()
try:
fds_to_pass.append(r)
# process will out live us, so no need to wait on pid
exe = spawn.get_executable()
args = [exe] + util._args_from_interpreter_flags()
args += ['-c', cmd % r]
args += ['-c', cmd.format(r, sys.stderr.fileno())]
Member

This won't work if sys.stderr isn't an actual file:

>>> io.StringIO().fileno()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
io.UnsupportedOperation: fileno
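
One way to guard against this, sketched here with a hypothetical helper named fileno_or_none (not part of the PR), is to look the descriptor up once and fall back to None when the stream has none. io.UnsupportedOperation is a subclass of both OSError and ValueError, so the except clause below covers it.

```python
import io
import sys


def fileno_or_none(stream):
    """Return stream.fileno(), or None if the stream has no descriptor."""
    try:
        return stream.fileno()
    except (AttributeError, OSError, ValueError):
        # AttributeError: stream is None or lacks fileno()
        # OSError/ValueError: io.UnsupportedOperation or a closed file
        return None


# Only pass the descriptor on when one actually exists.
fds_to_pass = []
stderr_fd = fileno_or_none(sys.stderr)
if stderr_fd is not None:
    fds_to_pass.append(stderr_fd)
```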

Member Author

@pablogsal pablogsal Jul 24, 2018

I was relying on the fact that the tracker seems to work under the assumption that sys.stderr has an associated file descriptor:

https://github.com/python/cpython/blob/master/Lib/multiprocessing/semaphore_tracker.py#L60

Relevant lines:

            fds_to_pass = []
            try:
                fds_to_pass.append(sys.stderr.fileno())
            except Exception:
                pass

Member Author

@pitrou Am I missing something?

Member

Well, the except Exception should be clear, no? :-)

Member Author

Oops, my bad. :)

Let me investigate what options we have. Do you have a preferred approach for handling this?

pid = util.spawnv_passfds(exe, args, fds_to_pass)
except:
os.close(w)
@@ -105,7 +105,7 @@ def _send(self, cmd, name):
getfd = _semaphore_tracker.getfd


def main(fd):
def main(fd, fd_write):
'''Run semaphore tracker.'''
# protect the process from ^C and "killall python" etc
signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -128,6 +128,8 @@ def main(fd):
cache.add(name)
elif cmd == b'UNREGISTER':
cache.remove(name)
elif cmd == b'PING':
os.write(fd_write, b"PONG\n")
Member

os.write may not write all bytes. You probably want to wrap fd_write in a buffered writer as is done for fd above.
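
As a rough illustration of that suggestion (a sketch only, with send_pong as a hypothetical helper name), the descriptor can be wrapped in a buffered binary writer, which keeps writing until the whole payload has been handed to the OS on a blocking descriptor. Passing closefd=False is an assumption made here so that the caller keeps ownership of the descriptor.

```python
import os


def send_pong(fd_write):
    """Write a complete PONG line to fd_write without closing the descriptor."""
    # closefd=False leaves ownership of the descriptor with the caller.
    with open(fd_write, "wb", closefd=False) as writer:
        writer.write(b"PONG\n")
        # Leaving the with block flushes the buffered data.
```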

else:
raise RuntimeError('unrecognized command %r' % cmd)
except Exception:
38 changes: 32 additions & 6 deletions Lib/test/_test_multiprocessing.py
@@ -20,6 +20,7 @@
import struct
import operator
import weakref
import warnings
import test.support
import test.support.script_helper
from test import support
@@ -4472,17 +4473,34 @@ def check_semaphore_tracker_death(self, signum, should_die):
# bpo-31310: if the semaphore tracker process has died, it should
# be restarted implicitly.
from multiprocessing.semaphore_tracker import _semaphore_tracker
_semaphore_tracker.ensure_running()
pid = _semaphore_tracker._pid
if pid:
Member

It seems like the multiprocessing code uses "pid is not None". Technically, I'm not sure that it's possible to have a pid of 0, since pid 0 has a special meaning in many C functions.

Member Author

Let's use pid is not None here. I think it makes clearer which value you are expecting it not to be.

Member

I'm now confused, you modified "if not pid" in the other file, whereas it seems like os.waitpid() doesn't return None.

Member Author

@pablogsal pablogsal Aug 1, 2018

Sorry, I did not understand your suggestion correctly. For some reason, I inferred that with WNOHANG, waitpid could return None (as waitid can if you provide WNOHANG). I think I understand now. Mea culpa :)
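
To make the distinction concrete, here is a small sketch (assuming a POSIX platform; waitpid_results and the sleeping child are hypothetical stand-ins, not code from the PR) showing what os.waitpid returns with WNOHANG while the child is still running, and what it returns once the child has been killed.

```python
import os
import subprocess
import sys


def waitpid_results():
    """Return the waitpid() results for a running and then a killed child."""
    # Hypothetical stand-in for the tracker: a child that just sleeps.
    child = subprocess.Popen(
        [sys.executable, "-c", "import time; time.sleep(30)"])
    # Child is still running: WNOHANG returns immediately with pid 0.
    while_running = os.waitpid(child.pid, os.WNOHANG)
    child.kill()
    # Blocking wait: returns the real pid and the termination status.
    after_kill = os.waitpid(child.pid, 0)
    child.wait()  # the child is already reaped; Popen tolerates that
    return while_running, after_kill
```

The first result is the tuple (0, 0) rather than None, so a check such as "pid is not None" on the value returned by waitpid would always be true.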

Member Author

Fixed in 822516a1403d4eaec9ac743861336f0902d8186e

os.kill(pid, signal.SIGKILL)
time.sleep(0.5) # give it time to die
Member

Would it be possible to wait until the process completes instead of using a sleep?

Member Author

Sadly no, AFAIK, because the tracker relies on calling waitpid to check the child. If you add an os.waitpid call here, you will get:


ERROR: test_semaphore_tracker_sigint (test.test_multiprocessing_fork.TestSemaphoreTracker)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/pgalindo3/cpython/Lib/test/_test_multiprocessing.py", line 4509, in test_semaphore_tracker_sigint
    self.check_semaphore_tracker_death(signal.SIGINT, False)
  File "/Users/pgalindo3/cpython/Lib/test/_test_multiprocessing.py", line 4481, in check_semaphore_tracker_death
    _semaphore_tracker.ensure_running()
  File "/Users/pgalindo3/cpython/Lib/multiprocessing/semaphore_tracker.py", line 48, in ensure_running
    pid, status = os.waitpid(self._pid, os.WNOHANG)
ChildProcessError: [Errno 10] No child processes
Ran 1 test in 0.068s

Member Author

We can make ensure_running() catch ChildProcessError on that waitpid and treat it as if the child has died, making it possible to call waitpid in the test.

Member

Why not use os.waitpid() in the test and modify ensure_running() to handle ChildProcessError? The purpose of the waitpid() call in ensure_running() is to check whether the process died. We don't care about the exit status.

Adding time.sleep(0.5) means adding a new race condition, while fixing another, no?
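
A minimal sketch of that idea, assuming a POSIX platform: the liveness check treats ChildProcessError as "already dead", so a test can reap the child itself with a blocking os.waitpid instead of sleeping. The names child_is_gone and demo are hypothetical; this is not the code that was merged.

```python
import os
import subprocess
import sys


def child_is_gone(pid):
    """Return True if pid has exited or was already reaped by someone else."""
    try:
        reaped_pid, _status = os.waitpid(pid, os.WNOHANG)
    except ChildProcessError:
        # Already waited for elsewhere (for example by a test): treat as dead.
        return True
    return reaped_pid != 0


def demo():
    """Check a sleeping child before and after it is killed and reaped."""
    child = subprocess.Popen(
        [sys.executable, "-c", "import time; time.sleep(30)"])
    before = child_is_gone(child.pid)
    child.kill()
    os.waitpid(child.pid, 0)  # what a test could do instead of sleeping
    after = child_is_gone(child.pid)
    child.wait()  # the child is already reaped; Popen tolerates that
    return before, after
```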

old_stderr = sys.stderr
r, w = os.pipe()
try:
Member Author

Notice that we cannot use test.support.captured_stderr() as it does not support fileno().
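
For illustration only, the pipe-based redirection can be packaged as a small context manager. The name stderr_to_pipe is hypothetical, and unlike the diff, which opens the pipe in binary mode, this sketch uses text mode so that print() works on the replacement stream.

```python
import contextlib
import os
import sys


@contextlib.contextmanager
def stderr_to_pipe():
    """Temporarily replace sys.stderr with the write end of a pipe.

    Yields the read-end file descriptor; the caller must close it.
    """
    read_fd, write_fd = os.pipe()
    old_stderr = sys.stderr
    # Unlike an in-memory stream, this object has a real fileno().
    sys.stderr = open(write_fd, "w")
    try:
        yield read_fd
    finally:
        sys.stderr.close()
        sys.stderr = old_stderr
```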

sys.stderr = open(w, "bw")
Contributor

This line should be before the try:.

Member Author

Done in d075473

with warnings.catch_warnings(record=True) as all_warn:
_semaphore_tracker.ensure_running()
pid = _semaphore_tracker._pid
# Wait until we receive the PONG from the child, indicating that
# the signal handlers have been registered. See bpo-33613 for more
# information.
_semaphore_tracker._send("PING", "")
with open(r, "rb") as pipe:
data = pipe.readline()
Contributor

Are we okay with this potentially hanging indefinitely if something goes wrong?

Member Author

Commit d075473 implements a way to fail if reading takes too long.
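
The diff for that commit is not shown here, so the following is not its implementation. It is one possible way to bound the wait, sketched with select on a POSIX pipe; read_line_with_timeout is a hypothetical helper name.

```python
import os
import select
import time


def read_line_with_timeout(read_fd, timeout):
    """Read one line from read_fd, raising TimeoutError if it takes too long."""
    deadline = time.monotonic() + timeout
    data = b""
    while not data.endswith(b"\n"):
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("no complete line before the deadline")
        ready, _, _ = select.select([read_fd], [], [], remaining)
        if not ready:
            raise TimeoutError("no complete line before the deadline")
        # Read one byte at a time so nothing past the newline is consumed.
        chunk = os.read(read_fd, 1)
        if not chunk:  # EOF: the writer closed its end
            break
        data += chunk
    return data
```

A test could then call read_line_with_timeout(r, some_timeout) instead of pipe.readline() and fail cleanly if no PONG arrives.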

if b"PONG" not in data:
raise ValueError("Invalid data in stderr!")
finally:
sys.stderr.close()
sys.stderr = old_stderr

os.kill(pid, signum)
time.sleep(1.0) # give it time to die

Member

Why did you reduce the time?

Member Author

Because one second seems like a lot to me for the signal to be delivered (checked on the buildbot), and this will make the test suite run a bit faster. I can undo this if we want to be conservative :).

Member

Please keep 1 second. While you saw the test passing, I'm sure that reducing the sleep will make the test failure more likely on some very busy buildbots.

ctx = multiprocessing.get_context("spawn")
with contextlib.ExitStack() as stack:
if should_die:
stack.enter_context(self.assertWarnsRegex(
UserWarning,
"semaphore_tracker: process died"))
with warnings.catch_warnings(record=True) as all_warn:
sem = ctx.Semaphore()
sem.acquire()
sem.release()
@@ -4492,6 +4510,14 @@ def check_semaphore_tracker_death(self, signum, should_die):
del sem
gc.collect()
self.assertIsNone(wr())
if should_die:
self.assertEqual(len(all_warn), 1)
the_warn = all_warn[0]
self.assertTrue(issubclass(the_warn.category, UserWarning))
serhiy-storchaka marked this conversation as resolved.
self.assertTrue("semaphore_tracker: process died"
in str(the_warn.message))
else:
self.assertEqual(len(all_warn), 0)
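
The checks above follow the usual pattern for warnings.catch_warnings(record=True). A standalone sketch of that pattern is shown below; collect_warnings and noisy are hypothetical names, and the simplefilter("always") call is an addition made here so that existing filters cannot hide the warning.

```python
import warnings


def collect_warnings(func):
    """Call func() and return the list of warnings it emitted."""
    with warnings.catch_warnings(record=True) as caught:
        # Make sure filters or earlier emissions do not suppress anything.
        warnings.simplefilter("always")
        func()
    return caught


def noisy():
    # warnings.warn() uses UserWarning when no category is given.
    warnings.warn("semaphore_tracker: process died")
```

Note that a bare issubclass(...) expression checks nothing on its own; its result has to be passed to an assertion for the test to fail when the category is wrong.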

def test_semaphore_tracker_sigint(self):
# Catchable signal (ignored by semaphore tracker)