
Dask tasks not being shared among workers #788

Closed
rabernat opened this issue Sep 24, 2020 · 20 comments

@rabernat
Member

I have a puzzling situation that is one of these difficult full-stack problems (not clear which package is responsible), so I'm posting it in our generic pangeo issue tracker.

The basic problem is contained in this dashboard screenshot. A single worker has all the tasks. That worker is out of memory, so the computation is stalled. The other workers are doing nothing. The only way out is to interrupt.

[dashboard screenshot: dask_stalled]

This problem can be reproduced on https://us-central1-b.gcp.pangeo.io/ with the following code

from dask_gateway import Gateway
gateway = Gateway()
options = gateway.cluster_options()
options.worker_memory = 10
cluster = gateway.new_cluster(options)
cluster.adapt(minimum=1, maximum=30)  # adaptive scaling between 1 and 30 workers

from dask.distributed import Client
client = Client(cluster)

import gcsfs
import dask.array as dsa
gcs = gcsfs.GCSFileSystem(requester_pays=True)
url = 'pangeo-scratch/paigem//paigem/CESM_POP_hires_control/SHF_2_target.zarr'
target_mapper = gcs.get_mapper(url)
target_array = dsa.from_zarr(target_mapper)  # lazily open the zarr store as a dask array

target_array.mean(axis=0).compute()  # this reduction is where the cluster stalls

I am a bit lost on how to debug this. Is it a problem with distributed? Gateway? With my code?

Any tips would be appreciated. @TomAugspurger is of course the guru on this stuff.

cc @paigem

@mrocklin
Member

Congratulations! You have discovered a secret dashboard page! Please proceed to /stealing to claim your prize!

@TomAugspurger
Member

Hmm, it just finished well for me on us-central1-b.gcp.pangeo.io.
I never had a single worker end up with too much memory. I wonder if there were more ready nodes in the kubernetes cluster, so my Dask cluster scaled up faster, and I never got into a bad situation?

I wonder if work stealing isn't kicking in here, since the task is actively running in that worker (https://distributed.dask.org/en/latest/work-stealing.html#transactional-work-stealing).
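
(For reference, a minimal sketch of how one might peek at work stealing from the client. It assumes the stealing logic is the scheduler extension registered under "stealing"; the in_flight attribute name is an assumption about distributed's internals of that era.)

def peek_stealing(dask_scheduler):
    import dask
    ext = dask_scheduler.extensions["stealing"]  # WorkStealing extension (assumed name)
    return {
        "work_stealing_enabled": dask.config.get("distributed.scheduler.work-stealing"),
        "steals_in_flight": len(ext.in_flight),  # steal requests awaiting worker confirmation
    }

client.run_on_scheduler(peek_stealing)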

@chiaral
Member

chiaral commented Sep 24, 2020

I've had this issue (tasks concentrating in one worker and stalling) many, many times in my workflow. It is different from Ryan's case because mine is a very intensive calculation.

In particular, in my case:

  1. The overwhelmed worker doesn't necessarily run out of memory. It simply gets stuck with hundreds of tasks and doesn't go anywhere.
  2. It can stall for hours, so I have to babysit my runs to keep them from stalling for days. It won't die.
  3. I sort of solved it by increasing the worker memory a lot, but at some point it still reaches this situation (almost as if it were leaking memory, though we checked at one point and didn't see that), and I just have to kill the job and restart it. My workflow allows restarting without losing everything, because I loop through the years of data one at a time and save each year as I go (see the sketch after this list).
  4. My graphs are really big (>50k tasks), but nothing gets overwhelmed or runs out of memory, until ... it does 🤷🏼‍♀️
  5. Sometimes it gets over the hump...
  6. This issue started when we went from dask-kubernetes to dask-gateway. I am positive that I did not have this specific issue before that upgrade.
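
(A minimal sketch of the restart pattern described in item 3, assuming one zarr store per year; the paths and the input dataset name are purely illustrative.)

import os
import xarray as xr

ds = xr.open_zarr("input.zarr")  # hypothetical input with a "time" dimension

for year, annual in ds.groupby("time.year"):
    out = f"output_{year}.zarr"   # one output store per year
    if os.path.exists(out):       # already saved on a previous run: skip it
        continue
    annual.to_zarr(out)           # compute and persist just this year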

@TomAugspurger
Member

Thanks for the additional info. I was able to reproduce by lowering the worker memory limit. I'll continue to debug this, and will keep in mind that it's happening elsewhere.

@chiaral
Member

chiaral commented Sep 24, 2020

FYI, here is a picture of my dask dashboard when it reaches the stalling state. It's a busy graph, for sure.

@rabernat
Member Author

Hmm, it just finished well for me on us-central1-b.gcp.pangeo.io.
I never had a single worker end up with too much memory. I wonder if there were more ready nodes in the kubernetes cluster, so my Dask cluster scaled up faster, and I never got into a bad situation?

Frustrating that you could not replicate it. That's unfortunately often the case. Indeed, on a fresh cluster, I was also unable to reproduce, including when I limited to maximum=10 workers.

Just read the latest comments:

Thanks for the additional info. I was able to reproduce by lowering the worker memory limit. I'll continue to debug this, and will keep in mind that it's happening elsewhere.

Ok, if we can reproduce, then we are in business! Often the solution to these problems is "just increase your memory." But in this case it seems a little more pathological.

Congratulations! You have discovered a secret dashboard page! Please proceed to /stealing to claim your prize!

🤣 Thanks Matt! It's always exciting to get to the next level of the Dask game!

@dcherian
Contributor

Some documentation/tips on debugging this kind of situation would be useful.

I've run into some version of this and didn't know what to do... or where to look.

@rabernat
Member Author

rabernat commented Sep 24, 2020

I've run into some version of this and didn't know what to do... or where to look.

I would go so far as to say that this [gestures wildly at the general problem] is one of the central challenges facing Pangeo and limiting us from having the impact we want. If you and I are getting stuck regularly, imagine how a first-year grad student feels!

@dcherian
Contributor

I agree; we need a few 5-10 minute YouTube videos on "dask performance debugging techniques at scale".

@TomAugspurger
Member

TomAugspurger commented Sep 24, 2020 via email

@TomAugspurger
Member

TomAugspurger commented Sep 24, 2020

Actually, more likely a comms issue. The scheduler sends a message to the worker like "compute this <thing>!" and the worker logs "I got this new <thing> to work on!".

Looking through the logs / scheduler state, <thing> is set to processing (the scheduler told the worker to do it), but the worker never says "I got <thing>".

Looking in Scheduler.stream_comms[worker], I see the BatchedSend has 1,326 pending messages.

Edit: the 1,326 wasn't right; that's a monotonically increasing count of messages sent. But the buffer had 906 messages in it, so this is still the right path.

Oh, and if you're wondering "Why didn't task stealing kick in?", it did, kinda.

[{'op': 'steal-request',
  'key': "('from-zarr-4668c0ce136bfff6d16e64ae669c3cd1', 0, 68, 10)"},
 {'op': 'steal-request',
  'key': "('from-zarr-4668c0ce136bfff6d16e64ae669c3cd1', 0, 64, 41)"},
 {'op': 'steal-request',
  'key': "('from-zarr-4668c0ce136bfff6d16e64ae669c3cd1', 0, 18, 57)"},
 {'op': 'steal-request',
  'key': "('from-zarr-4668c0ce136bfff6d16e64ae669c3cd1', 0, 24, 34)"},
 {'op': 'steal-request',
  'key': "('from-zarr-4668c0ce136bfff6d16e64ae669c3cd1', 0, 69, 51)"}]

but the steal requests never got to the worker. They were stuck in the message queue.
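
(For anyone trying to reproduce this diagnosis, a rough inspection sketch run from the client. It relies on Scheduler.stream_comms, also used in the drain snippet below, and assumes each BatchedSend keeps its pending messages in a .buffer list; attribute names may differ between distributed versions.)

def pending_messages(dask_scheduler):
    # queued-but-unsent messages per worker comm; a healthy comm stays near zero
    return {
        addr: len(comm.buffer)
        for addr, comm in dask_scheduler.stream_comms.items()
    }

client.run_on_scheduler(pending_messages)
# the stuck worker's comm held roughly 900 messages, including the steal-requests above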

@TomAugspurger
Member

And this managed to kick off the computation again.

async def drain(dask_scheduler):
    # manually kick the stuck BatchedSend for the overloaded worker;
    # `worker` holds that worker's address
    res = await dask_scheduler.stream_comms[worker]._background_send()
    return res

fut = client.run_on_scheduler(drain)

So something fishy is going on in BatchedSend.

@mrocklin
Member

I would go so far as to say that this [gestures wildly at the general problem] is one of the central challenges facing Pangeo and limiting us from having the impact we want. If you and I are getting stuck regularly, imagine how a first-year grad student feels!

Given that, I wonder whether, the next time Pangeans apply for a grant, funding for solving problems like this should be included as part of that grant.

TomAugspurger added a commit to TomAugspurger/distributed that referenced this issue Sep 25, 2020
As reported in pangeo-data/pangeo#788, users
were seeing tasks just not completing. After some debugging, I
discovered that the scheduler had assigned the "stuck" tasks to a worker,
but the worker never received the message. A bit more digging showed
that

1. The message was stuck in the worker BatchedSend comm's buffer
2. The BatchedSend.waker event was clear (awaiting it would wait)
3. The BatchedSend.next_deadline was set

I couldn't determine *why*, but this state is consistent with us
"missing" a deadline, i.e. the `BatchedSend.next_deadline` is set, but
the `_background_send` is already `awaiting` the event. So I'm very
shaky on the cause, but I'm hoping that this fixes the issue. Doing
some more manual testing.
@rabernat
Member Author

Given that, I wonder whether, the next time Pangeans apply for a grant, funding for solving problems like this should be included as part of that grant.

I think the reason we have not done this so far is that we don't have a clear vision for exactly how to address this challenge.

@rabernat
Member Author

rabernat commented Sep 25, 2020

I also want to repeat how singularly valuable @TomAugspurger has become in this area. Through attending our meetings regularly, learning about the science questions, and helping deploy the infrastructure, Tom is perhaps the closest thing we have to a full-stack Pangeo engineer. His ability to get to the bottom of real problems is unparalleled.

But I'm not sure how to scale it. If I could clone Tom, I would. A better question I guess is, how do we train more people like Tom?

@mrocklin
Member

I wrote a response earlier but then decided that it probably crosses a for-profit / open source line. My apologies, I've deleted it. In general, I encourage well funded groups asking for help with Dask to support professional paid developers working on Dask to help them solve their problems.

@rabernat
Member Author

Matt I really appreciate your thoughts on this.

I think the pattern of using grant money to support private companies to do this type of work has been very effective for Pangeo.

Currently Pangeo is in a transition from its first generation of NASA / NSF grants to new sources of funding. So there are new opportunities on the horizon for this sort of collaboration.

@TomAugspurger
Member

Back to the "how would a user debug this" question: regular users on pangeo's jupyterhubs don't have access to the stdout of the scheduler or worker pods. I'm guessing this is somewhat common.

I have kubectl access so I can see the output on the scheduler:

dask_gateway.dask_cli - INFO - Requesting scale to 30 workers from 1
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/batched.py", line 93, in _background_send
    payload, serializers=self.serializers, on_error="raise"
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/comm/tcp.py", line 241, in write
    stream.write(b)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/iostream.py", line 553, in write
    self._write_buffer.append(data)
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/iostream.py", line 177, in append
    b += data  # type: ignore
BufferError: Existing exports of data: object cannot be re-sized

which would have made things easier to pinpoint. client.get_scheduler_logs() only gets the logs from the distributed.scheduler logger, not all the logs from the scheduler's process (which show up in stdout on the pod).

If you know a bit about logging / distributed, you can configure things

import logging

def setup_logging(dask_scheduler):
    # reuse the scheduler's in-memory deque handler (what get_scheduler_logs reads)
    handler = dask_scheduler._deque_handler
    # attach it to the top-level "distributed" logger so everything is captured
    logger = logging.getLogger("distributed")
    logger.addHandler(handler)
    logger.warning("test")  # quick check that records reach the handler

so that any call to a logger in distributed on the scheduler will be added to the scheduler's log.
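
(To apply it from the client, something like the following, where setup_logging is the function above.)

client.run_on_scheduler(setup_logging)
# new records from "distributed" loggers on the scheduler now reach the deque handler,
# so they show up in client.get_scheduler_logs()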

I'll think about how we can make this easier for users (in addition to thinking about the cause of the comm error).

TomAugspurger added a commit to TomAugspurger/distributed that referenced this issue Sep 28, 2020
This changes exception handling in BatchedSend.

The problem: BatchedSend.send is a regular function. What happens if we
add a message to be sent, but that send fails at a later time? In the
current structure, that exception is logged and then lost to the void.

This PR makes things a bit more robust in two ways:

1. For unexpected errors, we retry a few times. In
   pangeo-data/pangeo#788 manual retries did
   fix the issue, so we try that here automatically.
2. For CommClosedErrors or repeated unexpected errors, we close the
   BatchedSend object. By closing the BatchedSend after an exception,
   *subsequent* `BatchedSend.send` calls will fail.

We still face the issue of a potential exception happening in the
background that *isn't* followed by another `BatchedSend.send` call to
surface it. But I think that's unavoidable given the current design.
@TomAugspurger
Member

dask/distributed#4135 is merged and deployed to us-central1-b.gcp.pangeo.io/. So I'm going to consider this fixed until we see more issues (there may be some semi-related issues linked from dask/distributed#4128 about worker connections, especially when rapidly scaling up).

@dcherian
Contributor

dcherian commented Oct 9, 2020

Thanks Tom!
