data_needed exclusively contains tasks in fetch state #6481
Conversation
crusaderky commented on May 31, 2022 (edited)
- Closes DEADLOCK: fetch->forgotten->fetch #6480
- This PR also removes a leak condition where a TaskState object remains in data_needed_per_worker, potentially for a long time, after it's been forgotten (see the sketch below).
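As a rough illustration of the invariant in the PR title, here is a minimal sketch (hypothetical names, not the actual Worker state machine) of keeping data_needed / data_needed_per_worker in sync with the fetch state, including the discard-on-forget that closes the leak described above:

```python
# Hypothetical, heavily simplified sketch; the real Worker state machine in
# distributed has many more states and bookkeeping structures.
from collections import defaultdict


class MiniWorkerState:
    def __init__(self):
        self.tasks = {}                                  # key -> state string
        self.data_needed = set()                         # keys in "fetch" state
        self.data_needed_per_worker = defaultdict(set)   # worker -> keys to fetch

    def transition_to_fetch(self, key, worker):
        self.tasks[key] = "fetch"
        self.data_needed.add(key)
        self.data_needed_per_worker[worker].add(key)

    def forget(self, key, worker):
        # Without these discards a forgotten key could linger in
        # data_needed_per_worker indefinitely -- the leak this PR removes.
        self.tasks.pop(key, None)
        self.data_needed.discard(key)
        self.data_needed_per_worker[worker].discard(key)
```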
Force-pushed from 63014ad to a032754
# Ensure maxsize is respected
l["d"] = 4
assert len(l) == 3
assert list(l.keys()) == ["c", "a", "d"]
Moved from test_utils.py
await fut2
These tests don't make sense anymore. Also, one of them directly tampers with the state, which is a big no-no.
@@ -2589,81 +2589,6 @@ def __reduce__(self):
    assert "return lambda: 1 / 0, ()" in logvalue


@gen_cluster(client=True)
async def test_gather_dep_exception_one_task(c, s, a, b):
    """Ensure an exception in a single task does not tear down an entire batch of gather_dep
This is misleading.
This test was testing resilience to an exception in the transitions of a single task after gather_dep, which should be dealt with through @fail_hard.
A legitimate exception in a single key of the bundle in gather_dep, namely a task that fails to unpickle, does make the whole gather_dep fail for all tasks. There's no code whatsoever to deal with this use case.
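To make that last point concrete, here is a generic sketch (plain pickle, not distributed's actual gather_dep or serialization machinery): when values are fetched and deserialized as one batch, a single value that fails to unpickle raises while the batch is unpacked, so every key in that batch fails together.

```python
import pickle


def gather_batch(raw_response):
    """Deserialize a batch of key -> pickled-bytes fetched from a peer.

    There is no per-key error handling: one bad value aborts the whole
    batch, mirroring the situation described above.
    """
    return {key: pickle.loads(blob) for key, blob in raw_response.items()}


good = pickle.dumps(123)
bad = b"not a pickle"
try:
    gather_batch({"x": good, "y": bad})
except pickle.UnpicklingError:
    print("one key that fails to unpickle makes the entire batch fail")
```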
Force-pushed from 3ee77db to b6f5467
Force-pushed from b6f5467 to 74640b8
This PR also removes a leak condition where a TaskState object remains in data_needed_per_worker after it's been forgotten.
Ready for final review and merge @fjetter
@@ -3380,6 +3379,7 @@ def done_event():
elif ts not in recommendations:
    ts.who_has.discard(worker)
    self.has_what[worker].discard(ts.key)
    self.data_needed_per_worker[worker].discard(ts)
How could it even be in here? Shouldn't it have been removed when the task transitioned to flight?
No, there could have been a transition to fetch in the meantime.
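A toy sequence (hypothetical, heavily simplified) of the scenario being described: while a gather for a key is in flight, the key can transition back to fetch and be re-queued, so by the time the gather finishes the key may be sitting in that worker's fetch queue again and has to be discarded explicitly.

```python
from collections import defaultdict

state = {}
data_needed_per_worker = defaultdict(set)

# 1. Task "x" enters fetch state and is queued for worker "a".
state["x"] = "fetch"
data_needed_per_worker["a"].add("x")

# 2. gather_dep starts: the task moves to flight and is dequeued.
state["x"] = "flight"
data_needed_per_worker["a"].discard("x")

# 3. While the gather is running, the task transitions back to fetch
#    (e.g. after a cancellation) and is queued for worker "a" again.
state["x"] = "fetch"
data_needed_per_worker["a"].add("x")

# 4. The original gather_dep fails and worker "a" turns out not to have the
#    data: besides dropping "a" from who_has, the task must also be
#    discarded from that worker's fetch queue, as in the diff above.
data_needed_per_worker["a"].discard("x")
```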
Force-pushed from 5e1c229 to 23e6698
All of @gjoseph92's comments have been addressed.
I just peeked over it and would trust @gjoseph92 with a more thorough review. I'm looking forward to this
return value
heapq.heappop(self._heap)

def pop(self) -> T:
While the Heap prefix of the class already implies some non-constant operation, I wouldn't mind documenting the actual complexity, since this can differ for heaps. However, I think the stdlib doesn't document this properly either and I'm ok with skipping this.
peek() is O(1) if you treat the bit that calls heappop as delayed housekeeping - e.g. you account for it in discard().
I think the amortized time for both peek and discard is constant. I don't think we should dive into deep algorithm complexity analysis here, though :)
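For illustration, a small generic sketch of the lazy-deletion pattern being discussed (not the actual HeapSet implementation): discard only touches the set, and peek/pop skip stale heap entries, so the heappop housekeeping is deferred and each stale entry is popped at most once.

```python
import heapq
import itertools


class LazyHeapSet:
    """Min-heap of unique items with lazy deletion (illustrative sketch)."""

    def __init__(self):
        self._data = set()       # authoritative membership
        self._heap = []          # may contain stale entries for discarded items
        self._tiebreak = itertools.count()

    def add(self, item, priority):
        if item in self._data:
            return
        self._data.add(item)
        heapq.heappush(self._heap, (priority, next(self._tiebreak), item))

    def discard(self, item):
        # Only the set is updated; the heap entry becomes stale and is
        # skipped later by peek()/pop().
        self._data.discard(item)

    def peek(self):
        # Skips stale entries; each stale entry is popped at most once overall.
        while True:
            item = self._heap[0][2]
            if item in self._data:
                return item
            heapq.heappop(self._heap)

    def pop(self):
        item = self.peek()
        self._data.discard(item)
        heapq.heappop(self._heap)
        return item
```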
if total_bytes + ts.get_nbytes() > self.target_message_size:
    break
tasks.pop()
deps.add(ts.key)
self.data_needed.remove(ts)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
love it
if ts.state == "fetch":
    self.data_needed_per_worker[worker].remove(ts)
Suggested change:
- if ts.state == "fetch":
-     self.data_needed_per_worker[worker].remove(ts)
+ self.data_needed_per_worker[worker].discard(ts)
I'd much rather fail loudly if, for any reason, the assumption that fetch state and existence in data_needed are inextricably bound together fails.
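For context on the remove-versus-discard point (assuming the container behaves like a set here): remove raises KeyError when the element is missing, while discard is a silent no-op, so keeping remove is what makes a violated invariant fail loudly.

```python
queued = {"x"}

queued.remove("x")       # fine: "x" was present
queued.discard("x")      # silent no-op even though "x" is already gone

try:
    queued.remove("x")   # raises KeyError, surfacing the broken assumption
except KeyError:
    print("invariant violation detected loudly")
```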