Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zombie tasks after missing->released transition #5316

Merged
merged 2 commits into from
Oct 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2984,6 +2984,57 @@ async def test_who_has_consistent_remove_replica(c, s, *workers):
assert s.tasks[f1.key].suspicious == 0


@gen_cluster(client=True)
async def test_missing_released_zombie_tasks(c, s, a, b):
    """
    Ensure that no fetch/flight tasks are left in the task dict of a
    worker after everything was released
    """
    # With no outgoing connections, a answers get_data requests with
    # {"status": "busy"}, so f1 keeps falling back to "fetch" on b and
    # never arrives there (reviewer-confirmed mechanism).
    a.total_in_connections = 0

    f1 = c.submit(inc, 1, key="f1", workers=[a.address])
    f2 = c.submit(inc, f1, key="f2", workers=[b.address])
    key = f1.key

    # Wait until b actually holds f1 in "fetch" state before killing a.
    while key not in b.tasks or b.tasks[key].state != "fetch":
        await asyncio.sleep(0.01)

    await a.close(report=False)

    del f1, f2

    # After the futures are released, no zombie fetch/flight tasks may
    # remain in b's task dict.
    while b.tasks:
        await asyncio.sleep(0.01)


@gen_cluster(client=True)
async def test_missing_released_zombie_tasks_2(c, s, a, b):
    """
    Ensure that a task stuck in "missing" on a worker is cleaned up once
    it is released, leaving no zombie entry in the worker's task dict.
    """
    # a replies "busy" to all get_data requests, so f1 cannot actually be
    # fetched by b; the task bounces between fetch and flight on b.
    a.total_in_connections = 0
    f1 = c.submit(inc, 1, key="f1", workers=[a.address])
    f2 = c.submit(inc, f1, key="f2", workers=[b.address])

    # Deliberate zero sleep: fetch->flight->memory can be very fast and we
    # must not miss the window where the task exists on b, so we poll on
    # every event-loop cycle.
    while f1.key not in b.tasks:
        await asyncio.sleep(0)

    ts = b.tasks[f1.key]
    assert ts.state == "fetch"

    # A few things can happen to clear who_has. The dominant process is upon
    # connection failure to a worker. Regardless of how the set was cleared, the
    # task will be transitioned to missing where the worker is trying to
    # reacquire this information from the scheduler. While this is happening on
    # worker side, the tasks are released and we want to ensure that no dangling
    # zombie tasks are left on the worker
    ts.who_has.clear()

    del f1, f2

    while b.tasks:
        await asyncio.sleep(0.01)

    # The task must have passed through the "missing" state on its way out.
    story = b.story(ts)
    assert any("missing" in msg for msg in story)


@pytest.mark.slow
@gen_cluster(
client=True,
Expand Down
23 changes: 7 additions & 16 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ def __init__(self, key, runspec=None):
self.who_has = set()
self.coming_from = None
self.waiting_for_data = set()
self.waiters = set()
self.resource_restrictions = {}
self.exception = None
self.exception_text = ""
Expand Down Expand Up @@ -1854,6 +1855,7 @@ def transition_released_waiting(self, ts, *, stimulus_id):
for dep_ts in ts.dependencies:
if not dep_ts.state == "memory":
ts.waiting_for_data.add(dep_ts)
dep_ts.waiters.add(ts)

if ts.waiting_for_data:
self.waiting_for_data_count += 1
Expand Down Expand Up @@ -2669,19 +2671,6 @@ async def find_missing(self):
who_has = {k: v for k, v in who_has.items() if v}
self.update_who_has(who_has, stimulus_id=stimulus_id)

if self._missing_dep_flight:
logger.debug(
"No new workers found for %s", self._missing_dep_flight
)
recommendations = {
dep: "released"
for dep in self._missing_dep_flight
if dep.state == "missing"
}
self.transitions(
recommendations=recommendations, stimulus_id=stimulus_id
)

finally:
# This is quite arbitrary but the heartbeat has scaling implemented
self.periodic_callbacks[
Expand Down Expand Up @@ -2792,9 +2781,11 @@ def release_key(
self.available_resources[resource] += quantity

for d in ts.dependencies:
ts.waiting_for_data.discard(ts)
if not d.dependents and d.state in {"flight", "fetch", "missing"}:
recommendations[d] = "released"
ts.waiting_for_data.discard(d)
d.waiters.discard(ts)

if not d.waiters and d.state in {"flight", "fetch", "missing"}:
recommendations[d] = "forgotten"

ts.waiting_for_data.clear()
ts.nbytes = None
Expand Down