Fix `Scheduler.restart` logic #6504
Conversation
`Scheduler.restart` used to remove every worker without closing it. This was bad practice (dask#6390), as well as incorrect: it certainly seemed the intent was only to remove non-Nanny workers; Nanny workers are then restarted via the `restart` RPC to the Nanny, not to the worker.
BTW, both added tests fail on `main`, so they would have caught the problem.
```python
*(
    self.remove_worker(address=addr, stimulus_id=stimulus_id)
    for addr in self.workers
    if addr not in nanny_workers
```
Key change: before, `nannies` contained all workers, so we were removing all workers immediately. Now we only remove non-Nanny workers, and leave Nanny workers around to be restarted via RPC to the Nanny a few lines below.
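To make the flow concrete, here is a rough sketch of the intended shape of this part of `Scheduler.restart` after the change. It is a paraphrase for illustration only, not the actual method body; the `nanny_workers` name follows the diff above, and the exact attribute access is assumed.

```python
import asyncio

# Workers managed by a Nanny keep their WorkerState; only their processes get
# restarted. Everything else is removed (and closed), because nothing would
# ever bring those workers back.
nanny_workers = {
    addr: ws.nanny for addr, ws in self.workers.items() if ws.nanny
}

await asyncio.gather(
    *(
        self.remove_worker(address=addr, stimulus_id=stimulus_id)
        for addr in self.workers
        if addr not in nanny_workers
    )
)

# Nanny-managed workers are restarted a few lines further down via the
# `restart` RPC to each Nanny, not to the worker itself.
```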
distributed/scheduler.py (Outdated)

```python
)
for r in close_results:
    if isinstance(r, Exception):
        # TODO this is probably not, in fact, normal.
```
I can't see why this should happen. Any errors here are probably real (especially since `remove_worker` doesn't even do much communication, and it try/excepts the places where you'd expect errors might happen). I'd like to remove it if others are okay, but it's not necessary and I'm not certain how safe it is, so I'm leaving it for now.
```python
]

resps = All(
```
Refactor for style, using `asyncio.gather` instead of `All`.
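For illustration, a minimal sketch of that style change; `restart_nanny` here is a hypothetical stand-in for the actual RPC call, not the PR's code:

```python
import asyncio

# Before (sketch): fan-out with distributed's All() helper.
# resps = await All([restart_nanny(addr) for addr in nanny_workers.values()])

# After (sketch): the same fan-out with stdlib asyncio.gather.
resps = await asyncio.gather(
    *(restart_nanny(addr) for addr in nanny_workers.values())
)
```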
```python
async with Worker(s.address, nthreads=1) as w:
    await c.wait_for_workers(3)

    # Halfway through `Scheduler.restart`, only the non-Nanny workers should be removed.
```
For reference, the plugin is triggered here:
distributed/scheduler.py, lines 5115 to 5119 in bd74d2b:

```python
for plugin in list(self.plugins.values()):
    try:
        plugin.restart(self)
    except Exception as e:
        logger.exception(e)
```
Hm, the only failure is … and that's not a test that shows up as having failed before on https://dask.org/distributed/test_report.html. But since this PR is only changing the logic in …
I've confirmed that test fails for me locally on `main`.
distributed/scheduler.py (Outdated)

```python
    return_exceptions=True,
)
for r in close_results:
    if isinstance(r, Exception):
```
I think like this?
```diff
- if isinstance(r, Exception):
+ if isinstance(r, BaseException) and not isinstance(r, asyncio.CancelledError):
```
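In other words, treat anything derived from `BaseException` as a real failure, except cancellations, which are expected when the surrounding operation is being torn down. A minimal illustrative helper (not code from the PR):

```python
import asyncio

def real_failures(results):
    """Filter the output of gather(..., return_exceptions=True) down to true errors."""
    return [
        r
        for r in results
        if isinstance(r, BaseException) and not isinstance(r, asyncio.CancelledError)
    ]
```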
Mostly minor comments
distributed/scheduler.py (Outdated)

```python
# TODO this is probably not, in fact, normal.
logger.info("Exception while restarting. This is normal.", exc_info=r)
```
```diff
- # TODO this is probably not, in fact, normal.
- logger.info("Exception while restarting. This is normal.", exc_info=r)
+ logger.error("Exception while restarting worker", exc_info=r)
```
After reading `remove_worker`, I agree that I can't see anything that could go wrong. IMHO we should just let the exception be logged by `@log_errors` and be reraised on the client.

@fjetter do you have an opinion on this?
Yes, I suggest removing this. `Scheduler.remove_worker` does not intentionally raise any exceptions, nor are there any obvious transitive exceptions that should be raised and could be handled safely. Therefore, in this local context I don't see the point of implementing any exception handling and would prefer to just reraise (ideally to the client).

I think it's even unnecessary to do the `gather(..., return_exceptions=True)` dance. If there is an unexpected exception at this point, the only sane thing to do is to restart the entire cluster (e.g. if there is a transition error), but this needs to be done externally (obviously, since we already tried restarting :)).
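A minimal sketch of the simplification being argued for, with `remove_coros` standing in for the `remove_worker` calls shown earlier: drop `return_exceptions=True` and the per-result logging, and let any unexpected exception propagate so that `@log_errors` logs it and the caller sees it.

```python
import asyncio

# Sketch only, not the actual Scheduler.restart body: no return_exceptions=True
# and no isinstance(r, Exception) loop afterwards. An unexpected error simply
# raises out of restart() and is surfaced to the caller.
await asyncio.gather(*remove_coros)
```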
Okay, I didn't like it either. I just left it in to keep the changes in this PR to a minimum.

> If there is an unexpected exception at this point, the only sane thing to do is to restart the entire cluster

I think that would imply adding `@fail_hard` to `Scheduler.restart`; should I do that?
NVM, we're not using `fail_hard` anywhere on the scheduler right now, so I won't add that. I've removed the error suppression and `return_exceptions=True`.
```python
await s.restart()

if plugin.error:
    raise plugin.error
```
I don't understand the purpose of this complication: can't you just put the assertions you wrote in the plugin here instead?
See the exception handling here:
distributed/scheduler.py, lines 5107 to 5111 in ea2c80f:

```python
for plugin in list(self.plugins.values()):
    try:
        plugin.restart(self)
    except Exception as e:
        logger.exception(e)
```
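In other words, an assertion that fails inside a plugin's `restart` hook is caught and only logged by the scheduler, so the test has to stash the exception and re-raise it itself. A hypothetical sketch of that pattern (class and attribute names are illustrative, not the actual test code):

```python
from distributed.diagnostics.plugin import SchedulerPlugin

class CheckWorkersMidRestart(SchedulerPlugin):  # hypothetical test helper
    error = None

    def restart(self, scheduler):
        try:
            # Halfway through Scheduler.restart: non-Nanny workers should be
            # gone, Nanny-managed workers should still be registered.
            assert all(ws.nanny for ws in scheduler.workers.values())
        except Exception as e:
            # The scheduler would swallow this, so stash it for the test body
            # to re-raise after `await s.restart()` returns.
            self.error = e
```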
distributed/tests/test_scheduler.py (Outdated)

```python
assert len(s.workers) == 2
# Confirm they restarted
new_pids = set((await c.run(os.getpid)).values())
```
```diff
- new_pids = set((await c.run(os.getpid)).values())
+ new_pids = {a.process.process.pid, b.process.process.pid}
+ assert all(new_pids)
```
distributed/tests/test_scheduler.py
Outdated
assert len(s.workers) == 2 | ||
# Confirm they restarted | ||
new_pids = set((await c.run(os.getpid)).values()) | ||
assert new_pids != original_pids |
```diff
- assert new_pids != original_pids
+ assert new_pids.isdisjoint(original_pids)
```
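The distinction matters because `!=` only asserts that the two sets are not identical, while `isdisjoint` asserts that no worker kept its old PID. A tiny illustration:

```python
original_pids = {100, 101}
new_pids = {100, 202}  # suppose one worker was not actually restarted

assert new_pids != original_pids           # still passes
assert new_pids.isdisjoint(original_pids)  # fails, catching the survivor
```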
distributed/tests/test_scheduler.py (Outdated)

```python
assert len(s.workers) == 2
# Confirm they restarted
new_pids = set((await c.run(os.getpid)).values())
```
Is this possibly flaky, as the PIDs could be reused? Also, `psutil.Process.create_time()` might be more useful here.
It should be safe to assume that the chance of the box running the tests spawning and destroying 65536 processes over the duration of the test is nil.
@graingert thanks, it looks like just doing equality between the `Process` objects does this for us: https://github.com/giampaolo/psutil/blob/5ca68709c44885f6902820e8dcb9fcff1cc1e33b/psutil/__init__.py#L408-L413
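For context, psutil compares processes by PID and creation time, so a freshly spawned process that happens to reuse an old PID does not compare equal to the stale `Process` object. A small illustrative check:

```python
import psutil

p = psutil.Process()          # handle to the current process
same = psutil.Process(p.pid)  # a second handle to the same PID

# Equal because both the PID and the creation time match; a different process
# that merely recycled this PID would compare unequal.
assert p == same
```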
Ready for final review, I believe.
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0, 15 suites ±0, 6h 8m 49s ⏱️ -2m 7s

For more details on these failures, see this check.

Results for commit d037f37. ± Comparison against base commit 6d85a85.
TODO make this a separate PR and add tests. Just want to see if it helps CI.
This reverts commit 7d90e2a.
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0, 15 suites ±0, 6h 39m 5s ⏱️ +28m 9s

Results for commit 5cccba6. ± Comparison against base commit 6d85a85.
`Scheduler.restart` used to remove every worker without closing it. This was bad practice (#6390), as well as incorrect: it certainly seemed the intent was only to remove non-Nanny workers. See the detailed explanation in #6455 (comment).

Closes #6455, closes #6452, closes #6494. I will also make a separate PR with #6494 (comment), which isn't necessary to fix the restart issue, just a good cleanup task to do.
cc @fjetter @hendrikmakait @jrbourbeau
`pre-commit run --all-files`