
Dask: Stalling Tasks? #5879

Open
rileyhun opened this issue Feb 28, 2022 · 30 comments
Labels: bug (Something is broken), needs info (Needs further information from the user)

Comments

@rileyhun

rileyhun commented Feb 28, 2022

What happened:
This is an issue I posted on StackOverflow but received feedback that I should post it as a dask.distributed issue. I have a production Dask batch-inference workload that applies machine learning predictions to 300K rows of features, and I'm desperately seeking guidance on an issue I'm facing. I'm using dask-yarn on an EMR cluster on AWS. The problem occurs at random: some straggler tasks hold up Dask and I am not sure how to deal with them. For the assign task, progress has been stuck at 9994/10000, and it just doesn't process the remaining tasks for some reason. There have been other times when I've used Dask and it got "stuck" on bigger datasets, but here I am using relatively small batches - 300K rows of data at a time.

P.S. Sometimes I'll stop the Dask processing and retry the same batch it got stuck on, and then it will finally process. Then it gets stuck on the next batch, so I'll have to stop Dask again and retry, and then that batch will work. It's quite annoying.

What you expected to happen:
I expected the jobs to run smoothly without any tasks getting stuck

Example Code (non-reproducible):

import gc
import time

import boto3
import dask
import dask.dataframe as dd
import pandas as pd
import yaml
from dask_yarn import YarnCluster
from distributed import Client

# DoctypeV2Predict is the reporter's own model wrapper (not shown here)

cluster = YarnCluster(environment='custom_env.tar.gz', 
                      worker_memory='16GiB', 
                      worker_vcores=8,
                      worker_env={"DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING": "False",
                                  "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "10",
                                  "DASK_DISTRIBUTED__WORKER__DAEMON": "True"
                                 }
                     )
client = Client(cluster)

s3 = boto3.resource('s3')
bucket = s3.Bucket('my_data')
objs = list(bucket.objects.filter(Prefix='features-new/batch=1/'))
files = ['s3://my_data/'+ obj.key for obj in objs]

with open('model_config.yml') as f:
    base_config = yaml.safe_load(f)
    
model = DoctypeV2Predict(base_config)
model.download_models()
bert_model, bert_tokenizer, xgb_model = model.load_models()

bert = dask.delayed(bert_model)
tokenizer = dask.delayed(bert_tokenizer)
xgb = dask.delayed(xgb_model)

def predict_one(model_config, bert_model, bert_tokenizer, xgb_model, input_doc):
    model = DoctypeV2Predict(model_config)
    return model.predict(bert_model, bert_tokenizer, xgb_model, input_doc)

def model_predict(row, model_config, bert_model, bert_tokenizer, xgb_model):
    gc.collect()

    input_doc = row.to_dict()
    if "full_text" not in input_doc:
        return {'error_code': 410}
    if 'language' not in input_doc:
        return {'error_code': 411}
    if 'content_identifier' not in input_doc:
        return {'error_code': 412}
    if len(input_doc['full_text'].strip()) == 0:
        return {'error_code': 451}
    if pd.isnull(input_doc['language']):
        return {'error_code': 452}
    if input_doc['language'].lower() not in ['en', 'english']:
        return {'error_code': 452}
    
    try:
        return predict_one(model_config, bert_model, bert_tokenizer, xgb_model, input_doc)
    except Exception:
        return {'error_code': 500}

batch = 0
for file in files:
    all_features_df = dd.read_csv(file)
    all_features_df = all_features_df.repartition(npartitions=10000).persist()
    time.sleep(10)
    all_features_df['max_element_length'] = all_features_df['max_element_length'].astype('float64')
    all_features_df['item_count'] = all_features_df['item_count'].astype('float64')
    all_features_df['bullet_count'] = all_features_df['bullet_count'].astype('float64')
    all_features_df['colon_count'] = all_features_df['colon_count'].astype('float64')
    all_features_df['description'] = all_features_df['description'].fillna('').astype('str')
    all_features_df['language'] = all_features_df['language'].fillna('').astype('str')
    all_features_df['full_text'] = all_features_df['full_text'].fillna('').astype('str')
    all_features_df['title'] = all_features_df['title'].fillna('').astype('str')
    all_features_df['prediction_response'] = all_features_df.apply(
        model_predict,
        model_config=base_config,
        bert_model=bert,
        bert_tokenizer=tokenizer,
        xgb_model=xgb,
        axis=1, 
        meta=('prediction_response', 'object'))
    
    all_features_df = all_features_df.persist()
    all_features_df.repartition(partition_size="100MB").to_parquet(f's3://my_data/predictions/batch={batch}/',
                                                       engine='pyarrow',
                                                       schema='infer')
    batch += 1
    client.restart()
    time.sleep(5)

Anything else we need to know?:
[Screenshot attached: Screen Shot 2022-02-28 at 9 40 14 AM]

Environment:

  • Dask version: dask-yarn 0.9
  • Python version: 3.7.10
  • Operating System:
  • Install method (conda, pip, source): pip
Cluster Dump State:
@rileyhun
Author

rileyhun commented Mar 1, 2022

I reduced the number of Dask workers from 600 --> 350, and I don't encounter this issue anymore. Maybe this is some networking issue? Bandwidth? I have no clue.

@gjoseph92
Collaborator

@rileyhun just to confirm the problem is actually Dask, and not the code you're running: when your cluster gets into a stuck state like this, could you use Client.processing() and Client.call_stack() to compare what the scheduler thinks should be assigned to workers, and what the workers are actually running, and see if there's an inconsistency?

Basically, if processing indicates a task has been assigned to a worker, but call_stack indicates that worker isn't doing anything, then something has gotten out of sync. However, if those states match up (Dask thinks there are still tasks to do, and the workers are running them, or Dask thinks there's nothing to run, and the workers aren't running anything), then the problem may be elsewhere.

Here's a script I have lying around to check on this (not tested, can't promise it works):
"Run `python check-deadlock.py <scheduler-address>` to check for deadlocks related to https://github.com/dask/distributed/issues/5481"

import sys
import itertools
from time import time

import distributed

if __name__ == "__main__":
    if len(sys.argv) != 2:
        raise ValueError("Supply scheduler address as the first CLI argument")

    client = distributed.Client(sys.argv[1])

    processing = client.processing()
    call_stacks = client.call_stack(
        keys=list(set(itertools.chain(*processing.values())))
    )
    readys = client.run(
        lambda dask_worker: [key for priority, key in dask_worker.ready]
    )

    problem_addrs = set()
    for addr, keys_processing in processing.items():
        if not keys_processing:
            continue

        ready = readys[addr]
        try:
            call_stack = call_stacks[addr]
        except KeyError:
            print(f"{addr} processing {keys_processing} but has no call stack")
            problem_addrs.add(addr)
            continue

        all_tasks_on_worker = set(itertools.chain(call_stack.keys(), ready))

        if set(keys_processing) != all_tasks_on_worker:
            print(
                f"*** {addr} should be have {len(keys_processing)} tasks, but has {len(all_tasks_on_worker)} ***",
                f"     Scheduler expects: {keys_processing}",
                f"     Worker executing: {list(call_stack)}",
                f"     Worker ready: {ready}",
                f"     Missing: {set(keys_processing) - all_tasks_on_worker}",
                f"     Extra: {all_tasks_on_worker - set(keys_processing)}",
                sep="\n",
            )
            problem_addrs.add(addr)

    batched_send_state = client.run(
        lambda dask_worker: (
            f"Batched stream buffer length: {len(dask_worker.batched_stream.buffer)}\n"
            f"please_stop = {dask_worker.batched_stream.please_stop}\n"
            f"waker.is_set() = {dask_worker.batched_stream.waker.is_set()}\n"
            f"time remaining = {nd - time() if (nd := dask_worker.batched_stream.next_deadline) else None}\n"
        )
    )

    print("BatchedSend info:")
    for addr, bs in batched_send_state.items():
        print(("*** PROBLEMATIC: " if addr in problem_addrs else "") + addr)
        print(bs)

    if problem_addrs:
        print(f"Inconsistency detected involving workers {problem_addrs}")
    else:
        print("No inconsistency detected")

One other note about your code: if you're having any issues with memory, you might want to remove the second all_features_df = all_features_df.persist() at the end. That way, you'll be able to stream the results to S3 and release them as soon as they've been written to the bucket, instead of keeping everything in memory until all partitions are complete.
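
For illustration, here is a rough sketch of that change (untested, reusing the names from the snippet in the original post): drop the second persist() and let to_parquet() drive the computation, so partitions are released as soon as they are written.

# Sketch: same loop as in the original post, but without persisting the results.
# to_parquet() triggers the computation itself, so partitions are written to S3
# and released incrementally instead of being held in memory.
batch = 0
for file in files:
    all_features_df = dd.read_csv(file)
    # ... same repartition / dtype / fillna preparation as above ...
    all_features_df['prediction_response'] = all_features_df.apply(
        model_predict,
        model_config=base_config,
        bert_model=bert,
        bert_tokenizer=tokenizer,
        xgb_model=xgb,
        axis=1,
        meta=('prediction_response', 'object'))

    # no second .persist() here: write the results directly
    all_features_df.repartition(partition_size="100MB").to_parquet(
        f's3://my_data/predictions/batch={batch}/',
        engine='pyarrow',
        schema='infer')
    batch += 1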

@rileyhun
Author

rileyhun commented Mar 2, 2022

Thanks for the REALLY helpful advice @gjoseph92. I can confirm that memory is not the issue. I think I know what the problem is. On the EMR cluster, I am using spot units to cut down on costs. What that means, though, is that workers die every so often, since there aren't enough resources available for them. I have switched to on-demand for the time being, and so far, things seem to be more stable. I am guessing Dask workers aren't quite as fault tolerant as I initially thought.

@gjoseph92
Collaborator

@rileyhun glad to hear things are working better. Dask should be able to handle workers coming and going like that, though it's a somewhat common trigger for bugs. Did you end up running that script? I'd be curious to see the output if so. These are the sorts of issues we want to track down and fix.

@rileyhun
Author

rileyhun commented Mar 3, 2022

Hi @gjoseph92 - Thanks for your response. Apologies if this is a dumb question - I wanted to run your script - but how does one do that whilst in the midst of running the Dask job. I'm using a Jupyter Notebook as part of Dask-EMR and it's in the middle of running the batch job. Do I just open up a new terminal and run the script, while pointing to the scheduler address?

@gjoseph92
Collaborator

Ah yes, you'd do that and just pass the address of the cluster. Or if there's some other way you connect to the cluster, you can modify the script accordingly.

@rileyhun
Author

rileyhun commented Mar 3, 2022

I unfortunately get an error when I try running your script:

  File "check-deadlock.py", line 18, in <module>
    readys = client.run(
  File "/home/hadoop/miniconda/envs/doctype/lib/python3.8/site-packages/distributed/client.py", line 2750, in run
    return self.sync(
  File "/home/hadoop/miniconda/envs/doctype/lib/python3.8/site-packages/distributed/utils.py", line 309, in sync
    return sync(
  File "/home/hadoop/miniconda/envs/doctype/lib/python3.8/site-packages/distributed/utils.py", line 363, in sync
    raise exc.with_traceback(tb)
  File "/home/hadoop/miniconda/envs/doctype/lib/python3.8/site-packages/distributed/utils.py", line 348, in f
    result[0] = yield future
  File "/home/hadoop/miniconda/envs/doctype/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/hadoop/miniconda/envs/doctype/lib/python3.8/site-packages/distributed/client.py", line 2655, in _run
    raise exc
  File "/home/hadoop/miniconda/envs/doctype-pred/lib/python3.7/site-packages/distributed/scheduler.py", line 6065, in send_message
    comm, close=True, serializers=serializers, **msg
  File "/home/hadoop/miniconda/envs/doctype-pred/lib/python3.7/site-packages/distributed/core.py", line 700, in send_recv
    raise Exception(response["exception_text"])
Exception: TypeError('code() takes at most 15 arguments (16 given)')

@gjoseph92
Collaborator

Are you sure you ran it from the right Python environment? Same one your Jupyter notebook is running in that's connected to the cluster?

@rileyhun
Author

rileyhun commented Mar 3, 2022

Oh, my bad - I do think your script is incompatible with my Python environment, though. My Jupyter notebook that's connected to the cluster is using Python 3.7.10. There's syntax below that won't work, namely the assignment expression (walrus operator), which was only introduced in Python 3.8. I'll need to refactor this.

lambda dask_worker: (
    f"Batched stream buffer length: {len(dask_worker.batched_stream.buffer)}\n"
    f"please_stop = {dask_worker.batched_stream.please_stop}\n"
    f"waker.is_set() = {dask_worker.batched_stream.waker.is_set()}\n"
    f"time remaining = {nd - time() if (nd := dask_worker.batched_stream.next_deadline) else None}\n"
)

@gjoseph92
Collaborator

Ah, that's because the latest versions of dask and distributed only support Python 3.8 now. Change it to

f"time remaining = {dask_worker.batched_stream.next_deadline - time() if dask_worker.batched_stream.next_deadline else None}\n"

@rileyhun
Author

rileyhun commented Mar 3, 2022

Thanks @gjoseph92.

command:

miniconda/envs/doctype-pred/bin/python check-deadlock.py 172.20.45.119:8786 > output.log

Here is the return of that script:

output.log

@gjoseph92 added the bug (Something is broken) and needs info (Needs further information from the user) labels Mar 3, 2022
@rileyhun
Author

rileyhun commented Mar 4, 2022

Just wanted to reaffirm that after switching to on-demand nodes, I have not encountered the bug again. Ideally, though, it would be nice if it could work on spot units, because otherwise we are essentially wasting a lot of money.

@gjoseph92
Collaborator

Agreed that this should work with spot instances @rileyhun. Thanks again for reporting, and sorry about the bug. There are a few issues we’re aware of that could be causing this. Unfortunately, finding a root cause of these deadlocks is a very slow process right now, and we already have a few others to work on (and it's quite likely that yours is caused by one of the same underlying issues).

Once #5736 gets in, issues like this will be much easier to debug, so if you want to try it again in the future, we'll follow up when we could use more information or have a potential fix.

@rileyhun
Author

rileyhun commented Mar 25, 2022

Hi @gjoseph92 - Wanted to report that this issue has started occurring again, even after switching to on-demand instances. It's infrequent but does happen, and the delay is unwelcome: if a Dask job is left running overnight and stalls, it potentially costs my employer a lot of money.

[Screenshot attached: Screen Shot 2022-03-25 at 1 43 25 PM]

output.log

@gjoseph92
Collaborator

@rileyhun can you confirm then that no workers left the cluster while it was running? Also, is work stealing disabled on this cluster like it was in your original post?

I recommend making a cluster dump for further debugging. However, I'm looking at some other deadlocks right now that will take my attention for a bit, so I can't give you a timeline on when I would be able to look into it.

To mitigate the overnight cost problem, you could consider writing a Scheduler plugin (or just run a script on the client) to try to detect the deadlocks and shut down, or restart all workers, when they happen. You'd want to do something similar to the script I linked: every few minutes, if there are processing tasks on the Scheduler, confirm that workers are all executing something. If all the worker callstacks are empty, but there are still processing tasks, then something's wrong. Ideally, you'd also call Scheduler.dump_cluster_state_to_url before shutting down for debugging purposes (#5983 for reference).
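
To illustrate, a client-side version of that check might look roughly like this (a rough, untested sketch run from a separate process rather than a SchedulerPlugin; the scheduler address, polling interval, and thresholds are placeholders, and it uses Client.dump_cluster_state plus Client.restart):

import itertools
import time

import distributed

client = distributed.Client("tcp://scheduler-address:8786")  # placeholder scheduler address

stuck_checks = 0
while True:
    time.sleep(300)  # poll every five minutes (arbitrary interval)
    processing = client.processing()  # {worker address: keys the scheduler has assigned}
    keys = list(set(itertools.chain.from_iterable(processing.values())))
    if not keys:
        stuck_checks = 0
        continue  # nothing is assigned, so nothing can be stuck
    call_stacks = client.call_stack(keys=keys)  # only workers actually executing appear here
    if call_stacks:
        stuck_checks = 0
        continue  # at least one worker really is running something
    stuck_checks += 1
    if stuck_checks >= 3:  # tasks assigned but all workers idle for ~15 minutes
        client.dump_cluster_state("stuck-cluster-dump")  # keep evidence for debugging
        client.restart()  # restart all workers so the computation can resume
        stuck_checks = 0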

@rileyhun
Author

rileyhun commented Mar 25, 2022

Unfortunately, I re-ran the batch after restarting the cluster, so can't confirm that anymore, but I somewhat recall that when I looked at the number of workers from the dashboard, there were 400+ which is consistent with the number of workers I set. I can also confirm that I disabled work stealing. I re-enabled it and (as indicated above) I re-ran the batch, and it was successful.

Thanks for the suggestion - that's a great idea. I will look into that.

@mrocklin
Member

Looking at this, it would be highly valuable to try to reduce this down to a reproducible example. I suspect that if that is possible then it will be very easy to resolve whatever issue exists. No worries if you don't have time for that though @rileyhun

@n3rV3

n3rV3 commented Jan 3, 2023

We are running on 2022.12.1, with work_stealing disabled. The same issue occurs for our smaller clusters too (with 60 workers). These workers are running on spot instances, but in our case the instances/workers weren't replaced during task execution, yet they still get stuck infrequently.

I verified with the Dask client as mentioned in one of the comments above.
Here is what I ran:

In [7]: client.processing()
Out[7]:
{
 ...
 'tcp://10.10.1.205:37161': (),
 'tcp://10.10.1.205:41993': ("('repartition-300-7159aacfd17ad37fc0fb840597119420', 129)",
 "('repartition-300-7159aacfd17ad37fc0fb840597119420', 190)"),
 'tcp://10.10.1.205:42537': (),
 ...
}

In [15]: client.call_stack()
Out[15]: {}

The two outputs don't match: the scheduler thinks a worker is executing tasks, whereas its call stack is empty.

As a quick fix, I've written a small script to restart workers which don't show up in call_stack.
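
(For reference only, and not necessarily the author's actual script: such a quick fix might look roughly like the sketch below, reusing the processing/call-stack comparison from the check-deadlock script above; the scheduler address is a placeholder.)

import itertools

import distributed

client = distributed.Client("tcp://scheduler-address:8786")  # placeholder address

processing = client.processing()
call_stacks = client.call_stack(
    keys=list(set(itertools.chain.from_iterable(processing.values())))
)

# Workers the scheduler believes are busy but that report no call stack at all.
stuck = [addr for addr, keys in processing.items() if keys and addr not in call_stacks]

if stuck:
    # Closing the stuck workers lets the scheduler reschedule their tasks;
    # the cluster manager is expected to bring up replacements.
    client.retire_workers(workers=stuck, close_workers=True)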

Is there any other configuration that can help me avoid this inconsistent state between scheduler and worker altogether?

@fjetter
Member

fjetter commented Jan 3, 2023

@n3rV3 a lot of things changed since this issue was first opened and I doubt you are running into the exact same issue. In fact, I hope that the original report was fixed by now since the effort around #5736 concluded. The most critical stability fixes were merged and released already in 2022.6.0

If you are still seeing your computation get stuck, that is very troubling. To debug this we'll need a bit more information. Can I ask you to open a new ticket to track your problem?

It would be perfect if you could reproduce the issue with some minimal code. If that's not possible, please provide a dashboard screenshot as a starting point and describe the computation with some pseudo code. The diff between processing and call_stack is unfortunately not sufficient.

Is there any other configuration that can help me avoid this inconsistent state between scheduler and worker altogether?

There is only one configuration option that can sometimes help diagnose inconsistent state which is called validate. This flag basically enables internal state validations at various steps and raises assertion errors if some invariants are no longer true. You could try enabling this once to see if you can hit a problem with it. However, with validation on, the entire system is prohibitively slow and I would not recommend using this for a large computation (we have this enabled in our test suite but never in actual real world runs).
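
For example, turning it on could look roughly like this (a sketch, assuming the standard distributed config keys; on remote workers the equivalent environment variables, such as DASK_DISTRIBUTED__WORKER__VALIDATE, would be needed):

import dask

# Enable internal state validation on scheduler and workers. This must be set
# before the cluster is created, and it makes everything dramatically slower,
# so only use it for debugging runs.
dask.config.set({
    "distributed.scheduler.validate": True,
    "distributed.worker.validate": True,
})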

@n3rV3

n3rV3 commented Jan 3, 2023

@fjetter give me a few days - I don't have a sample DAG that I'm able to share.
I will try to write a sample DAG and test it, and will share it if I am able to reproduce this issue with it.

Thanks

@jakirkham
Member

Should this issue be closed if it is fixed?

@fjetter
Member

fjetter commented Jan 4, 2023

Should this issue be closed if it is fixed?

I'm inclined to close since I assume it should be fixed but don't have any confirmation.

@rileyhun did you encounter this issue again with a reasonably recent version? (>=2022.6.0)

@bstadlbauer
Contributor

@fjetter We've also just experienced issues with tasks that got stuck (distributed == 2022.12.1).

The error we're seeing is that out of ~170_000 tasks (which are submitted in batches of 50_000), one task in the second-to-last batch gets stuck. I ran the above script (with a slight adaptation to work with the newest dask version) and it finds an issue where the worker that should process the task does not have a call stack (similar to what @n3rV3 is seeing). Is there any way to further debug this?
In case it helps, we also restart our workers (using lifetime, configured roughly as in the sketch below) every 30±10 minutes (as that has previously helped with stuck tasks/deadlocks), but this has not helped in this case.
The DAG is super simple: all tasks (delayed functions; runtime ~10 to 60 seconds) are embarrassingly parallel and have no output. Our example only breaks for large DAGs after a few hours (on quite a large cluster), so it's a bit hard to reproduce.

We know that all tasks would pass when running them in smaller batches, each batch on a dedicated cluster.
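
(For context, a sketch of the worker lifetime restarts mentioned above, assuming the standard distributed.worker.lifetime settings.)

import dask

# Restart each worker after roughly 30 +/- 10 minutes of uptime; the nanny
# starts a fresh worker process once the old one has retired gracefully.
dask.config.set({
    "distributed.worker.lifetime.duration": "30 minutes",
    "distributed.worker.lifetime.stagger": "10 minutes",
    "distributed.worker.lifetime.restart": True,
})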

@fjetter
Member

fjetter commented Jan 12, 2023

cc @crusaderky

There is a way to produce a snapshot of the running cluster, see Client.dump_cluster_state which could help us diagnose the issue. @bstadlbauer @n3rV3 if either one of you runs into the problem again, could you try extracting this? This essentially tries to pickle the state of the entire cluster, i.e. it will contain potentially sensitive information like IP addresses, serialized code, dask config, etc. so only share if you are comfortable with this. Once you see the cluster being stuck, you can execute this command and it will create a local file with the serialized state.
Digging through this is not trivial and I cannot promise that we can do this quickly.
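
For example (a minimal sketch; the scheduler address and filename are placeholders, and the default msgpack format writes a .msgpack.gz file):

from distributed import Client

client = Client("tcp://scheduler-address:8786")  # connect to the stuck cluster

# Collect scheduler and worker state into stuck-cluster.msgpack.gz;
# pass format="yaml" instead for a human-readable (but much larger) dump.
client.dump_cluster_state("stuck-cluster")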

If anybody is able to create a minimal reproducer we will have a much easier time fixing this.

@bstadlbauer
Contributor

Thanks @fjetter! I will try another run today to see if I can reproduce and dump the state 👍

@Victor-Le-Coz

Hi,

I encounter a similar deadlock issue. I use dask distributed on an HPC cluster. I have 500 single-threaded workers. I run millions of tasks in batches of 90,000 tasks. At some point, after a couple of batches (4 to 6), one of the last tasks in a given batch blocks the code, and the cluster never finishes its work.

Here are the versions I am using for dask and python:
dask 2022.11.0 pyhd8ed1ab_0 https://conda.anaconda.org/conda-forge
dask-core 2022.11.0 pyhd8ed1ab_0 https://conda.anaconda.org/conda-forge
dask-jobqueue 0.8.1 pyhd8ed1ab_0 https://conda.anaconda.org/conda-forge
python 3.8.13 h582c2e5_0_cpython (https://conda.anaconda.org/conda-forge)

I have tried to dump the cluster state with client.dump_cluster_state(filename), but the call has been running for 1 hour and I still cannot access the dump file.

I cannot copy-paste the dashboard screenshots I have prepared; GitHub says it cannot process the file.

Kind regards,

@fjetter
Member

fjetter commented Feb 28, 2023

@Victor-Le-Coz sorry to hear that. Without more information, there is little we can do.

The one thing I can recommend is to upgrade to a more recent dask version. 2022.11.0 is already a bit dated, and there is at least one known issue (#7348) that can lead to a deadlock; the fix was released in 2022.12.0.
From what you are describing you should only encounter this issue if the batches you are submitting have a key collision (dask typically hashes function + input so a collision should be very unlikely unless you are setting key names yourself). Possibly there are also other conditions to trigger this so I would recommend upgrading either way.

@Victor-Le-Coz

Ok thank you a lot, I will update the Dask version and keep you informed!
Best,

@Victor-Le-Coz

Hi @fjetter,

Is version 2022.12.0 of dask available on conda-forge? I could not find it, unfortunately.

The issue persists, and I observed that it generally happens when some worker dies unexpectedly; here is an example error log:

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/mnt/live/user/vlecoz/conda_env/latest/lib/python3.8/site-packages/distributed/worker.py", line 1215, in heartbeat
    response = await retry_operation(
  File "/mnt/live/user/vlecoz/conda_env/latest/lib/python3.8/site-packages/distributed/utils_comm.py", line 386, in retry_operation
    return await retry(
  File "/mnt/live/user/vlecoz/conda_env/latest/lib/python3.8/site-packages/distributed/utils_comm.py", line 371, in retry
    return await coro()
  File "/mnt/live/user/vlecoz/conda_env/latest/lib/python3.8/site-packages/distributed/core.py", line 1163, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/mnt/live/user/vlecoz/conda_env/latest/lib/python3.8/site-packages/distributed/core.py", line 928, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/mnt/live/user/vlecoz/conda_env/latest/lib/python3.8/site-packages/distributed/comm/tcp.py", line 241, in read
    convert_stream_closed_error(self, e)
  File "/mnt/live/user/vlecoz/conda_env/latest/lib/python3.8/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://10.70.213.28:52528 remote=tcp://10.70.211.9:46817>: ConnectionResetError: [Errno 104] Connection reset by peer
2023-03-07 18:25:34,913 - distributed.nanny - INFO - Closing Nanny gracefully at 'tcp://10.70.213.28:39365'. Reason: worker-handle-scheduler-connection-broken
2023-03-07 18:25:34,945 - distributed.nanny - INFO - Worker closed
2023-03-07 18:25:36,963 - distributed.nanny - ERROR - Worker process died unexpectedly
2023-03-07 18:25:39,516 - distributed.nanny - INFO - Closing Nanny at 'tcp://10.70.213.28:39365'. Reason: nanny-close-gracefully
2023-03-07 18:25:39,518 - distributed.dask_worker - INFO - End worker

@fjetter
Member

fjetter commented Mar 10, 2023

Is the version 2022.12.0 of dask available on conda forge ? I could not find it unfortunately

Yes, it should be on conda-forge. It's not important to pick this particular version; anything more recent should do. In fact, in such a situation, please also try the latest version first.


FWIW, we are currently working on #6790, which fixes a problem that lets computations stall when workers are leaving/dying and the user has configured very large timeouts.
If you increased timeouts to very large values yourself, or set retries, this might help you.
